From 6d802f9b85b70be4f05f54ccf2e108827a5c2f2a Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Sat, 15 Dec 2012 00:52:42 +0000 Subject: [PATCH] add manager and full worker support --- Netdisco/bin/netdisco-daemon | 89 +++++++++++------- Netdisco/lib/Netdisco/Daemon/DB.pm | 23 +++++ .../lib/Netdisco/Daemon/DB/Result/Admin.pm | 37 ++++++++ .../lib/Netdisco/Daemon/Worker/Interactive.pm | 38 ++++---- .../lib/Netdisco/Daemon/Worker/Manager.pm | 92 +++++++++++++++++++ 5 files changed, 230 insertions(+), 49 deletions(-) create mode 100644 Netdisco/lib/Netdisco/Daemon/DB.pm create mode 100644 Netdisco/lib/Netdisco/Daemon/DB/Result/Admin.pm mode change 100755 => 100644 Netdisco/lib/Netdisco/Daemon/Worker/Interactive.pm create mode 100644 Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm diff --git a/Netdisco/bin/netdisco-daemon b/Netdisco/bin/netdisco-daemon index b15399fd..b8e37f39 100755 --- a/Netdisco/bin/netdisco-daemon +++ b/Netdisco/bin/netdisco-daemon @@ -1,16 +1,19 @@ #!/usr/bin/env perl use Dancer qw/:moose :script/; +use Dancer::Plugin::DBIC 'schema'; + use Daemon::Generic::While1; use Parallel::Prefork; +use Sys::Hostname; use Role::Tiny; - -# with 'Netdisco::Daemon::Manager'; +use Try::Tiny; my $pp = Parallel::Prefork->new( max_workers => (setting('daemon_workers') || 2), - spawn_interval => 2, - after_fork => \®ister_worker, + spawn_interval => 1, + before_fork => \&next_worker_role, + after_fork => \®ister_worker, on_child_reap => \&unregister_worker, trap_signals => { TERM => 'TERM', @@ -30,7 +33,21 @@ newdaemon( logpriority => 'daemon.info', ); -# main manager loop +# deploy the daemon's local DB schema +sub gd_preconfig { + my $self = shift; + + my $rs = schema('daemon')->resultset('Admin'); + try { $rs->first } + catch { schema('daemon')->deploy }; + + $self->{nd_host} = hostname; + + # do not remove this line - required by Daemon::Generic + return (); +} + +# main loop sub gd_run_body { my $self = shift; @@ -41,16 +58,15 @@ sub gd_run_body { if $pp->signal_received eq 'HUP'; if ($pp->num_workers < $pp->max_workers) { - $next_role = $self->next_worker_role or return; $pp->start and return; with "Netdisco::Daemon::Worker::$next_role"; + print STDERR ">>> I am a $next_role Worker\n"; $self->worker_body; $pp->finish; } - # check for new jobs, take one if available - # $self->manager_body; - + # I don't think Parallel::Prefork ever returns from start() + # until a child exits. Not sure this is ever reached. $self->gd_sleep( setting('daemon_sleep_time') || 5 ); } @@ -63,25 +79,44 @@ sub unregister_worker { my (undef, $pid, $status) = @_; delete $workers{$pid}; # also check for bad exit status? + + # revert any running jobs (will be such if child died) + try { + schema('daemon')->resultset('Admin') + ->search({status => "running-$pid"}) + ->update({status => 'queued', started => undef}); + } + catch { warn "error reverting jobs for pid $pid: $_\n" }; } sub next_worker_role { my $self = shift; my @cur = values %workers; - my $poller = scalar grep {$_ eq 'Poller'} @cur; - my $inter = scalar grep {$_ eq 'Interactive'} @cur; + my $manager = scalar grep {$_ eq 'Manager'} @cur; + my $poller = scalar grep {$_ eq 'Poller'} @cur; + my $inter = scalar grep {$_ eq 'Interactive'} @cur; - my $need_poller = $poller < (setting('daemon_pollers') || 0); - my $need_inter = $inter < (setting('daemon_interactive') || 2); - - if ($need_poller and $need_inter) { - return (int(rand(2)) ? 'Interactive' : 'Poller'); + if ($manager < 1) { + $next_role = 'Manager'; + return; } - return 'Interactive' if $need_inter; - return 'Poller' if $need_poller; - return undef; + my $need_poller = $poller < (setting('daemon_pollers') || 0); + my $need_inter = $inter < (setting('daemon_interactive') || 2); + + if ($need_poller and $need_inter) { + $next_role = (int(rand(2)) ? 'Interactive' : 'Poller'); + return; + } + + $next_role = 'Interactive' if $need_inter; + $next_role = 'Poller' if $need_poller; +} + +sub handle_hup { + my $self = shift; + print "HUP is not supported. Please instead.\n"; } sub handle_term { @@ -90,24 +125,12 @@ sub handle_term { $self->gd_quit_event } -sub handle_hup { - my $self = shift; - # clear signal - $pp->signal_received(''); - - # reload dancer config - %Dancer::Config::_LOADED = (); - Dancer::Config::load(); - - # kill workers (they will be restarted) +# in case we screw up and die ourselves +END { $pp->signal_all_children('TERM'); $pp->wait_all_children; - $pp->{_no_adjust_until} = 0; # BUG in Prefork.pm } -# do not remove - must be redefined for Daemon::Generic -sub gd_preconfig { return () } - # nullify this to permit Parallel::Prefork to register handlers instead sub gd_setup_signals {} diff --git a/Netdisco/lib/Netdisco/Daemon/DB.pm b/Netdisco/lib/Netdisco/Daemon/DB.pm new file mode 100644 index 00000000..58c2d9a7 --- /dev/null +++ b/Netdisco/lib/Netdisco/Daemon/DB.pm @@ -0,0 +1,23 @@ +use utf8; +package Netdisco::Daemon::DB; + +use strict; +use warnings; + +use base 'DBIx::Class::Schema'; + +__PACKAGE__->load_namespaces; + +our $VERSION = 1; # schema version used for upgrades, keep as integer + +use Path::Class; +use File::Basename; + +my (undef, $libpath, undef) = fileparse( $INC{ 'Netdisco/Daemon/DB.pm' } ); +our $schema_versions_dir = Path::Class::Dir->new($libpath) + ->subdir("DB", "schema_versions")->stringify; + +__PACKAGE__->load_components(qw/Schema::Versioned/); +__PACKAGE__->upgrade_directory($schema_versions_dir); + +1; diff --git a/Netdisco/lib/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/Netdisco/Daemon/DB/Result/Admin.pm new file mode 100644 index 00000000..42cc8563 --- /dev/null +++ b/Netdisco/lib/Netdisco/Daemon/DB/Result/Admin.pm @@ -0,0 +1,37 @@ +use utf8; +package Netdisco::Daemon::DB::Result::Admin; + +use strict; +use warnings; + +use base 'DBIx::Class::Core'; +__PACKAGE__->table("admin"); +__PACKAGE__->add_columns( + "job", + { + data_type => "integer", + is_nullable => 0, + }, + "started", + { data_type => "timestamp", is_nullable => 1 }, + "finished", + { data_type => "timestamp", is_nullable => 1 }, + "device", + { data_type => "inet", is_nullable => 1 }, + "port", + { data_type => "text", is_nullable => 1 }, + "action", + { data_type => "text", is_nullable => 1 }, + "subaction", + { data_type => "text", is_nullable => 1 }, + "status", + { data_type => "text", is_nullable => 1 }, + "log", + { data_type => "text", is_nullable => 1 }, + "debug", + { data_type => "boolean", is_nullable => 1 }, +); + +__PACKAGE__->set_primary_key("job"); + +1; diff --git a/Netdisco/lib/Netdisco/Daemon/Worker/Interactive.pm b/Netdisco/lib/Netdisco/Daemon/Worker/Interactive.pm old mode 100755 new mode 100644 index dc25de74..bd3f787f --- a/Netdisco/lib/Netdisco/Daemon/Worker/Interactive.pm +++ b/Netdisco/lib/Netdisco/Daemon/Worker/Interactive.pm @@ -2,8 +2,6 @@ package Netdisco::Daemon::Worker::Interactive; use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; - -use Netdisco::Util::DeviceProperties 'is_discoverable'; use Try::Tiny; use Role::Tiny; @@ -17,7 +15,7 @@ sub worker_body { my $self = shift; # get all pending jobs - my $rs = schema('netdisco')->resultset('Admin')->search({ + my $rs = schema('daemon')->resultset('Admin')->search({ action => [qw/location contact portcontrol portname vlan power/], status => 'queued', }); @@ -27,9 +25,6 @@ sub worker_body { my $target = 'set_'. $job->action; next unless $self->can($target); - # filter for discover_* - next unless is_discoverable($job->device); - # mark job as running next unless $self->lock_job($job); @@ -49,6 +44,8 @@ sub worker_body { $self->close_job($job->job, $status, $log); } } + + # reset iterator so ->next() triggers another DB query $rs->reset; $self->gd_sleep( setting('daemon_sleep_time') || 5 ); } @@ -56,35 +53,35 @@ sub worker_body { sub lock_job { my ($self, $job) = @_; + my $happy = 1; # lock db table, check job state is still queued, update to running try { - my $status_updated = schema('netdisco')->txn_do(sub { - my $row = schema('netdisco')->resultset('Admin')->find( + my $status_updated = schema('daemon')->txn_do(sub { + my $row = schema('daemon')->resultset('Admin')->find( {job => $job->job}, {for => 'update'} ); - return 0 if $row->status ne 'queued'; - $row->update({status => 'running', started => \'now()'}); - return 1; + $happy = 0 if $row->status ne 'queued'; + $row->update({status => "running-$$", started => \"datetime('now')" }); }); - return 0 if not $status_updated; + $happy = 0 if not $status_updated; } catch { warn "error locking job: $_\n"; - return 0; + $happy = 0; }; - return 1; + return $happy; } sub revert_job { my ($self, $id) = @_; try { - schema('netdisco')->resultset('Admin') + schema('daemon')->resultset('Admin') ->find($id) ->update({status => 'queued', started => undef}); } @@ -95,9 +92,18 @@ sub close_job { my ($self, $id, $status, $log) = @_; try { + my $local = schema('daemon')->resultset('Admin')->find($id); + schema('netdisco')->resultset('Admin') ->find($id) - ->update({status => $status, log => $log, finished => \'now()'}); + ->update({ + status => $status, + log => $log, + started => $local->started, + finished => \'now()', + }); + + $local->delete; } catch { warn "error closing job: $_\n" }; } diff --git a/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm new file mode 100644 index 00000000..d0522b28 --- /dev/null +++ b/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm @@ -0,0 +1,92 @@ +package Netdisco::Daemon::Worker::Manager; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use Netdisco::Util::DeviceProperties 'is_discoverable'; +use Try::Tiny; + +use Role::Tiny; +use namespace::clean; + +sub worker_body { + my $self = shift; + + # get all pending jobs + my $rs = schema('netdisco')->resultset('Admin') + ->search({status => 'queued'}); + + while (1) { + while (my $job = $rs->next) { + # filter for discover_* + next unless is_discoverable($job->device); + + # mark job as running + next unless $self->lock_job($job); + + # copy job to local queue + $self->copy_job($job) + or $self->revert_job($job->job); + } + + # reset iterator so ->next() triggers another DB query + $rs->reset; + $self->gd_sleep( setting('daemon_sleep_time') || 5 ); + } +} + +sub lock_job { + my ($self, $job) = @_; + my $happy = 1; + + # lock db table, check job state is still queued, update to running + try { + my $status_updated = schema('netdisco')->txn_do(sub { + my $row = schema('netdisco')->resultset('Admin')->find( + {job => $job->job}, + {for => 'update'} + ); + + $happy = 0 if $row->status ne 'queued'; + $row->update({ + status => "running-$self->{nd_host}", + started => \'now()' + }); + }); + + $happy = 0 if not $status_updated; + } + catch { + warn "error locking job: $_\n"; + $happy = 0; + }; + + return $happy; +} + +sub copy_job { + my ($self, $job) = @_; + + try { + my %data = $job->get_columns; + delete $data{$_} for qw/entered username userip/; + + schema('daemon')->resultset('Admin')->update_or_create({ + %data, status => 'queued', started => undef, + }); + } + catch { warn "error copying job: $_\n" }; +} + +sub revert_job { + my ($self, $id) = @_; + + try { + schema('netdisco')->resultset('Admin') + ->find($id) + ->update({status => 'queued', started => undef}); + } + catch { warn "error reverting job: $_\n" }; +} + +1;