From 61a3542834e03ea3731843f956252aaa953f6a80 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Tue, 1 Jan 2013 23:45:56 +0000 Subject: [PATCH] first pass at Manager worker --- Netdisco/bin/netdisco-daemon | 42 +++--- .../App/Netdisco/Daemon/DB/Result/Admin.pm | 4 +- Netdisco/lib/App/Netdisco/Daemon/Queue.pm | 74 +++++----- .../lib/App/Netdisco/Daemon/Worker/Manager.pm | 127 ++++++++---------- 4 files changed, 108 insertions(+), 139 deletions(-) diff --git a/Netdisco/bin/netdisco-daemon b/Netdisco/bin/netdisco-daemon index caed472e..2c506473 100755 --- a/Netdisco/bin/netdisco-daemon +++ b/Netdisco/bin/netdisco-daemon @@ -10,6 +10,9 @@ use Dancer qw/:moose :script/; # callbacks and local job queue management use App::Netdisco::Daemon::Queue ':all'; +# needed to quench AF_INET6 symbol errors +use NetAddr::IP::Lite ':lower'; + use MCE; use Role::Tiny; use Path::Class 'dir'; @@ -64,29 +67,30 @@ sub worker_factory { my $role = shift; return sub { my $self = shift; - # with "App::Netdisco::Daemon::Worker::$role"; + with "App::Netdisco::Daemon::Worker::$role"; $self->worker_begin if $self->can('worker_begin'); }; } -__END__ +sub interruptible_sleep { + my ($period) = @_; + my $hires = 0; -sub register_worker { - my (undef, $pid) = @_; - $workers{$pid} = $next_role; -} - -sub unregister_worker { - my (undef, $pid, $status) = @_; - delete $workers{$pid}; - # also check for bad exit status? - - # revert any running jobs (will be such if child died) - try { - schema('daemon')->resultset('Admin') - ->search({status => "running-$pid"}) - ->update({status => 'queued', started => undef}); + if ($period * 1000 != int($period * 1000)) { + $hires = 1; + require Time::HiRes; + import Time::HiRes qw(time sleep); } - catch { warn "error reverting jobs for pid $pid: $_\n" }; -} + my $t = time; + while (time - $t < $period) { + if ($hires) { + my $p = (time - $t < 1) + ? time - $t + : 1; + sleep($p); + } else { + sleep(1); + } + } +} diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index 3d4362a3..a2a8fb65 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -16,8 +16,8 @@ __PACKAGE__->add_columns( "role", # Poller, Interactive, etc { data_type => "text", is_nullable => 0 }, - "wid", # worker ID, only assigned once taken - { data_type => "integer", is_nullable => 1 }, + "wid", # worker ID, only != 0 once taken + { data_type => "integer", is_nullable => 0, default_value => 0 }, "started", { data_type => "timestamp", is_nullable => 1 }, diff --git a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm index 9ff680a5..e9999f4f 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm @@ -9,6 +9,36 @@ our @EXPORT = (); our @EXPORT_OK = qw/ add_jobs take_jobs reset_jobs /; our %EXPORT_TAGS = ( all => \@EXPORT_OK ); +my $queue = schema('daemon')->resultset('Admin'); + +sub add_jobs { + my ($jobs) = @_; + $queue->populate($jobs); +} + +sub take_jobs { + my ($wid, $role, $max) = @_; + $max ||= 1; + + # asking for more jobs means the current ones are done + $queue->search({wid => $wid})->delete; + + my $rs = $queue->search( + {role => $role, wid => 0}, + {rows => $max}, + ); + + $rs->update({wid => $wid}); + return [ map {$_->get_columns} $rs->all ]; +} + +sub reset_jobs { + my ($wid) = @_; + return unless $wid > 1; + $queue->search({wid => $wid}) + ->update({wid => 0}); +} + { my $daemon = schema('daemon'); @@ -35,48 +65,4 @@ our %EXPORT_TAGS = ( all => \@EXPORT_OK ); $daemon->resultset('Admin')->delete; } -sub add_jobs { - my ($jobs) = @_; - try { schema('daemon')->resultset('Admin')->populate($jobs) } - catch { warn "error adding jobs: $_\n" }; -} - -sub take_jobs { - my ($wid, $role, $max) = @_; - my $jobs = []; - - my $rs = schema('daemon')->resultset('Admin') - ->search({role => $role, status => 'queued'}); - - while (my $job = $rs->next) { - last if scalar $jobs eq $max; - - try { - schema('daemon')->txn_do(sub { - my $row = schema('daemon')->resultset('Admin')->find( - {job => $job->job}, - {for => 'update'} - ); - - if ($row->status eq 'queued') { - $row->update({status => 'taken', wid => $wid}); - push @$jobs, $row->get_columns; - } - }); - }; - } - - return $jobs; -} - -sub reset_jobs { - my ($wid) = @_; - try { - schema('daemon')->resultset('Admin') - ->search({wid => $wid}) - ->update({wid => undef, status => 'queued', started => undef}); - } - catch { warn "error resetting jobs for wid [$wid]: $_\n" }; -} - 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index a06513e6..e7791a0c 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -4,45 +4,34 @@ use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::DeviceProperties 'is_discoverable'; +use Net::Domain 'hostfqdn'; use Try::Tiny; use Role::Tiny; use namespace::clean; +my $fqdn = hostfqdn || 'localhost'; + +# forward and reverse mappings for worker role to Netdisco job type (action) +# this needs updating when we invent new job types +my $action_map = { + Interactive => [qw/location contact portcontrol portname vlan power/] +}; +my $role_map = { + map {$_ => 'Interactive'} @{ $action_map->{Interactive} } +}; + sub worker_begin { my $self = shift; - my $daemon = schema('daemon'); - # deploy local db if not already done - try { - $daemon->storage->dbh_do(sub { - my ($storage, $dbh) = @_; - $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); - }); - } - catch { - $daemon->txn_do(sub { - $daemon->storage->disconnect; - $daemon->deploy; - }); - }; + # requeue jobs locally + my $rs = schema('netdisco')->resultset('Admin') + ->search({status => "queued-$fqdn"}); - $daemon->storage->disconnect; - if ($daemon->get_db_version < $daemon->schema_version) { - $daemon->txn_do(sub { $daemon->upgrade }); - } + my @jobs = map {$_->get_columns} $rs->all; + map { $_->{role} = $role_map->{$_->{action}} } @jobs; - # on start, any jobs previously grabbed by a daemon on this host - # will be reset to "queued", which is the simplest way to restart them. - - my $rs = schema('netdisco')->resultset('Admin')->search({ - status => "running-$self->{nd_host}" - }); - - if ($rs->count > 0) { - $daemon->resultset('Admin')->delete; - $rs->update({status => 'queued', started => undef}); - } + $self->do('add_jobs', \@jobs); } sub worker_body { @@ -50,79 +39,69 @@ sub worker_body { # get all pending jobs my $rs = schema('netdisco')->resultset('Admin') - ->search({status => 'queued'}); + ->search({wid => 0}); while (1) { while (my $job = $rs->next) { # filter for discover_* next unless is_discoverable($job->device); + # check for available local capacity + next unless $self->capacity_for($job); + # mark job as running next unless $self->lock_job($job); + my $local_job = $job->get_columns; + $local_job->{role} = $role_map->{$local_job->{action}}; + # copy job to local queue - $self->copy_job($job) - or $self->revert_job($job->job); + $self->do('add_jobs', [$local_job]); } # reset iterator so ->next() triggers another DB query $rs->reset; - $self->gd_sleep( setting('daemon_sleep_time') || 5 ); + + # TODO also check for stale jobs in Netdisco DB + + interruptible_sleep( setting('daemon_sleep_time') || 5 ); } } +sub capacity_for { + my ($self, $job) = @_; + + my $setting_map = { + Poller => 'daemon_pollers', + Interactive => 'daemon_interactives', + }; + + my $role = $role_map->{$job->action}; + my $setting = $setting_map->{$role}; + + my $current = schema('netdisco')->resultset('Admin') + ->search({role => $role, status => "queued-$fqdn"})->count; + + return ($current < setting($setting)); +} + sub lock_job { my ($self, $job) = @_; - my $happy = 1; + my $happy = 0; - # lock db table, check job state is still queued, update to running + # lock db row and update to show job has been picked try { - my $status_updated = schema('netdisco')->txn_do(sub { + schema('netdisco')->txn_do(sub { my $row = schema('netdisco')->resultset('Admin')->find( - {job => $job->job}, - {for => 'update'} + {job => $job->job, status => 'queued'}, {for => 'update'} ); - $happy = 0 if $row->status ne 'queued'; - $row->update({ - status => "running-$self->{nd_host}", - started => \'now()' - }); + $row->update({status => "queued-$fqdn"}); }); - - $happy = 0 if not $status_updated; - } - catch { - warn "error locking job: $_\n"; - $happy = 0; + $happy = 1; }; return $happy; } -sub copy_job { - my ($self, $job) = @_; - - try { - my %data = $job->get_columns; - delete $data{$_} for qw/entered username userip/; - - schema('daemon')->resultset('Admin')->update_or_create({ - %data, status => 'queued', started => undef, - }); - } - catch { warn "error copying job: $_\n" }; -} - -sub revert_job { - my ($self, $id) = @_; - - try { - schema('netdisco')->resultset('Admin') - ->find($id) - ->update({status => 'queued', started => undef}); - } - catch { warn "error reverting job: $_\n" }; -} - 1;