high priority jobs are picked first and inserted to prio queue
This commit is contained in:
@@ -8,7 +8,7 @@ use App::Netdisco::Util::Daemon;
|
|||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/;
|
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock/;
|
||||||
use MCE::Util ();
|
use MCE::Util ();
|
||||||
|
|
||||||
sub worker_begin {
|
sub worker_begin {
|
||||||
@@ -27,7 +27,7 @@ sub worker_begin {
|
|||||||
if (scalar @jobs) {
|
if (scalar @jobs) {
|
||||||
info sprintf "mgr (%s): found %s jobs booked to this processing node",
|
info sprintf "mgr (%s): found %s jobs booked to this processing node",
|
||||||
$wid, scalar @jobs;
|
$wid, scalar @jobs;
|
||||||
$self->{queue}->enqueue(@jobs);
|
$self->{queue}->enqueuep(100, @jobs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,15 +40,34 @@ sub worker_body {
|
|||||||
return debug "mgr ($wid): no need for manager... quitting"
|
return debug "mgr ($wid): no need for manager... quitting"
|
||||||
}
|
}
|
||||||
|
|
||||||
my $num_slots =
|
|
||||||
MCE::Util::_parse_max_workers( setting('workers')->{tasks} )
|
|
||||||
- $self->{queue}->pending();
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
debug "mgr ($wid): getting potential jobs for $num_slots workers";
|
|
||||||
prctl sprintf 'netdisco-daemon: worker #%s manager: gathering', $wid;
|
prctl sprintf 'netdisco-daemon: worker #%s manager: gathering', $wid;
|
||||||
|
my $num_slots = 0;
|
||||||
|
|
||||||
# get some pending jobs
|
$num_slots =
|
||||||
|
MCE::Util::_parse_max_workers( setting('workers')->{tasks} )
|
||||||
|
- $self->{queue}->pending();
|
||||||
|
debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";
|
||||||
|
|
||||||
|
# get some high priority jobs
|
||||||
|
# TODO also check for stale jobs in Netdisco DB
|
||||||
|
foreach my $job ( jq_getsomep($num_slots) ) {
|
||||||
|
|
||||||
|
# mark job as running
|
||||||
|
next unless jq_lock($job);
|
||||||
|
info sprintf "mgr (%s): job %s booked out for this processing node",
|
||||||
|
$wid, $job->job;
|
||||||
|
|
||||||
|
# copy job to local queue
|
||||||
|
$self->{queue}->enqueuep(100, $job);
|
||||||
|
}
|
||||||
|
|
||||||
|
$num_slots =
|
||||||
|
MCE::Util::_parse_max_workers( setting('workers')->{tasks} )
|
||||||
|
- $self->{queue}->pending();
|
||||||
|
debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
|
||||||
|
|
||||||
|
# get some normal priority jobs
|
||||||
# TODO also check for stale jobs in Netdisco DB
|
# TODO also check for stale jobs in Netdisco DB
|
||||||
foreach my $job ( jq_getsome($num_slots) ) {
|
foreach my $job ( jq_getsome($num_slots) ) {
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use base 'Exporter';
|
|||||||
our @EXPORT = ();
|
our @EXPORT = ();
|
||||||
our @EXPORT_OK = qw/
|
our @EXPORT_OK = qw/
|
||||||
jq_getsome
|
jq_getsome
|
||||||
|
jq_getsomep
|
||||||
jq_locked
|
jq_locked
|
||||||
jq_queued
|
jq_queued
|
||||||
jq_log
|
jq_log
|
||||||
@@ -41,6 +42,10 @@ Returns a list of randomly selected queued jobs. Default is to return one job,
|
|||||||
unless C<$num> is provided. Jobs are returned as objects which implement the
|
unless C<$num> is provided. Jobs are returned as objects which implement the
|
||||||
Netdisco job instance interface (see below).
|
Netdisco job instance interface (see below).
|
||||||
|
|
||||||
|
=head2 jq_getsomep( $num? )
|
||||||
|
|
||||||
|
Same as C<jq_getsome> but for high priority jobs.
|
||||||
|
|
||||||
=head2 jq_locked()
|
=head2 jq_locked()
|
||||||
|
|
||||||
Returns the list of jobs currently booked out to this processing node (denoted
|
Returns the list of jobs currently booked out to this processing node (denoted
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ use base 'Exporter';
|
|||||||
our @EXPORT = ();
|
our @EXPORT = ();
|
||||||
our @EXPORT_OK = qw/
|
our @EXPORT_OK = qw/
|
||||||
jq_getsome
|
jq_getsome
|
||||||
|
jq_getsomep
|
||||||
jq_locked
|
jq_locked
|
||||||
jq_queued
|
jq_queued
|
||||||
jq_log
|
jq_log
|
||||||
@@ -25,29 +26,26 @@ our @EXPORT_OK = qw/
|
|||||||
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
|
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
|
||||||
|
|
||||||
sub jq_getsome {
|
sub jq_getsome {
|
||||||
my $num_slots = shift;
|
my ($num_slots, $prio) = @_;
|
||||||
|
return () if defined $num_slots and $num_slots eq '0';
|
||||||
|
$num_slots ||= 1;
|
||||||
|
$prio ||= 'normal';
|
||||||
my @returned = ();
|
my @returned = ();
|
||||||
|
|
||||||
my $rs = schema('netdisco')->resultset('Admin')
|
my $rs = schema('netdisco')->resultset('Admin')
|
||||||
->search(
|
->search(
|
||||||
{status => 'queued', action => { -in => setting('job_prio')->{high} } },
|
{status => 'queued', action => { -in => setting('job_prio')->{$prio} } },
|
||||||
{order_by => 'random()', rows => ($num_slots || 1)},
|
{order_by => 'random()', rows => ($num_slots || 1)},
|
||||||
);
|
);
|
||||||
|
|
||||||
unless ($rs->count) {
|
|
||||||
$rs = schema('netdisco')->resultset('Admin')
|
|
||||||
->search(
|
|
||||||
{status => 'queued', action => { -in => setting('job_prio')->{normal} } },
|
|
||||||
{order_by => 'random()', rows => ($num_slots || 1)},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (my $job = $rs->next) {
|
while (my $job = $rs->next) {
|
||||||
push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns });
|
push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns });
|
||||||
}
|
}
|
||||||
return @returned;
|
return @returned;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub jq_getsomep { return jq_getsome(shift, 'high') }
|
||||||
|
|
||||||
sub jq_locked {
|
sub jq_locked {
|
||||||
my $fqdn = hostfqdn || 'localhost';
|
my $fqdn = hostfqdn || 'localhost';
|
||||||
my @returned = ();
|
my @returned = ();
|
||||||
|
|||||||
Reference in New Issue
Block a user