diff --git a/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm index 7580f646..80ab7d7b 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm @@ -1,20 +1,19 @@ package App::Netdisco::Daemon::JobQueue; -use Dancer qw/:moose :syntax :script/; - use Role::Tiny; use namespace::clean; use Module::Load (); -Module::Load::load_remote 'JobQueue' => - 'App::Netdisco::JobQueue::' . setting('workers')->{queue} => ':all'; +Module::Load::load_remote 'JobQueue' => 'App::Netdisco::JobQueue' => ':all'; +# central queue sub jq_get { shift and JobQueue::jq_get(@_) } sub jq_getlocal { shift and JobQueue::jq_getlocal(@_) } sub jq_queued { shift and JobQueue::jq_queued(@_) } +sub jq_take { goto \&JobQueue::jq_take } sub jq_lock { shift and JobQueue::jq_lock(@_) } sub jq_defer { shift and JobQueue::jq_defer(@_) } sub jq_complete { shift and JobQueue::jq_complete(@_) } sub jq_insert { shift and JobQueue::jq_insert(@_) } -true; +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index cdb6d017..4b1bd5e5 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -17,7 +17,7 @@ sub worker_body { while (1) { debug "$type ($wid): asking for a job"; - my $jobs = $self->do('take_jobs', $self->wid, $name); + my $jobs = $self->jq_take($self->wid, $name); foreach my $job (@$jobs) { my $target = $self->munge_action($job->action); diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index b5f8d1ae..bec58049 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -5,6 +5,7 @@ use Dancer qw/:moose :syntax :script/; use Role::Tiny; use namespace::clean; +use List::Util 'sum'; with 'App::Netdisco::Daemon::JobQueue'; sub worker_begin { @@ -25,6 +26,8 @@ sub worker_begin { sub worker_body { my $self = shift; my $wid = $self->wid; + my $num_slots = sum( 0, map { setting('workers')->{$_} } + values %{setting('job_type_keys')} ); while (1) { debug "mgr ($wid): getting potential jobs for $num_slots workers"; diff --git a/Netdisco/lib/App/Netdisco/JobQueue.pm b/Netdisco/lib/App/Netdisco/JobQueue.pm index 418c465c..a0b6cbc1 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue.pm @@ -14,6 +14,7 @@ our @EXPORT_OK = qw/ jq_queued jq_log jq_userlog + jq_take jq_lock jq_defer jq_complete @@ -63,6 +64,12 @@ Returns a list of jobs which have been entered into the queue by the passed C<$user>. Jobs are returned as objects which implement the Netdisco job instance interface (see below). +=head2 jq_take( $wid, $type, $max? ) + +Searches in the queue for jobs of type C<$type> and if up to C<$max> are +available, will book them out to the worker with ID C<$wid>. The default +number of booked jobs is 1. + =head2 jq_lock( $job ) Marks a job in the queue as booked out to this processing node (denoted by the @@ -88,7 +95,7 @@ status, log and finished fields will be updated from the passed C<$job>. Returns true if successful else returns false. -=head2 jq_insert( \%job | [ %job, \%job ...] ) +=head2 jq_insert( \%job | [ \%job, \%job ...] ) Adds the passed jobs to the queue. diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 42d0c3b6..61cc1f53 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -14,6 +14,7 @@ our @EXPORT_OK = qw/ jq_queued jq_log jq_userlog + jq_take jq_lock jq_defer jq_complete @@ -98,6 +99,11 @@ sub jq_userlog { return @returned; } +# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via +# the main daemon process. This is only used by daemon workers which can use +# MCE ->do() method. +sub jq_take { (shift)->do('take_jobs', @_) } + sub jq_lock { my $job = shift; my $fqdn = hostfqdn || 'localhost'; @@ -113,6 +119,7 @@ sub jq_lock { # remove any duplicate jobs, needed because we have race conditions # when queueing jobs of a type for all devices schema('netdisco')->resultset('Admin')->search({ + status => 'queued', device => $job->device, port => $job->port, action => $job->action,