single DB poll for new jobs both high and normal priority

This commit is contained in:
Oliver Gorwits
2017-11-24 06:31:34 +00:00
parent 3db242cbe8
commit de594c647f
6 changed files with 92 additions and 58 deletions

View File

@@ -21,6 +21,7 @@ foreach my $slot (qw/
userip userip
log log
device_key device_key
job_priority
_current_phase _current_phase
_last_namespace _last_namespace

View File

@@ -6,7 +6,7 @@ use List::Util 'sum';
use App::Netdisco::Util::MCE; use App::Netdisco::Util::MCE;
use App::Netdisco::JobQueue use App::Netdisco::JobQueue
qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/;
use Role::Tiny; use Role::Tiny;
use namespace::clean; use namespace::clean;
@@ -60,28 +60,8 @@ sub worker_body {
$num_slots = parse_max_workers( setting('workers')->{tasks} ) $num_slots = parse_max_workers( setting('workers')->{tasks} )
- $self->{queue}->pending(); - $self->{queue}->pending();
debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; debug "mgr ($wid): getting potential jobs for $num_slots workers";
# get some high priority jobs
# TODO also check for stale jobs in Netdisco DB
foreach my $job ( jq_getsomep($num_slots) ) {
next if $seen_job{ $memoize->($job) }++;
# mark job as running
next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->id;
# copy job to local queue
$self->{queue}->enqueuep(100, $job);
}
$num_slots = parse_max_workers( setting('workers')->{tasks} )
- $self->{queue}->pending();
debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
# get some normal priority jobs
# TODO also check for stale jobs in Netdisco DB
foreach my $job ( jq_getsome($num_slots) ) { foreach my $job ( jq_getsome($num_slots) ) {
next if $seen_job{ $memoize->($job) }++; next if $seen_job{ $memoize->($job) }++;
@@ -91,7 +71,7 @@ sub worker_body {
$wid, $job->id; $wid, $job->id;
# copy job to local queue # copy job to local queue
$self->{queue}->enqueue($job); $self->{queue}->enqueuep($job->job_priority, $job);
} }
#if (scalar grep {$_ > 1} values %seen_job) { #if (scalar grep {$_ > 1} values %seen_job) {

View File

@@ -6,7 +6,7 @@ use warnings;
use base 'DBIx::Class::ResultSet'; use base 'DBIx::Class::ResultSet';
__PACKAGE__->load_components(qw/ __PACKAGE__->load_components(qw/
Helper::ResultSet::SetOperations +App::Netdisco::DB::SetOperations
Helper::ResultSet::Shortcut Helper::ResultSet::Shortcut
Helper::ResultSet::CorrelateRelationship Helper::ResultSet::CorrelateRelationship
/); /);

View File

@@ -0,0 +1,50 @@
package App::Netdisco::DB::SetOperations;
use strict;
use warnings;
use parent 'DBIx::Class::Helper::ResultSet::SetOperations';
sub _set_operation {
my ( $self, $operation, $other ) = @_;
my @sql;
my @params;
my $as = $self->_resolved_attrs->{as};
my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other );
for (@operands) {
$self->throw_exception("ResultClass of ResultSets do not match!")
unless $self->result_class eq $_->result_class;
my $attrs = $_->_resolved_attrs;
$self->throw_exception('ResultSets do not all have the same selected columns!')
unless $self->_compare_arrays($as, $attrs->{as});
my ($sql, @bind) = @{${$_->as_query}};
# $sql =~ s/^\s*\((.*)\)\s*$/$1/;
$sql = q<(> . $sql . q<)>;
push @sql, $sql;
push @params, @bind;
}
my $query = q<(> . join(" $operation ", @sql). q<)>;
my $attrs = $self->_resolved_attrs;
return $self->result_source->resultset->search(undef, {
alias => $self->current_source_alias,
from => [{
$self->current_source_alias => \[ $query, @params ],
-alias => $self->current_source_alias,
-source_handle => $self->result_source->handle,
}],
columns => $attrs->{as},
result_class => $self->result_class,
});
}
1;

View File

@@ -9,16 +9,15 @@ Module::Load::load
use base 'Exporter'; use base 'Exporter';
our @EXPORT = (); our @EXPORT = ();
our @EXPORT_OK = qw/ our @EXPORT_OK = qw/
jq_warm_thrusters
jq_getsome jq_getsome
jq_getsomep
jq_locked jq_locked
jq_queued jq_queued
jq_warm_thrusters
jq_log
jq_userlog
jq_lock jq_lock
jq_defer jq_defer
jq_complete jq_complete
jq_log
jq_userlog
jq_insert jq_insert
jq_delete jq_delete
/; /;
@@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job,
unless C<$num> is provided. Jobs are returned as objects which implement the unless C<$num> is provided. Jobs are returned as objects which implement the
Netdisco job instance interface (see below). Netdisco job instance interface (see below).
=head2 jq_getsomep( $num? )
Same as C<jq_getsome> but for high priority jobs.
=head2 jq_locked() =head2 jq_locked()
Returns the list of jobs currently booked out to this processing node (denoted Returns the list of jobs currently booked out to this processing node (denoted

View File

@@ -15,7 +15,6 @@ our @EXPORT = ();
our @EXPORT_OK = qw/ our @EXPORT_OK = qw/
jq_warm_thrusters jq_warm_thrusters
jq_getsome jq_getsome
jq_getsomep
jq_locked jq_locked
jq_queued jq_queued
jq_lock jq_lock
@@ -69,23 +68,49 @@ sub jq_warm_thrusters {
}); });
} }
sub _getsome { sub jq_getsome {
my ($num_slots, $where) = @_; my $num_slots = shift;
return () if ((!defined $num_slots) or ($num_slots < 1)); return () unless $num_slots and $num_slots > 0;
return () if ((!defined $where) or (ref {} ne ref $where));
my $jobs = schema('netdisco')->resultset('Admin'); my $jobs = schema('netdisco')->resultset('Admin');
my $rs = $jobs->search({ my @returned = ();
my %jobsearch = (
status => 'queued', status => 'queued',
device => { '-not_in' => device => { '-not_in' =>
$jobs->skipped(setting('workers')->{'BACKEND'}, $jobs->skipped(setting('workers')->{'BACKEND'},
setting('workers')->{'max_deferrals'}, setting('workers')->{'max_deferrals'},
setting('workers')->{'retry_after'}) setting('workers')->{'retry_after'})
->columns('device')->as_query }, ->columns('device')->as_query },
%$where, );
}, { order_by => 'random()', rows => $num_slots }); my %randoms = (order_by => 'random()', rows => $num_slots );
my $hiprio = $jobs->search({
%jobsearch,
-or => [{
username => { '!=' => undef },
action => { -in => setting('job_prio')->{'normal'} },
},{
action => { -in => setting('job_prio')->{'high'} },
}],
}, {
%randoms,
'+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'],
});
my $loprio = $jobs->search({
%jobsearch,
action => { -in => setting('job_prio')->{'normal'} },
}, {
%randoms,
'+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'],
});
my $rs = $hiprio->union($loprio)->search(undef, {
order_by => { '-desc' => 'job_priority' },
rows => $num_slots,
});
my @returned = ();
while (my $job = $rs->next) { while (my $job = $rs->next) {
if ($job->device) { if ($job->device) {
# need to handle device discovered since backend daemon started # need to handle device discovered since backend daemon started
@@ -140,23 +165,6 @@ sub _getsome {
return @returned; return @returned;
} }
sub jq_getsome {
return _getsome(shift,
{ action => { -in => setting('job_prio')->{'normal'} } }
);
}
sub jq_getsomep {
return _getsome(shift, {
-or => [{
username => { '!=' => undef },
action => { -in => setting('job_prio')->{'normal'} },
},{
action => { -in => setting('job_prio')->{'high'} },
}],
});
}
sub jq_locked { sub jq_locked {
my @returned = (); my @returned = ();
my $rs = schema('netdisco')->resultset('Admin') my $rs = schema('netdisco')->resultset('Admin')