first pass at Manager worker

This commit is contained in:
Oliver Gorwits
2013-01-01 23:45:56 +00:00
parent e2b9fd3430
commit 61a3542834
4 changed files with 108 additions and 139 deletions

View File

@@ -10,6 +10,9 @@ use Dancer qw/:moose :script/;
# callbacks and local job queue management
use App::Netdisco::Daemon::Queue ':all';
# needed to quench AF_INET6 symbol errors
use NetAddr::IP::Lite ':lower';
use MCE;
use Role::Tiny;
use Path::Class 'dir';
@@ -64,29 +67,30 @@ sub worker_factory {
my $role = shift;
return sub {
my $self = shift;
# with "App::Netdisco::Daemon::Worker::$role";
with "App::Netdisco::Daemon::Worker::$role";
$self->worker_begin if $self->can('worker_begin');
};
}
__END__
# NOTE(review): this span appears to interleave lines from two versions of the
# file (interruptible_sleep on one side; register_worker / unregister_worker on
# the other), and the braces do not balance as shown. Each comment below
# describes only the lines of the routine it names.
#
# interruptible_sleep($period): loop until $period seconds have passed since
# entry, sleeping at most one second per iteration.
sub interruptible_sleep {
my ($period) = @_;
my $hires = 0;
# register_worker(undef, $pid): record $pid in %workers with the value of
# $next_role. The first argument is ignored.
sub register_worker {
my (undef, $pid) = @_;
$workers{$pid} = $next_role;
}
# unregister_worker(undef, $pid, $status): forget $pid, then put any Admin rows
# in the 'daemon' schema whose status is "running-$pid" back to 'queued' with
# started cleared. $status is unpacked but not used in the lines shown.
sub unregister_worker {
my (undef, $pid, $status) = @_;
delete $workers{$pid};
# also check for bad exit status?
# revert any running jobs (will be such if child died)
try {
schema('daemon')->resultset('Admin')
->search({status => "running-$pid"})
->update({status => 'queued', started => undef});
# (interruptible_sleep) switch to Time::HiRes when $period is not a whole
# number of milliseconds.
# NOTE(review): importing at run time presumably does not rebind the time/sleep
# calls below, which were compiled earlier -- confirm.
if ($period * 1000 != int($period * 1000)) {
$hires = 1;
require Time::HiRes;
import Time::HiRes qw(time sleep);
}
# (unregister_worker) a failed revert is reported with warn, not rethrown
catch { warn "error reverting jobs for pid $pid: $_\n" };
}
# (interruptible_sleep) $t is the start time; loop until $period has elapsed
my $t = time;
while (time - $t < $period) {
if ($hires) {
# NOTE(review): $p is the time elapsed so far (capped at 1), not the time
# remaining -- confirm this is intended.
my $p = (time - $t < 1)
? time - $t
: 1;
sleep($p);
} else {
sleep(1);
}
}
}

View File

@@ -16,8 +16,8 @@ __PACKAGE__->add_columns(
"role", # Poller, Interactive, etc
{ data_type => "text", is_nullable => 0 },
"wid", # worker ID, only assigned once taken
{ data_type => "integer", is_nullable => 1 },
"wid", # worker ID, only != 0 once taken
{ data_type => "integer", is_nullable => 0, default_value => 0 },
"started",
{ data_type => "timestamp", is_nullable => 1 },

View File

@@ -9,6 +9,36 @@ our @EXPORT = ();
# Functions offered for import on request.
our @EXPORT_OK = (
    'add_jobs',
    'take_jobs',
    'reset_jobs',
);

# The ":all" tag pulls in every function listed in @EXPORT_OK.
our %EXPORT_TAGS = ( 'all' => \@EXPORT_OK );

# Resultset over the Admin table of the 'daemon' schema.
my $queue = schema('daemon')->resultset('Admin');
# Insert a batch of jobs into the local queue.
#
#   $jobs - handed unchanged to populate() on the queue resultset
#
# Returns whatever populate() returns in the caller's context.
sub add_jobs {
    my $jobs = shift;
    return $queue->populate($jobs);
}
# Hand up to $max untaken jobs for $role to worker $wid.
#
#   $wid  - ID of the requesting worker; stamped on every job handed out
#   $role - only jobs with this role are considered
#   $max  - maximum number of jobs to hand out (defaults to 1)
#
# Any jobs already carrying $wid are deleted first. Returns an arrayref of
# hashrefs (column name => value), one per job taken; an empty arrayref when
# nothing is available.
sub take_jobs {
    my ($wid, $role, $max) = @_;
    $max ||= 1;

    # asking for more jobs means the current ones are done
    $queue->search({wid => $wid})->delete;

    # Read the candidate rows *before* marking them. Once wid has been
    # updated the {wid => 0} condition no longer matches those rows, so
    # re-running the same search afterwards would return different rows
    # (or none at all).
    my @rows = $queue->search(
        {role => $role, wid => 0},
        {rows => $max},
    )->all;
    return [] unless @rows;

    # Mark exactly the rows we fetched, selected by their job column.
    $queue->search({job => {-in => [ map { $_->job } @rows ]}})
          ->update({wid => $wid});

    # get_columns returns a flat key/value list, so wrap each row in its own
    # hashref; wid is overridden to reflect the value just stored.
    return [ map { +{ $_->get_columns, wid => $wid } } @rows ];
}
# Release every job held by worker $wid back to the untaken state (wid 0).
# Does nothing unless $wid is greater than 1.
sub reset_jobs {
    my $wid = shift;
    return if !($wid > 1);

    my $held = $queue->search({ wid => $wid });
    return $held->update({ wid => 0 });
}
{
my $daemon = schema('daemon');
@@ -35,48 +65,4 @@ our %EXPORT_TAGS = ( all => \@EXPORT_OK );
$daemon->resultset('Admin')->delete;
}
# Insert a batch of jobs into the Admin table of the 'daemon' schema.
#
#   $jobs - handed unchanged to populate()
#
# Any exception is reported with warn and not rethrown.
sub add_jobs {
    my $jobs = shift;

    try {
        schema('daemon')->resultset('Admin')->populate($jobs);
    }
    catch {
        warn "error adding jobs: $_\n";
    };
}
# Claim up to $max queued jobs for $role on behalf of worker $wid.
#
#   $wid  - worker ID written to each claimed row
#   $role - only jobs with this role are considered
#   $max  - maximum number of jobs to claim; when undefined there is no limit
#
# Each candidate is re-read with a row lock inside its own transaction and is
# claimed only if it is still 'queued'. Returns an arrayref of hashrefs
# (column name => value), one per claimed job.
sub take_jobs {
    my ($wid, $role, $max) = @_;
    my $jobs = [];

    my $rs = schema('daemon')->resultset('Admin')
      ->search({role => $role, status => 'queued'});

    while (my $job = $rs->next) {
        # compare the *number* of claimed jobs, not the stringified arrayref
        last if defined $max and scalar(@$jobs) >= $max;

        my $claimed = try {
            schema('daemon')->txn_do(sub {
                my $row = schema('daemon')->resultset('Admin')->find(
                    {job => $job->job},
                    {for => 'update'}
                );

                # row may have vanished or been claimed since the search
                return unless $row and $row->status eq 'queued';

                $row->update({status => 'taken', wid => $wid});

                # get_columns yields a flat list; keep each job as a hashref
                return +{ $row->get_columns };
            });
        }
        catch {
            warn "error taking job: $_\n";
            return;
        };

        # only record the job once its transaction has completed
        push @$jobs, $claimed if $claimed;
    }

    return $jobs;
}
# Put every job held by worker $wid back on the queue: wid and started are
# cleared and status becomes 'queued'. Any exception is reported with warn and
# not rethrown.
sub reset_jobs {
    my $wid = shift;

    try {
        my $held = schema('daemon')->resultset('Admin')
          ->search({ wid => $wid });
        $held->update({ wid => undef, status => 'queued', started => undef });
    }
    catch {
        warn "error resetting jobs for wid [$wid]: $_\n";
    };
}
1;

View File

@@ -4,45 +4,34 @@ use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Util::DeviceProperties 'is_discoverable';
use Net::Domain 'hostfqdn';
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
# Name of this host; falls back to 'localhost' when hostfqdn gives nothing true.
my $fqdn = hostfqdn() || 'localhost';

# Forward mapping: worker role => the Netdisco job types (actions) it handles.
# This needs updating when we invent new job types.
my $action_map = {
    Interactive => [
        'location', 'contact', 'portcontrol', 'portname', 'vlan', 'power',
    ],
};

# Reverse mapping: job type (action) => worker role.
my $role_map = {};
$role_map->{$_} = 'Interactive' for @{ $action_map->{Interactive} };
# worker_begin: start-up hook; prepares the local 'daemon' database and
# (re)queues jobs found in the 'netdisco' Admin table.
# NOTE(review): the body appears to interleave lines from two versions of the
# routine (`my $rs` is declared twice, and both "queued-$fqdn" and
# "running-$self->{nd_host}" statuses are searched) -- confirm which lines
# belong together before relying on the flow as written.
sub worker_begin {
my $self = shift;
my $daemon = schema('daemon');
# deploy local db if not already done
# (the probe is a query on the admin table that selects no rows; if it throws,
# the catch block disconnects and deploys the schema)
try {
$daemon->storage->dbh_do(sub {
my ($storage, $dbh) = @_;
$dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1");
});
}
catch {
$daemon->txn_do(sub {
$daemon->storage->disconnect;
$daemon->deploy;
});
};
# requeue jobs locally
my $rs = schema('netdisco')->resultset('Admin')
->search({status => "queued-$fqdn"});
$daemon->storage->disconnect;
# run the schema upgrade when the stored DB version is behind the schema version
if ($daemon->get_db_version < $daemon->schema_version) {
$daemon->txn_do(sub { $daemon->upgrade });
}
# NOTE(review): get_columns presumably returns a flat key/value list, in which
# case @jobs holds plain scalars and the ->{role} assignment below would not be
# operating on hashrefs -- confirm.
my @jobs = map {$_->get_columns} $rs->all;
map { $_->{role} = $role_map->{$_->{action}} } @jobs;
# on start, any jobs previously grabbed by a daemon on this host
# will be reset to "queued", which is the simplest way to restart them.
my $rs = schema('netdisco')->resultset('Admin')->search({
status => "running-$self->{nd_host}"
});
# when such rows exist: empty the local Admin table, then requeue them
if ($rs->count > 0) {
$daemon->resultset('Admin')->delete;
$rs->update({status => 'queued', started => undef});
}
# hand the collected jobs to 'add_jobs' through $self->do
$self->do('add_jobs', \@jobs);
}
# worker_body: endless loop that walks Admin rows from the 'netdisco' schema,
# filters and locks each job, passes it on to the local queue, then sleeps
# before the next pass.
# NOTE(review): at least one line is elided at the "@@" marker below, and the
# body appears to interleave lines from two versions of the routine (two
# ->search calls, copy_job/revert_job alongside do('add_jobs'), gd_sleep
# alongside interruptible_sleep) -- confirm which lines belong together.
sub worker_body {
@@ -50,79 +39,69 @@ sub worker_body {
# get all pending jobs
my $rs = schema('netdisco')->resultset('Admin')
->search({status => 'queued'});
->search({wid => 0});
while (1) {
while (my $job = $rs->next) {
# filter for discover_*
next unless is_discoverable($job->device);
# check for available local capacity
next unless $self->capacity_for($job);
# mark job as running
next unless $self->lock_job($job);
# NOTE(review): get_columns presumably returns a key/value list, so assigning
# it to a scalar may not yield the hashref the next line dereferences -- confirm.
my $local_job = $job->get_columns;
$local_job->{role} = $role_map->{$local_job->{action}};
# copy job to local queue
$self->copy_job($job)
or $self->revert_job($job->job);
$self->do('add_jobs', [$local_job]);
}
# reset iterator so ->next() triggers another DB query
$rs->reset;
# pause for the 'daemon_sleep_time' setting, or 5 when that is unset or false
$self->gd_sleep( setting('daemon_sleep_time') || 5 );
# TODO also check for stale jobs in Netdisco DB
interruptible_sleep( setting('daemon_sleep_time') || 5 );
}
}
# Decide whether this host may accept another job of the given job's role.
#
#   $job - row object; only its ->action is read here
#
# Returns true when the number of Admin rows with that role and status
# "queued-$fqdn" is below the limit configured for the role. Returns false
# otherwise, including when the action maps to no role, the role has no
# setting name, or the setting is not defined.
sub capacity_for {
    my ($self, $job) = @_;

    my $setting_map = {
        Poller      => 'daemon_pollers',
        Interactive => 'daemon_interactives',
    };

    # Bail out explicitly rather than feeding undef into a hash lookup,
    # setting() and a numeric comparison (which would only ever yield false,
    # with warnings, after a pointless database query).
    my $action = $job->action;
    return 0 unless defined $action;

    my $role = $role_map->{$action};
    return 0 unless defined $role;

    my $setting = $setting_map->{$role};
    return 0 unless defined $setting;

    my $limit = setting($setting);
    return 0 unless defined $limit;

    my $current = schema('netdisco')->resultset('Admin')
      ->search({role => $role, status => "queued-$fqdn"})->count;

    return ($current < $limit);
}
# lock_job($job): try to claim $job in the 'netdisco' Admin table inside a
# transaction and return $happy as the outcome flag.
# NOTE(review): the body appears to interleave lines from two versions of the
# routine ($happy is initialised twice, txn_do is opened twice, find() is given
# two argument lists, update() is called with two different statuses) and the
# braces do not balance as shown -- confirm which lines belong together.
sub lock_job {
my ($self, $job) = @_;
my $happy = 1;
my $happy = 0;
# lock db table, check job state is still queued, update to running
# lock db row and update to show job has been picked
try {
my $status_updated = schema('netdisco')->txn_do(sub {
schema('netdisco')->txn_do(sub {
my $row = schema('netdisco')->resultset('Admin')->find(
{job => $job->job},
{for => 'update'}
{job => $job->job, status => 'queued'}, {for => 'update'}
);
$happy = 0 if $row->status ne 'queued';
$row->update({
status => "running-$self->{nd_host}",
started => \'now()'
});
$row->update({status => "queued-$fqdn"});
});
$happy = 0 if not $status_updated;
}
catch {
warn "error locking job: $_\n";
# NOTE(review): both assignments are shown; if `$happy = 1` is the one that
# survives, a failed lock would be reported as success -- confirm.
$happy = 0;
$happy = 1;
};
return $happy;
}
# Copy a job row into the Admin table of the 'daemon' schema as a queued job.
#
#   $job - row object whose columns are copied, minus entered, username and
#          userip; status is forced to 'queued' and started to undef
#
# Returns the result of update_or_create on success. On failure it warns and
# returns false (undef / empty list), so a caller can write
# "copy_job(...) or revert_job(...)".
sub copy_job {
    my ($self, $job) = @_;

    return try {
        my %data = $job->get_columns;
        delete @data{qw/entered username userip/};

        schema('daemon')->resultset('Admin')->update_or_create({
            %data, status => 'queued', started => undef,
        });
    }
    catch {
        warn "error copying job: $_\n";
        # warn itself returns a true value; without an explicit empty return
        # the failure would look like success to the caller.
        return;
    };
}
# Put the 'netdisco' Admin row identified by $id back to status 'queued' with
# started cleared. Any exception (including the row not being found) is
# reported with warn and not rethrown.
sub revert_job {
    my ($self, $id) = @_;

    try {
        my $row = schema('netdisco')->resultset('Admin')->find($id);
        $row->update({ status => 'queued', started => undef });
    }
    catch {
        warn "error reverting job: $_\n";
    };
}
1;