From 25907d3544b86bb357d1cc7b29d47bf133b32443 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Mon, 6 Nov 2017 21:26:01 +0000 Subject: [PATCH] move status tracking and checking inside job instance --- lib/App/Netdisco/Backend/Job.pm | 115 ++++++++++++++++++--- lib/App/Netdisco/Backend/Role/Manager.pm | 10 +- lib/App/Netdisco/Backend/Role/Poller.pm | 12 +-- lib/App/Netdisco/Backend/Role/Scheduler.pm | 4 +- lib/App/Netdisco/JobQueue/PostgreSQL.pm | 8 +- lib/App/Netdisco/Worker/Plugin.pm | 51 ++++----- lib/App/Netdisco/Worker/Runner.pm | 62 ++++------- lib/App/Netdisco/Worker/Status.pm | 3 +- 8 files changed, 165 insertions(+), 100 deletions(-) diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm index a5b21e1a..c92f3cdf 100644 --- a/lib/App/Netdisco/Backend/Job.pm +++ b/lib/App/Netdisco/Backend/Job.pm @@ -1,5 +1,7 @@ package App::Netdisco::Backend::Job; +use aliased 'App::Netdisco::Worker::Status'; + use Moo; use namespace::clean; @@ -17,6 +19,10 @@ foreach my $slot (qw/ userip log debug + + _phase + _namespace + _priority /) { has $slot => ( @@ -24,40 +30,119 @@ foreach my $slot (qw/ ); } +has '_statuslist' => ( + is => 'rw', + default => sub { [] }, +); + =head1 METHODS =head2 summary -An attempt to make a meaningful statement about the job. +An attempt to make a meaningful written statement about the job. =cut sub summary { - my $job = shift; - return join ' ', - $job->action, - ($job->device || ''), - ($job->port || ''); -# ($job->subaction ? (q{'}. $job->subaction .q{'}) : ''); + my $job = shift; + return join ' ', + $job->action, + ($job->device || ''), + ($job->port || ''); } -=head2 update_status +=head2 finalise_status -Passed an L<App::Netdisco::Worker::Status> will update this job's C<status> and -C<log> slots. +Find the best status and log it into the job's C<status> and C<log> slots.
=cut -sub update_status { +sub finalise_status { my $job = shift; - my $status = shift or return; - $job->status( $status->status ); - $job->log( $status->log ); - return $job; + my $max_level = Status->error()->level; + + # fallback + $job->status('error'); + $job->log('failed to succeed at any worker!'); + + foreach my $status (@{ $job->_statuslist }) { + next unless $status->phase =~ m/^(?:early|main)$/; + if ($status->level >= $max_level) { + $job->status( $status->status ); + $job->log( $status->log ); + $max_level = $status->level; + } + } +} + +=head2 check_passed + +Returns true if at least one worker during the C<check> phase flagged status +C<done>. + +=cut + +sub check_passed { + my $job = shift; + foreach my $status (@{ $job->_statuslist }) { + next unless $status->phase eq 'check'; + return 1 if $status->is_ok; + } + return 0; +} + +=head2 namespace_passed( \%workerconf ) + +Returns true when, for the namespace specified in the passed configuration, +all workers of a higher priority level have succeeded. + +=cut + +sub namespace_passed { + my ($job, $workerconf) = @_; + + if (defined $job->_namespace + and ($workerconf->{phase} eq $job->_phase) + and ($workerconf->{namespace} eq $job->_namespace) + and ($workerconf->{priority} != $job->_priority)) { + + foreach my $status (@{ $job->_statuslist }) { + next unless ($status->phase eq $job->_phase) + and ($status->namespace eq $job->_namespace) + and ($status->priority == $job->_priority); + return 1 if $status->is_ok; + } + } + + $job->_phase( $workerconf->{phase} ); + $job->_namespace( $workerconf->{namespace} ); + $job->_priority( $workerconf->{priority} ); + return 0; +} + +=head2 add_status + +Passed an L<App::Netdisco::Worker::Status> will add it to this job's internal +store. + +=cut + +sub add_status { + my ($job, $status) = @_; + return unless ref $status eq 'App::Netdisco::Worker::Status'; + push @{ $job->_statuslist }, $status; } =head1 ADDITIONAL COLUMNS +=head2 id + +Alias for the C<job> column.
+ +=cut + +sub id { (shift)->job } + =head2 extra Alias for the C column. diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm index 4842184f..25d13ba7 100644 --- a/lib/App/Netdisco/Backend/Role/Manager.pm +++ b/lib/App/Netdisco/Backend/Role/Manager.pm @@ -5,12 +5,12 @@ use Dancer qw/:moose :syntax :script/; use List::Util 'sum'; use App::Netdisco::Util::MCE; -use Role::Tiny; -use namespace::clean; - use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; +use Role::Tiny; +use namespace::clean; + sub worker_begin { my $self = shift; my $wid = $self->wid; @@ -58,7 +58,7 @@ sub worker_body { # mark job as running next unless jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $job->job; + $wid, $job->id; # copy job to local queue $self->{queue}->enqueuep(100, $job); @@ -75,7 +75,7 @@ sub worker_body { # mark job as running next unless jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $job->job; + $wid, $job->id; # copy job to local queue $self->{queue}->enqueue($job); diff --git a/lib/App/Netdisco/Backend/Role/Poller.pm b/lib/App/Netdisco/Backend/Role/Poller.pm index a39cd756..ff689f52 100644 --- a/lib/App/Netdisco/Backend/Role/Poller.pm +++ b/lib/App/Netdisco/Backend/Role/Poller.pm @@ -5,12 +5,12 @@ use Dancer qw/:moose :syntax :script/; use Try::Tiny; use App::Netdisco::Util::MCE; -use Role::Tiny; -use namespace::clean; - use Time::HiRes 'sleep'; use App::Netdisco::JobQueue qw/jq_defer jq_complete/; +use Role::Tiny; +use namespace::clean; + # add dispatch methods for poller tasks with 'App::Netdisco::Worker::Runner'; @@ -29,9 +29,9 @@ sub worker_body { try { $job->started(scalar localtime); prctl sprintf 'nd2: #%s poll: #%s: %s', - $wid, $job->job, $job->summary; + $wid, $job->id, $job->summary; info sprintf "pol (%s): starting %s job(%s) at %s", - $wid, $job->action, $job->job, $job->started; + $wid, 
$job->action, $job->id, $job->started; $self->run($job); } catch { @@ -51,7 +51,7 @@ sub close_job { my $now = scalar localtime; info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", - $self->wid, $job->action, $job->job, $job->status, $now; + $self->wid, $job->action, $job->id, $job->status, $now; try { if ($job->status eq 'defer') { diff --git a/lib/App/Netdisco/Backend/Role/Scheduler.pm b/lib/App/Netdisco/Backend/Role/Scheduler.pm index 41bcd546..a62f3f22 100644 --- a/lib/App/Netdisco/Backend/Role/Scheduler.pm +++ b/lib/App/Netdisco/Backend/Role/Scheduler.pm @@ -5,11 +5,11 @@ use Dancer qw/:moose :syntax :script/; use Algorithm::Cron; use App::Netdisco::Util::MCE; +use App::Netdisco::JobQueue qw/jq_insert/; + use Role::Tiny; use namespace::clean; -use App::Netdisco::JobQueue qw/jq_insert/; - sub worker_begin { my $self = shift; my $wid = $self->wid; diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 0f1eda9c..d7b73d53 100644 --- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -153,12 +153,12 @@ sub jq_lock { try { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->search({ job => $job->job }, { for => 'update' }) + ->search({ job => $job->id }, { for => 'update' }) ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); return unless schema('netdisco')->resultset('Admin') - ->count({ job => $job->job, + ->count({ job => $job->id, status => ('queued-'. 
setting('workers')->{'BACKEND'}) }); # remove any duplicate jobs, needed because we have race conditions @@ -202,7 +202,7 @@ sub jq_defer { # lock db row and update to show job is available schema('netdisco')->resultset('Admin') - ->find($job->job, {for => 'update'}) + ->find($job->id, {for => 'update'}) ->update({ status => 'queued', started => undef }); }); $happy = true; @@ -233,7 +233,7 @@ sub jq_complete { } schema('netdisco')->resultset('Admin') - ->find($job->job, {for => 'update'})->update({ + ->find($job->id, {for => 'update'})->update({ status => $job->status, log => $job->log, started => $job->started, diff --git a/lib/App/Netdisco/Worker/Plugin.pm b/lib/App/Netdisco/Worker/Plugin.pm index bf871644..a3cc3337 100644 --- a/lib/App/Netdisco/Worker/Plugin.pm +++ b/lib/App/Netdisco/Worker/Plugin.pm @@ -4,7 +4,6 @@ use Dancer ':syntax'; use Dancer::Plugin; use Scope::Guard 'guard'; -use aliased 'App::Netdisco::Worker::Status'; use App::Netdisco::Util::Permission qw/check_acl_no check_acl_only/; register 'register_worker' => sub { @@ -29,43 +28,47 @@ register 'register_worker' => sub { ? setting('driver_priority')->{$workerconf->{driver}} : 0); my $worker = sub { - my $job = shift or return Status->error('missing job param'); + my $job = shift or die 'missing job param'; # use DDP; p $workerconf; + # update job's record of namespace and priority + # check to see if this namespace has already passed at higher priority + return if $job->namespace_passed($workerconf); + + my @newuserconf = (); + my @userconf = @{ setting('device_auth') || [] }; + # worker might be vendor/platform specific if (ref $job->device) { my $no = (exists $workerconf->{no} ? $workerconf->{no} : undef); my $only = (exists $workerconf->{only} ? 
$workerconf->{only} : undef); - my $defer = Status->defer('worker is not applicable to this device'); - return $defer if $no and check_acl_no($job->device, $no); - return $defer if $only and not check_acl_only($job->device, $only); + return $job->defer('worker is not applicable to this device') + if ($no and check_acl_no($job->device, $no)) + or ($only and not check_acl_only($job->device, $only)); + + # reduce device_auth by driver and action filters + foreach my $stanza (@userconf) { + next if exists $stanza->{driver} and exists $workerconf->{driver} + and (($stanza->{driver} || '') ne ($workerconf->{driver} || '')); + + next if exists $stanza->{action} + and not _find_matchaction($workerconf, lc($stanza->{action})); + + push @newuserconf, $stanza; + } + + # per-device action but no device creds available + return $job->defer('deferred job with no device creds') + if 0 == scalar @newuserconf; } - my @newuserconf = (); - my @userconf = @{ setting('device_auth') || [] }; - - # reduce device_auth by driver and action filters - foreach my $stanza (@userconf) { - next if exists $stanza->{driver} and exists $workerconf->{driver} - and (($stanza->{driver} || '') ne ($workerconf->{driver} || '')); - - next if exists $stanza->{action} - and not _find_matchaction($workerconf, lc($stanza->{action})); - - push @newuserconf, $stanza; - } - - # per-device action but no device creds available - return Status->defer('skipped with no device creds') - if ref $job->device and 0 == scalar @newuserconf; - # back up and restore device_auth my $guard = guard { set(device_auth => \@userconf) }; set(device_auth => \@newuserconf); # run worker - return $code->($job, $workerconf); + $code->($job, $workerconf); }; # store the built worker as Worker.pm will build the dispatch order later on diff --git a/lib/App/Netdisco/Worker/Runner.pm b/lib/App/Netdisco/Worker/Runner.pm index 6ba1bae2..4f31b436 100644 --- a/lib/App/Netdisco/Worker/Runner.pm +++ b/lib/App/Netdisco/Worker/Runner.pm @@ -2,23 
+2,18 @@ package App::Netdisco::Worker::Runner; use Dancer qw/:moose :syntax/; use Dancer::Factory::Hook; -use aliased 'App::Netdisco::Worker::Status'; use App::Netdisco::Util::Permission qw/check_acl_no check_acl_only/; use App::Netdisco::Util::Device 'get_device'; use Try::Tiny; -use Moo::Role; use Module::Load (); use Scope::Guard 'guard'; + +use Moo::Role; use namespace::clean; -has ['job', 'jobstat'] => ( is => 'rw' ); - -after 'run', 'run_workers' => sub { - my $self = shift; - $self->job->update_status($self->jobstat); -}; +has 'job' => ( is => 'rw' ); # mixin code to run workers loaded via plugins sub run { @@ -29,11 +24,8 @@ sub run { unless ref $job eq 'App::Netdisco::Backend::Job'; $self->job($job); - $self->job->device( get_device($job->device) ); - $self->jobstat( Status->error('failed in job init') ); - - my $action = $job->action; - Module::Load::load 'App::Netdisco::Worker' => $action; + $job->device( get_device($job->device) ); + Module::Load::load 'App::Netdisco::Worker' => $job->action; my @newuserconf = (); my @userconf = @{ setting('device_auth') || [] }; @@ -49,32 +41,32 @@ sub run { push @newuserconf, $stanza; } + + # per-device action but no device creds available + return $job->defer('deferred job with no device creds') + if 0 == scalar @newuserconf; } - # per-device action but no device creds available - return $self->jobstat->defer('deferred job with no device creds') - if ref $job->device and 0 == scalar @newuserconf; - # back up and restore device_auth - my $guard = guard { set(device_auth => \@userconf) }; + my $configguard = guard { set(device_auth => \@userconf) }; set(device_auth => \@newuserconf); - # run check phase - # optional - but if there are workers then one MUST return done - my $store = Dancer::Factory::Hook->instance(); - $self->jobstat( Status->error('check phase did not pass for this action') ); + # finalise job status when we exit + my $statusguard = guard { $job->finalise_status }; + + # run check phase and if there 
are workers then one MUST be successful $self->run_workers('nd2_core_check'); - return if scalar @{ $store->get_hooks_for('nd2_core_check') } - and $self->jobstat->not_ok; + return if not $job->check_passed; # run other phases - $self->jobstat( Status->error('no worker succeeded during main phase') ); $self->run_workers("nd2_core_${_}") for qw/early main user/; } sub run_workers { my $self = shift; - my $hook = shift or return $self->jobstat->error('missing hook param'); + my $job = $self->job or die error 'no job in worker job slot'; + my $hook = shift or return $job->error('missing hook param'); + my $store = Dancer::Factory::Hook->instance(); (my $phase = $hook) =~ s/^nd2_core_//; @@ -82,24 +74,10 @@ sub run_workers { debug "=> running workers for phase: $phase"; foreach my $worker (@{ $store->get_hooks_for($hook) }) { - try { - # could die or return undef or a scalar or Status or another class - my $retval = $worker->($self->job); - - if (ref $retval eq 'App::Netdisco::Worker::Status') { - debug ('=> '. $retval->log) if $retval->log; - - # update (save) the status if we're in check, early, or main phases - # because these logs can end up in the job queue as status message - $self->jobstat($retval) - if ($phase =~ m/^(?:check|early|main)$/) - and $retval->level >= $self->jobstat->level; - } - } - # errors at most phases are ignored + try { $job->add_status( $worker->($job) ) } catch { debug "=> $_" if $_; - $self->jobstat->error($_) if $phase eq 'check'; + $job->error($_); }; } } diff --git a/lib/App/Netdisco/Worker/Status.pm b/lib/App/Netdisco/Worker/Status.pm index 6bfef35d..3b74c856 100644 --- a/lib/App/Netdisco/Worker/Status.pm +++ b/lib/App/Netdisco/Worker/Status.pm @@ -11,10 +11,9 @@ use namespace::clean; has 'status' => ( is => 'rw', default => undef, - clearer => 1, ); -has 'log' => ( +has ['log', 'phase', 'namespace', 'priority'] => ( is => 'rw', default => '', );