first pass at Interactive worker

This commit is contained in:
Oliver Gorwits
2013-01-02 19:38:28 +00:00
parent b96b27b604
commit b3cd1a55e5
5 changed files with 34 additions and 71 deletions

View File

@@ -59,11 +59,16 @@ sub worker_factory {
my $role = shift; my $role = shift;
return sub { return sub {
my $self = shift; my $self = shift;
# my $wid = $self->wid;
# $self->sendto('stderr', ">>> worker $wid starting with role $role\n");
Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role");
# XXX temporary hack to work around MCE's use of __DIE__ # XXX temporary hack to work around MCE's use of __DIE__
my $mce_die = $SIG{__DIE__}; my $mce_die = $SIG{__DIE__};
$SIG{__DIE__} = sub { return @_ if $^S and $^S eq 1; $mce_die->(@_) }; $SIG{__DIE__} = sub { return @_ if $^S and $^S eq 1; $mce_die->(@_) };
# XXX "there's nothing so permanent as temporary..." ~~ Milton Friedman # XXX "there's nothing so permanent as temporary..." ~~ Milton Friedman
$self->worker_begin if $self->can('worker_begin'); $self->worker_begin if $self->can('worker_begin');
}; };
} }

View File

@@ -19,6 +19,8 @@ __PACKAGE__->add_columns(
"wid", # worker ID, only != 0 once taken "wid", # worker ID, only != 0 once taken
{ data_type => "integer", is_nullable => 0, default_value => 0 }, { data_type => "integer", is_nullable => 0, default_value => 0 },
"entered",
{ data_type => "timestamp", is_nullable => 1 },
"started", "started",
{ data_type => "timestamp", is_nullable => 1 }, { data_type => "timestamp", is_nullable => 1 },
"finished", "finished",
@@ -33,6 +35,10 @@ __PACKAGE__->add_columns(
{ data_type => "text", is_nullable => 1 }, { data_type => "text", is_nullable => 1 },
"status", "status",
{ data_type => "text", is_nullable => 1 }, { data_type => "text", is_nullable => 1 },
"username",
{ data_type => "text", is_nullable => 1 },
"userip",
{ data_type => "inet", is_nullable => 1 },
"log", "log",
{ data_type => "text", is_nullable => 1 }, { data_type => "text", is_nullable => 1 },
"debug", "debug",

View File

@@ -52,8 +52,12 @@ sub take_jobs {
{rows => $max}, {rows => $max},
); );
return [] if $rs->count == 0;
my @rows = $rs->all;
$rs->update({wid => $wid}); $rs->update({wid => $wid});
return [ map {$_->get_columns} $rs->all ];
return [ map {{$_->get_columns}} @rows ];
} }
sub reset_jobs { sub reset_jobs {

View File

@@ -14,98 +14,46 @@ with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
sub worker_body { sub worker_body {
my $self = shift; my $self = shift;
# get all pending jobs
my $rs = schema('daemon')->resultset('Admin')->search({
action => [qw/location contact portcontrol portname vlan power/],
status => 'queued',
});
while (1) { while (1) {
while (my $job = $rs->next) { my $jobs = $self->do('take_jobs', $self->wid, 'Interactive');
my $target = 'set_'. $job->action;
next unless $self->can($target);
# mark job as running foreach my $job (@$jobs) {
next unless $self->lock_job($job); my $target = 'set_'. $job->{action};
next unless $self->can($target);
# do job # do job
my ($status, $log); my ($status, $log);
try { try {
$job->{started} = scalar localtime;
($status, $log) = $self->$target($job); ($status, $log) = $self->$target($job);
} }
catch { warn "error running job: $_\n" };
# revert to queued status if we failed to action the job
if (not $status) {
$self->revert_job($job->job);
}
else {
# update job state to done/error with log
$self->close_job($job->job, $status, $log);
}
}
# reset iterator so ->next() triggers another DB query
$rs->reset;
$self->gd_sleep( setting('daemon_sleep_time') || 5 );
}
}
sub lock_job {
my ($self, $job) = @_;
my $happy = 1;
# lock db table, check job state is still queued, update to running
try {
my $status_updated = schema('daemon')->txn_do(sub {
my $row = schema('daemon')->resultset('Admin')->find(
{job => $job->job},
{for => 'update'}
);
$happy = 0 if $row->status ne 'queued';
$row->update({status => "running-$$", started => \"datetime('now')" });
});
$happy = 0 if not $status_updated;
}
catch { catch {
warn "error locking job: $_\n"; $status = 'error';
$happy = 0; $log = "error running job: $_";
$self->sendto('stderr', $log ."\n");
}; };
return $happy; $self->close_job($job, $status, $log);
} }
sub revert_job { sleep( setting('daemon_sleep_time') || 5 );
my ($self, $id) = @_;
try {
schema('daemon')->resultset('Admin')
->find($id)
->update({status => 'queued', started => undef});
} }
catch { warn "error reverting job: $_\n" };
} }
sub close_job { sub close_job {
my ($self, $id, $status, $log) = @_; my ($self, $job, $status, $log) = @_;
try { try {
my $local = schema('daemon')->resultset('Admin')->find($id);
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($id) ->find($job->{job})
->update({ ->update({
status => $status, status => $status,
log => $log, log => $log,
started => $local->started, started => $job->{started},
finished => \'now()', finished => \'now()',
}); });
$local->delete;
} }
catch { warn "error closing job: $_\n" }; catch { $self->sendto('stderr', "error closing job: $_\n") };
} }
1; 1;

View File

@@ -24,7 +24,7 @@ sub worker_begin {
my $rs = schema('netdisco')->resultset('Admin') my $rs = schema('netdisco')->resultset('Admin')
->search({status => "queued-$fqdn"}); ->search({status => "queued-$fqdn"});
my @jobs = map {$_->get_columns} $rs->all; my @jobs = map {{$_->get_columns}} $rs->all;
map { $_->{role} = $role_map->{$_->{action}} } @jobs; map { $_->{role} = $role_map->{$_->{action}} } @jobs;
$self->do('add_jobs', \@jobs); $self->do('add_jobs', \@jobs);
@@ -48,7 +48,7 @@ sub worker_body {
# mark job as running # mark job as running
next unless $self->lock_job($job); next unless $self->lock_job($job);
my $local_job = $job->get_columns; my $local_job = { $job->get_columns };
$local_job->{role} = $role_map->{$job->action}; $local_job->{role} = $role_map->{$job->action};
# copy job to local queue # copy job to local queue