move status tracking and checking inside job instance

This commit is contained in:
Oliver Gorwits
2017-11-06 21:26:01 +00:00
parent 4436150bf4
commit 25907d3544
8 changed files with 165 additions and 100 deletions

View File

@@ -1,5 +1,7 @@
package App::Netdisco::Backend::Job; package App::Netdisco::Backend::Job;
use aliased 'App::Netdisco::Worker::Status';
use Moo; use Moo;
use namespace::clean; use namespace::clean;
@@ -17,6 +19,10 @@ foreach my $slot (qw/
userip userip
log log
debug debug
_phase
_namespace
_priority
/) { /) {
has $slot => ( has $slot => (
@@ -24,11 +30,16 @@ foreach my $slot (qw/
); );
} }
has '_statuslist' => (
is => 'rw',
default => sub { [] },
);
=head1 METHODS =head1 METHODS
=head2 summary =head2 summary
An attempt to make a meaningful statement about the job. An attempt to make a meaningful written statement about the job.
=cut =cut
@@ -38,26 +49,100 @@ sub summary {
$job->action, $job->action,
($job->device || ''), ($job->device || ''),
($job->port || ''); ($job->port || '');
# ($job->subaction ? (q{'}. $job->subaction .q{'}) : '');
} }
=head2 update_status =head2 finalise_status
Passed an L<App::Netdisco::Worker::Status> will update this job's C<log> and Find the best status and log it into the job's C<status> and C<log> slots.
C<status> slots.
=cut =cut
sub update_status { sub finalise_status {
my $job = shift; my $job = shift;
my $status = shift or return; my $max_level = Status->error()->level;
# fallback
$job->status('error');
$job->log('failed to succeed at any worker!');
foreach my $status (@{ $self->_statuslist }) {
next unless $status->phase =~ m/^(?:early|main)$/;
if ($status->level >= $max_level) {
$job->status( $status->status ); $job->status( $status->status );
$job->log( $status->log ); $job->log( $status->log );
return $job; $max_level = $status->level;
}
}
}
=head2 check_passed
Returns true if at least one worker during the C<check> phase flagged status
C<done>.
=cut
sub check_passed {
my $job = shift;
foreach my $status (@{ $self->_statuslist }) {
next unless $status->phase eq 'check';
return 1 if $status->is_ok;
}
return 0;
}
=head2 namespace_passed( \%workerconf )
Returns true when, for the namespace specified in the passed configuration,
all workers of a higher priority level have succeeded.
=cut
sub namespace_passed {
my ($job, $workerconf) = @_;
if (defined $job->_namespace
and ($workerconf->{phase} eq $job->_phase)
and ($workerconf->{namespace} eq $job->_namespace)
and ($workerconf->{priority} != $job->_priority)) {
foreach my $status (@{ $self->_statuslist }) {
next unless ($status->phase eq $job->_phase)
and ($staus->namespace eq $job->_namespace)
and ($status->priority == $job->_priority);
return 1 if $status->is_ok;
}
}
$job->_phase( $workerconf->{phase} );
$job->_namespace( $workerconf->{namespace} );
$job->_priority( $workerconf->{priority} );
return 0;
}
=head2 add_status
Passed an L<App::Netdisco::Worker::Status> will add it to this job's internal
store.
=cut
sub add_status {
my ($job, $status) = @_;
return unless ref $status eq 'App::Netdisco::Worker::Status';
push @{ $self->_statuslist }, $status;
} }
=head1 ADDITIONAL COLUMNS =head1 ADDITIONAL COLUMNS
=head2 id
Alias for the C<job> column.
=cut
sub id { (shift)->job }
=head2 extra =head2 extra
Alias for the C<subaction> column. Alias for the C<subaction> column.

View File

