From b3cd1a55e5a39aec7ecdfc6fd156ce33efcd1f56 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Wed, 2 Jan 2013 19:38:28 +0000 Subject: [PATCH] first pass at Interactive worker --- Netdisco/bin/netdisco-daemon | 5 ++ .../App/Netdisco/Daemon/DB/Result/Admin.pm | 6 ++ Netdisco/lib/App/Netdisco/Daemon/Queue.pm | 6 +- .../App/Netdisco/Daemon/Worker/Interactive.pm | 84 ++++--------------- .../lib/App/Netdisco/Daemon/Worker/Manager.pm | 4 +- 5 files changed, 34 insertions(+), 71 deletions(-) diff --git a/Netdisco/bin/netdisco-daemon b/Netdisco/bin/netdisco-daemon index c6fcc9ca..7902e2b6 100755 --- a/Netdisco/bin/netdisco-daemon +++ b/Netdisco/bin/netdisco-daemon @@ -59,11 +59,16 @@ sub worker_factory { my $role = shift; return sub { my $self = shift; + + # my $wid = $self->wid; + # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); + # XXX temporary hack to work around MCE's use of __DIE__ my $mce_die = $SIG{__DIE__}; $SIG{__DIE__} = sub { return @_ if $^S and $^S eq 1; $mce_die->(@_) }; # XXX "there's nothing so permanent as temporary..." ~~ Milton Friedman + $self->worker_begin if $self->can('worker_begin'); }; } diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index a2a8fb65..2806b8c4 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -19,6 +19,8 @@ __PACKAGE__->add_columns( "wid", # worker ID, only != 0 once taken { data_type => "integer", is_nullable => 0, default_value => 0 }, + "entered", + { data_type => "timestamp", is_nullable => 1 }, "started", { data_type => "timestamp", is_nullable => 1 }, "finished", @@ -33,6 +35,10 @@ __PACKAGE__->add_columns( { data_type => "text", is_nullable => 1 }, "status", { data_type => "text", is_nullable => 1 }, + "username", + { data_type => "text", is_nullable => 1 }, + "userip", + { data_type => "inet", is_nullable => 1 }, "log", { data_type => "text", is_nullable => 1 }, "debug", diff --git a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm index dcc9c4d1..d1ab9483 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm @@ -52,8 +52,12 @@ sub take_jobs { {rows => $max}, ); + return [] if $rs->count == 0; + + my @rows = $rs->all; $rs->update({wid => $wid}); - return [ map {$_->get_columns} $rs->all ]; + + return [ map {{$_->get_columns}} @rows ]; } sub reset_jobs { diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm index 21ccdff8..a0965805 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm @@ -14,98 +14,46 @@ with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', sub worker_body { my $self = shift; - # get all pending jobs - my $rs = schema('daemon')->resultset('Admin')->search({ - action => [qw/location contact portcontrol portname vlan power/], - status => 'queued', - }); - while (1) { - while (my $job = $rs->next) { - my $target = 'set_'. $job->action; - next unless $self->can($target); + my $jobs = $self->do('take_jobs', $self->wid, 'Interactive'); - # mark job as running - next unless $self->lock_job($job); + foreach my $job (@$jobs) { + my $target = 'set_'. $job->{action}; + next unless $self->can($target); # do job my ($status, $log); try { + $job->{started} = scalar localtime; ($status, $log) = $self->$target($job); } - catch { warn "error running job: $_\n" }; + catch { + $status = 'error'; + $log = "error running job: $_"; + $self->sendto('stderr', $log ."\n"); + }; - # revert to queued status if we failed to action the job - if (not $status) { - $self->revert_job($job->job); - } - else { - # update job state to done/error with log - $self->close_job($job->job, $status, $log); - } + $self->close_job($job, $status, $log); } - # reset iterator so ->next() triggers another DB query - $rs->reset; - $self->gd_sleep( setting('daemon_sleep_time') || 5 ); + sleep( setting('daemon_sleep_time') || 5 ); } } -sub lock_job { - my ($self, $job) = @_; - my $happy = 1; - - # lock db table, check job state is still queued, update to running - try { - my $status_updated = schema('daemon')->txn_do(sub { - my $row = schema('daemon')->resultset('Admin')->find( - {job => $job->job}, - {for => 'update'} - ); - - $happy = 0 if $row->status ne 'queued'; - $row->update({status => "running-$$", started => \"datetime('now')" }); - }); - - $happy = 0 if not $status_updated; - } - catch { - warn "error locking job: $_\n"; - $happy = 0; - }; - - return $happy; -} - -sub revert_job { - my ($self, $id) = @_; - - try { - schema('daemon')->resultset('Admin') - ->find($id) - ->update({status => 'queued', started => undef}); - } - catch { warn "error reverting job: $_\n" }; -} - sub close_job { - my ($self, $id, $status, $log) = @_; + my ($self, $job, $status, $log) = @_; try { - my $local = schema('daemon')->resultset('Admin')->find($id); - schema('netdisco')->resultset('Admin') - ->find($id) + ->find($job->{job}) ->update({ status => $status, log => $log, - started => $local->started, + started => $job->{started}, finished => \'now()', }); - - $local->delete; } - catch { warn "error closing job: $_\n" }; + catch { $self->sendto('stderr', "error closing job: $_\n") }; } 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index e9336541..eb3b052f 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -24,7 +24,7 @@ sub worker_begin { my $rs = schema('netdisco')->resultset('Admin') ->search({status => "queued-$fqdn"}); - my @jobs = map {$_->get_columns} $rs->all; + my @jobs = map {{$_->get_columns}} $rs->all; map { $_->{role} = $role_map->{$_->{action}} } @jobs; $self->do('add_jobs', \@jobs); @@ -48,7 +48,7 @@ sub worker_body { # mark job as running next unless $self->lock_job($job); - my $local_job = $job->get_columns; + my $local_job = { $job->get_columns }; $local_job->{role} = $role_map->{$job->action}; # copy job to local queue