make worker use only JobQueue not LocalQueue directly
This commit is contained in:
		| @@ -1,20 +1,19 @@ | ||||
| package App::Netdisco::Daemon::JobQueue; | ||||
|  | ||||
| use Dancer qw/:moose :syntax :script/; | ||||
|  | ||||
| use Role::Tiny; | ||||
| use namespace::clean; | ||||
|  | ||||
| use Module::Load (); | ||||
| Module::Load::load_remote 'JobQueue' => | ||||
|   'App::Netdisco::JobQueue::' . setting('workers')->{queue} => ':all'; | ||||
| Module::Load::load_remote 'JobQueue' => 'App::Netdisco::JobQueue' => ':all'; | ||||
|  | ||||
| # central queue | ||||
| sub jq_get      { shift and JobQueue::jq_get(@_) } | ||||
| sub jq_getlocal { shift and JobQueue::jq_getlocal(@_) } | ||||
| sub jq_queued   { shift and JobQueue::jq_queued(@_) } | ||||
| sub jq_take     { goto \&JobQueue::jq_take } | ||||
| sub jq_lock     { shift and JobQueue::jq_lock(@_) } | ||||
| sub jq_defer    { shift and JobQueue::jq_defer(@_) } | ||||
| sub jq_complete { shift and JobQueue::jq_complete(@_) } | ||||
| sub jq_insert   { shift and JobQueue::jq_insert(@_) } | ||||
|  | ||||
| true; | ||||
| 1; | ||||
|   | ||||
| @@ -17,7 +17,7 @@ sub worker_body { | ||||
|  | ||||
|   while (1) { | ||||
|       debug "$type ($wid): asking for a job"; | ||||
|       my $jobs = $self->do('take_jobs', $self->wid, $name); | ||||
|       my $jobs = $self->jq_take($self->wid, $name); | ||||
|  | ||||
|       foreach my $job (@$jobs) { | ||||
|           my $target = $self->munge_action($job->action); | ||||
|   | ||||
| @@ -5,6 +5,7 @@ use Dancer qw/:moose :syntax :script/; | ||||
| use Role::Tiny; | ||||
| use namespace::clean; | ||||
|  | ||||
| use List::Util 'sum'; | ||||
| with 'App::Netdisco::Daemon::JobQueue'; | ||||
|  | ||||
| sub worker_begin { | ||||
| @@ -25,6 +26,8 @@ sub worker_begin { | ||||
| sub worker_body { | ||||
|   my $self = shift; | ||||
|   my $wid = $self->wid; | ||||
|   my $num_slots = sum( 0, map { setting('workers')->{$_} } | ||||
|                               values %{setting('job_type_keys')} ); | ||||
|  | ||||
|   while (1) { | ||||
|       debug "mgr ($wid): getting potential jobs for $num_slots workers"; | ||||
|   | ||||
| @@ -14,6 +14,7 @@ our @EXPORT_OK = qw/ | ||||
|   jq_queued | ||||
|   jq_log | ||||
|   jq_userlog | ||||
|   jq_take | ||||
|   jq_lock | ||||
|   jq_defer | ||||
|   jq_complete | ||||
| @@ -63,6 +64,12 @@ Returns a list of jobs which have been entered into the queue by the passed | ||||
| C<$user>. Jobs are returned as objects which implement the Netdisco job | ||||
| instance interface (see below). | ||||
|  | ||||
| =head2 jq_take( $wid, $type, $max? ) | ||||
|  | ||||
| Searches in the queue for jobs of type C<$type> and if up to C<$max> are | ||||
| available, will book them out to the worker with ID C<$wid>. The default | ||||
| number of booked jobs is 1. | ||||
|  | ||||
| =head2 jq_lock( $job ) | ||||
|  | ||||
| Marks a job in the queue as booked out to this processing node (denoted by the | ||||
| @@ -88,7 +95,7 @@ status, log and finished fields will be updated from the passed C<$job>. | ||||
|  | ||||
| Returns true if successful else returns false. | ||||
|  | ||||
| =head2 jq_insert( \%job | [ %job, \%job ...] ) | ||||
| =head2 jq_insert( \%job | [ \%job, \%job ...] ) | ||||
|  | ||||
| Adds the passed jobs to the queue. | ||||
|  | ||||
|   | ||||
| @@ -14,6 +14,7 @@ our @EXPORT_OK = qw/ | ||||
|   jq_queued | ||||
|   jq_log | ||||
|   jq_userlog | ||||
|   jq_take | ||||
|   jq_lock | ||||
|   jq_defer | ||||
|   jq_complete | ||||
| @@ -98,6 +99,11 @@ sub jq_userlog { | ||||
|   return @returned; | ||||
| } | ||||
|  | ||||
| # PostgreSQL engine depends on LocalQueue, which is accessed synchronously via | ||||
| # the main daemon process. This is only used by daemon workers which can use | ||||
| # MCE ->do() method. | ||||
| sub jq_take { (shift)->do('take_jobs', @_) } | ||||
|  | ||||
| sub jq_lock { | ||||
|   my $job = shift; | ||||
|   my $fqdn = hostfqdn || 'localhost'; | ||||
| @@ -113,6 +119,7 @@ sub jq_lock { | ||||
|       # remove any duplicate jobs, needed because we have race conditions | ||||
|       # when queueing jobs of a type for all devices | ||||
|       schema('netdisco')->resultset('Admin')->search({ | ||||
|         status    => 'queued', | ||||
|         device    => $job->device, | ||||
|         port      => $job->port, | ||||
|         action    => $job->action, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user