@@ -5,12 +5,12 @@ use Dancer qw/:moose :syntax :script/;
use List::Util 'sum'; use List::Util 'sum';
use App::Netdisco::Util::MCE; use App::Netdisco::Util::MCE;
use Role::Tiny;
use namespace::clean;
use App::Netdisco::JobQueue use App::Netdisco::JobQueue
qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/;
use Role::Tiny;
use namespace::clean;
sub worker_begin { sub worker_begin {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
@@ -58,7 +58,7 @@ sub worker_body {
# mark job as running # mark job as running
next unless jq_lock($job); next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node", info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->job; $wid, $job->id;
# copy job to local queue # copy job to local queue
$self->{queue}->enqueuep(100, $job); $self->{queue}->enqueuep(100, $job);
@@ -75,7 +75,7 @@ sub worker_body {
# mark job as running # mark job as running
next unless jq_lock($job); next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node", info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->job; $wid, $job->id;
# copy job to local queue # copy job to local queue
$self->{queue}->enqueue($job); $self->{queue}->enqueue($job);

View File

@@ -5,12 +5,12 @@ use Dancer qw/:moose :syntax :script/;
use Try::Tiny; use Try::Tiny;
use App::Netdisco::Util::MCE; use App::Netdisco::Util::MCE;
use Role::Tiny;
use namespace::clean;
use Time::HiRes 'sleep'; use Time::HiRes 'sleep';
use App::Netdisco::JobQueue qw/jq_defer jq_complete/; use App::Netdisco::JobQueue qw/jq_defer jq_complete/;
use Role::Tiny;
use namespace::clean;
# add dispatch methods for poller tasks # add dispatch methods for poller tasks
with 'App::Netdisco::Worker::Runner'; with 'App::Netdisco::Worker::Runner';
@@ -29,9 +29,9 @@ sub worker_body {
try { try {
$job->started(scalar localtime); $job->started(scalar localtime);
prctl sprintf 'nd2: #%s poll: #%s: %s', prctl sprintf 'nd2: #%s poll: #%s: %s',
$wid, $job->job, $job->summary; $wid, $job->id, $job->summary;
info sprintf "pol (%s): starting %s job(%s) at %s", info sprintf "pol (%s): starting %s job(%s) at %s",
$wid, $job->action, $job->job, $job->started; $wid, $job->action, $job->id, $job->started;
$self->run($job); $self->run($job);
} }
catch { catch {
@@ -51,7 +51,7 @@ sub close_job {
my $now = scalar localtime; my $now = scalar localtime;
info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->job, $job->status, $now; $self->wid, $job->action, $job->id, $job->status, $now;
try { try {
if ($job->status eq 'defer') { if ($job->status eq 'defer') {

View File

@@ -5,11 +5,11 @@ use Dancer qw/:moose :syntax :script/;
use Algorithm::Cron; use Algorithm::Cron;
use App::Netdisco::Util::MCE; use App::Netdisco::Util::MCE;
use App::Netdisco::JobQueue qw/jq_insert/;
use Role::Tiny; use Role::Tiny;
use namespace::clean; use namespace::clean;
use App::Netdisco::JobQueue qw/jq_insert/;
sub worker_begin { sub worker_begin {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;

View File

@@ -153,12 +153,12 @@ sub jq_lock {
try { try {
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->search({ job => $job->job }, { for => 'update' }) ->search({ job => $job->id }, { for => 'update' })
->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) });
return unless return unless
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->count({ job => $job->job, ->count({ job => $job->id,
status => ('queued-'. setting('workers')->{'BACKEND'}) }); status => ('queued-'. setting('workers')->{'BACKEND'}) });
# remove any duplicate jobs, needed because we have race conditions # remove any duplicate jobs, needed because we have race conditions
@@ -202,7 +202,7 @@ sub jq_defer {
# lock db row and update to show job is available # lock db row and update to show job is available
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->job, {for => 'update'}) ->find($job->id, {for => 'update'})
->update({ status => 'queued', started => undef }); ->update({ status => 'queued', started => undef });
}); });
$happy = true; $happy = true;
@@ -233,7 +233,7 @@ sub jq_complete {
} }
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->job, {for => 'update'})->update({ ->find($job->id, {for => 'update'})->update({
status => $job->status, status => $job->status,
log => $job->log, log => $job->log,
started => $job->started, started => $job->started,

View File

@@ -4,7 +4,6 @@ use Dancer ':syntax';
use Dancer::Plugin; use Dancer::Plugin;
use Scope::Guard 'guard'; use Scope::Guard 'guard';
use aliased 'App::Netdisco::Worker::Status';
use App::Netdisco::Util::Permission qw/check_acl_no check_acl_only/; use App::Netdisco::Util::Permission qw/check_acl_no check_acl_only/;
register 'register_worker' => sub { register 'register_worker' => sub {
@@ -29,21 +28,24 @@ register 'register_worker' => sub {
? setting('driver_priority')->{$workerconf->{driver}} : 0); ? setting('driver_priority')->{$workerconf->{driver}} : 0);
my $worker = sub { my $worker = sub {
my $job = shift or return Status->error('missing job param'); my $job = shift or die 'missing job param';
# use DDP; p $workerconf; # use DDP; p $workerconf;
# update job's record of namespace and priority
# check to see if this namespace has already passed at higher priority
return if $job->namespace_passed($workerconf);
my @newuserconf = ();
my @userconf = @{ setting('device_auth') || [] };
# worker might be vendor/platform specific # worker might be vendor/platform specific
if (ref $job->device) { if (ref $job->device) {
my $no = (exists $workerconf->{no} ? $workerconf->{no} : undef); my $no = (exists $workerconf->{no} ? $workerconf->{no} : undef);
my $only = (exists $workerconf->{only} ? $workerconf->{only} : undef); my $only = (exists $workerconf->{only} ? $workerconf->{only} : undef);
my $defer = Status->defer('worker is not applicable to this device'); return $job->defer('worker is not applicable to this device')
return $defer if $no and check_acl_no($job->device, $no); if ($no and check_acl_no($job->device, $no))
return $defer if $only and not check_acl_only($job->device, $only); or ($only and not check_acl_only($job->device, $only));
}
my @newuserconf = ();
my @userconf = @{ setting('device_auth') || [] };
# reduce device_auth by driver and action filters # reduce device_auth by driver and action filters
foreach my $stanza (@userconf) { foreach my $stanza (@userconf) {
@@ -57,15 +59,16 @@ register 'register_worker' => sub {
} }
# per-device action but no device creds available # per-device action but no device creds available
return Status->defer('skipped with no device creds') return $job->defer('deferred job with no device creds')
if ref $job->device and 0 == scalar @newuserconf; if 0 == scalar @newuserconf;
}
# back up and restore device_auth # back up and restore device_auth
my $guard = guard { set(device_auth => \@userconf) }; my $guard = guard { set(device_auth => \@userconf) };
set(device_auth => \@newuserconf); set(device_auth => \@newuserconf);
# run worker # run worker
return $code->($job, $workerconf); $code->($job, $workerconf);
}; };
# store the built worker as Worker.pm will build the dispatch order later on # store the built worker as Worker.pm will build the dispatch order later on

View File

@@ -2,23 +2,18 @@ package App::Netdisco::Worker::Runner;
use Dancer qw/:moose :syntax/; use Dancer qw/:moose :syntax/;
use Dancer::Factory::Hook; use Dancer::Factory::Hook;
use aliased 'App::Netdisco::Worker::Status';
use App::Netdisco::Util::Permission qw/check_acl_no check_acl_only/; use App::Netdisco::Util::Permission qw/check_acl_no check_acl_only/;
use App::Netdisco::Util::Device 'get_device'; use App::Netdisco::Util::Device 'get_device';
use Try::Tiny; use Try::Tiny;
use Moo::Role;
use Module::Load (); use Module::Load ();
use Scope::Guard 'guard'; use Scope::Guard 'guard';
use Moo::Role;
use namespace::clean; use namespace::clean;
has ['job', 'jobstat'] => ( is => 'rw' ); has 'job' => ( is => 'rw' );
after 'run', 'run_workers' => sub {
my $self = shift;
$self->job->update_status($self->jobstat);
};
# mixin code to run workers loaded via plugins # mixin code to run workers loaded via plugins
sub run { sub run {
@@ -29,11 +24,8 @@ sub run {
unless ref $job eq 'App::Netdisco::Backend::Job'; unless ref $job eq 'App::Netdisco::Backend::Job';
$self->job($job); $self->job($job);
$self->job->device( get_device($job->device) ); $job->device( get_device($job->device) );
$self->jobstat( Status->error('failed in job init') ); Module::Load::load 'App::Netdisco::Worker' => $job->action;
my $action = $job->action;
Module::Load::load 'App::Netdisco::Worker' => $action;
my @newuserconf = (); my @newuserconf = ();
my @userconf = @{ setting('device_auth') || [] }; my @userconf = @{ setting('device_auth') || [] };
@@ -49,32 +41,32 @@ sub run {
push @newuserconf, $stanza; push @newuserconf, $stanza;
} }
}
# per-device action but no device creds available # per-device action but no device creds available
return $self->jobstat->defer('deferred job with no device creds') return $job->defer('deferred job with no device creds')
if ref $job->device and 0 == scalar @newuserconf; if 0 == scalar @newuserconf;
}
# back up and restore device_auth # back up and restore device_auth
my $guard = guard { set(device_auth => \@userconf) }; my $configguard = guard { set(device_auth => \@userconf) };
set(device_auth => \@newuserconf); set(device_auth => \@newuserconf);
# run check phase # finalise job status when we exit
# optional - but if there are workers then one MUST return done my $statusguard = guard { $job->finalise_status };
my $store = Dancer::Factory::Hook->instance();
$self->jobstat( Status->error('check phase did not pass for this action') ); # run check phase and if there are workers then one MUST be successful
$self->run_workers('nd2_core_check'); $self->run_workers('nd2_core_check');
return if scalar @{ $store->get_hooks_for('nd2_core_check') } return if not $job->check_passed;
and $self->jobstat->not_ok;
# run other phases # run other phases
$self->jobstat( Status->error('no worker succeeded during main phase') );
$self->run_workers("nd2_core_${_}") for qw/early main user/; $self->run_workers("nd2_core_${_}") for qw/early main user/;
} }
sub run_workers { sub run_workers {
my $self = shift; my $self = shift;
my $hook = shift or return $self->jobstat->error('missing hook param'); my $job = $self->job or die error 'no job in worker job slot';
my $hook = shift or return $job->error('missing hook param');
my $store = Dancer::Factory::Hook->instance(); my $store = Dancer::Factory::Hook->instance();
(my $phase = $hook) =~ s/^nd2_core_//; (my $phase = $hook) =~ s/^nd2_core_//;
@@ -82,24 +74,10 @@ sub run_workers {
debug "=> running workers for phase: $phase"; debug "=> running workers for phase: $phase";
foreach my $worker (@{ $store->get_hooks_for($hook) }) { foreach my $worker (@{ $store->get_hooks_for($hook) }) {
try { try { $job->add_status( $worker->($job) ) }
# could die or return undef or a scalar or Status or another class
my $retval = $worker->($self->job);
if (ref $retval eq 'App::Netdisco::Worker::Status') {
debug ('=> '. $retval->log) if $retval->log;
# update (save) the status if we're in check, early, or main phases
# because these logs can end up in the job queue as status message
$self->jobstat($retval)
if ($phase =~ m/^(?:check|early|main)$/)
and $retval->level >= $self->jobstat->level;
}
}
# errors at most phases are ignored
catch { catch {
debug "=> $_" if $_; debug "=> $_" if $_;
$self->jobstat->error($_) if $phase eq 'check'; $job->error($_);
}; };
} }
} }

View File

@@ -11,10 +11,9 @@ use namespace::clean;
has 'status' => ( has 'status' => (
is => 'rw', is => 'rw',
default => undef, default => undef,
clearer => 1,
); );
has 'log' => ( has ['log', 'phase', 'namespace', 'priority'] => (
is => 'rw', is => 'rw',
default => '', default => '',
); );