diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 361d976e..d28f0ff1 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -8,7 +8,7 @@ use App::Netdisco::Util::Daemon; use Role::Tiny; use namespace::clean; -use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/; +use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock/; use MCE::Util (); sub worker_begin { @@ -27,7 +27,7 @@ sub worker_begin { if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - $self->{queue}->enqueue(@jobs); + $self->{queue}->enqueuep(100, @jobs); } } @@ -40,15 +40,34 @@ sub worker_body { return debug "mgr ($wid): no need for manager... quitting" } - my $num_slots = - MCE::Util::_parse_max_workers( setting('workers')->{tasks} ) - - $self->{queue}->pending(); - while (1) { - debug "mgr ($wid): getting potential jobs for $num_slots workers"; prctl sprintf 'netdisco-daemon: worker #%s manager: gathering', $wid; + my $num_slots = 0; - # get some pending jobs + $num_slots = + MCE::Util::_parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); + debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; + + # get some high priority jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsomep($num_slots) ) { + + # mark job as running + next unless jq_lock($job); + info sprintf "mgr (%s): job %s booked out for this processing node", + $wid, $job->job; + + # copy job to local queue + $self->{queue}->enqueuep(100, $job); + } + + $num_slots = + MCE::Util::_parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); + debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; + + # get some normal priority jobs # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsome($num_slots) ) { diff --git a/Netdisco/lib/App/Netdisco/JobQueue.pm b/Netdisco/lib/App/Netdisco/JobQueue.pm index d8485888..fd93e726 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue.pm @@ -10,6 +10,7 @@ use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ jq_getsome + jq_getsomep jq_locked jq_queued jq_log @@ -41,6 +42,10 @@ Returns a list of randomly selected queued jobs. Default is to return one job, unless C<$num> is provided. Jobs are returned as objects which implement the Netdisco job instance interface (see below). +=head2 jq_getsomep( $num? ) + +Same as C but for high priority jobs. + =head2 jq_locked() Returns the list of jobs currently booked out to this processing node (denoted diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 03a93c3a..02d77d3a 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -12,6 +12,7 @@ use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ jq_getsome + jq_getsomep jq_locked jq_queued jq_log @@ -25,29 +26,26 @@ our @EXPORT_OK = qw/ our %EXPORT_TAGS = ( all => \@EXPORT_OK ); sub jq_getsome { - my $num_slots = shift; + my ($num_slots, $prio) = @_; + return () if defined $num_slots and $num_slots eq '0'; + $num_slots ||= 1; + $prio ||= 'normal'; my @returned = (); my $rs = schema('netdisco')->resultset('Admin') ->search( - {status => 'queued', action => { -in => setting('job_prio')->{high} } }, + {status => 'queued', action => { -in => setting('job_prio')->{$prio} } }, {order_by => 'random()', rows => ($num_slots || 1)}, ); - unless ($rs->count) { - $rs = schema('netdisco')->resultset('Admin') - ->search( - {status => 'queued', action => { -in => setting('job_prio')->{normal} } }, - {order_by => 'random()', rows => ($num_slots || 1)}, - ); - } - while (my $job = $rs->next) { push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); } return @returned; } +sub jq_getsomep { return jq_getsome(shift, 'high') } + sub jq_locked { my $fqdn = hostfqdn || 'localhost'; my @returned = ();