From de594c647ff3e8d43afa69a1ce1bdfc54442e5c0 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Fri, 24 Nov 2017 06:31:34 +0000 Subject: [PATCH] single DB poll for new jobs both high and normal priority --- lib/App/Netdisco/Backend/Job.pm | 1 + lib/App/Netdisco/Backend/Role/Manager.pm | 26 ++-------- lib/App/Netdisco/DB/ResultSet.pm | 2 +- lib/App/Netdisco/DB/SetOperations.pm | 50 ++++++++++++++++++++ lib/App/Netdisco/JobQueue.pm | 11 ++--- lib/App/Netdisco/JobQueue/PostgreSQL.pm | 60 ++++++++++++++---------- 6 files changed, 92 insertions(+), 58 deletions(-) create mode 100644 lib/App/Netdisco/DB/SetOperations.pm diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm index ddbfd034..9eef998e 100644 --- a/lib/App/Netdisco/Backend/Job.pm +++ b/lib/App/Netdisco/Backend/Job.pm @@ -21,6 +21,7 @@ foreach my $slot (qw/ userip log device_key + job_priority _current_phase _last_namespace diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm index d594df4d..3c6e7e00 100644 --- a/lib/App/Netdisco/Backend/Role/Manager.pm +++ b/lib/App/Netdisco/Backend/Role/Manager.pm @@ -6,7 +6,7 @@ use List::Util 'sum'; use App::Netdisco::Util::MCE; use App::Netdisco::JobQueue - qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; + qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/; use Role::Tiny; use namespace::clean; @@ -60,28 +60,8 @@ sub worker_body { $num_slots = parse_max_workers( setting('workers')->{tasks} ) - $self->{queue}->pending(); - debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; + debug "mgr ($wid): getting potential jobs for $num_slots workers"; - # get some high priority jobs - # TODO also check for stale jobs in Netdisco DB - foreach my $job ( jq_getsomep($num_slots) ) { - next if $seen_job{ $memoize->($job) }++; - - # mark job as running - next unless jq_lock($job); - info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $job->id; - - # copy job to local queue - $self->{queue}->enqueuep(100, $job); - } - - $num_slots = parse_max_workers( setting('workers')->{tasks} ) - - $self->{queue}->pending(); - debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; - - # get some normal priority jobs - # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsome($num_slots) ) { next if $seen_job{ $memoize->($job) }++; @@ -91,7 +71,7 @@ sub worker_body { $wid, $job->id; # copy job to local queue - $self->{queue}->enqueue($job); + $self->{queue}->enqueuep($job->job_priority, $job); } #if (scalar grep {$_ > 1} values %seen_job) { diff --git a/lib/App/Netdisco/DB/ResultSet.pm b/lib/App/Netdisco/DB/ResultSet.pm index 953c8e80..22c25cf8 100644 --- a/lib/App/Netdisco/DB/ResultSet.pm +++ b/lib/App/Netdisco/DB/ResultSet.pm @@ -6,7 +6,7 @@ use warnings; use base 'DBIx::Class::ResultSet'; __PACKAGE__->load_components(qw/ - Helper::ResultSet::SetOperations + +App::Netdisco::DB::SetOperations Helper::ResultSet::Shortcut Helper::ResultSet::CorrelateRelationship /); diff --git a/lib/App/Netdisco/DB/SetOperations.pm b/lib/App/Netdisco/DB/SetOperations.pm new file mode 100644 index 00000000..fef5efb4 --- /dev/null +++ b/lib/App/Netdisco/DB/SetOperations.pm @@ -0,0 +1,50 @@ +package App::Netdisco::DB::SetOperations; + +use strict; +use warnings; + +use parent 'DBIx::Class::Helper::ResultSet::SetOperations'; + +sub _set_operation { + my ( $self, $operation, $other ) = @_; + + my @sql; + my @params; + + my $as = $self->_resolved_attrs->{as}; + + my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other ); + + for (@operands) { + $self->throw_exception("ResultClass of ResultSets do not match!") + unless $self->result_class eq $_->result_class; + + my $attrs = $_->_resolved_attrs; + + $self->throw_exception('ResultSets do not all have the same selected columns!') + unless $self->_compare_arrays($as, $attrs->{as}); + + my ($sql, @bind) = @{${$_->as_query}}; + # $sql =~ s/^\s*\((.*)\)\s*$/$1/; + $sql = q<(> . $sql . q<)>; + + push @sql, $sql; + push @params, @bind; + } + + my $query = q<(> . join(" $operation ", @sql). q<)>; + + my $attrs = $self->_resolved_attrs; + return $self->result_source->resultset->search(undef, { + alias => $self->current_source_alias, + from => [{ + $self->current_source_alias => \[ $query, @params ], + -alias => $self->current_source_alias, + -source_handle => $self->result_source->handle, + }], + columns => $attrs->{as}, + result_class => $self->result_class, + }); +} + +1; diff --git a/lib/App/Netdisco/JobQueue.pm b/lib/App/Netdisco/JobQueue.pm index 875ec468..733cf4bb 100644 --- a/lib/App/Netdisco/JobQueue.pm +++ b/lib/App/Netdisco/JobQueue.pm @@ -9,16 +9,15 @@ Module::Load::load use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ + jq_warm_thrusters jq_getsome - jq_getsomep jq_locked jq_queued - jq_warm_thrusters - jq_log - jq_userlog jq_lock jq_defer jq_complete + jq_log + jq_userlog jq_insert jq_delete /; @@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job, unless C<$num> is provided. Jobs are returned as objects which implement the Netdisco job instance interface (see below). -=head2 jq_getsomep( $num? ) - -Same as C but for high priority jobs. - =head2 jq_locked() Returns the list of jobs currently booked out to this processing node (denoted diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 859e9380..80405701 100644 --- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -15,7 +15,6 @@ our @EXPORT = (); our @EXPORT_OK = qw/ jq_warm_thrusters jq_getsome - jq_getsomep jq_locked jq_queued jq_lock @@ -69,23 +68,49 @@ sub jq_warm_thrusters { }); } -sub _getsome { - my ($num_slots, $where) = @_; - return () if ((!defined $num_slots) or ($num_slots < 1)); - return () if ((!defined $where) or (ref {} ne ref $where)); +sub jq_getsome { + my $num_slots = shift; + return () unless $num_slots and $num_slots > 0; my $jobs = schema('netdisco')->resultset('Admin'); - my $rs = $jobs->search({ + my @returned = (); + + my %jobsearch = ( status => 'queued', device => { '-not_in' => $jobs->skipped(setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'}, setting('workers')->{'retry_after'}) ->columns('device')->as_query }, - %$where, - }, { order_by => 'random()', rows => $num_slots }); + ); + my %randoms = (order_by => 'random()', rows => $num_slots ); + + my $hiprio = $jobs->search({ + %jobsearch, + -or => [{ + username => { '!=' => undef }, + action => { -in => setting('job_prio')->{'normal'} }, + },{ + action => { -in => setting('job_prio')->{'high'} }, + }], + }, { + %randoms, + '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'], + }); + + my $loprio = $jobs->search({ + %jobsearch, + action => { -in => setting('job_prio')->{'normal'} }, + }, { + %randoms, + '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'], + }); + + my $rs = $hiprio->union($loprio)->search(undef, { + order_by => { '-desc' => 'job_priority' }, + rows => $num_slots, + }); - my @returned = (); while (my $job = $rs->next) { if ($job->device) { # need to handle device discovered since backend daemon started @@ -140,23 +165,6 @@ sub _getsome { return @returned; } -sub jq_getsome { - return _getsome(shift, - { action => { -in => setting('job_prio')->{'normal'} } } - ); -} - -sub jq_getsomep { - return _getsome(shift, { - -or => [{ - username => { '!=' => undef }, - action => { -in => setting('job_prio')->{'normal'} }, - },{ - action => { -in => setting('job_prio')->{'high'} }, - }], - }); -} - sub jq_locked { my @returned = (); my $rs = schema('netdisco')->resultset('Admin')