diff --git a/Netdisco/bin/netdisco-daemon b/Netdisco/bin/netdisco-daemon index fde44ba4..caed472e 100755 --- a/Netdisco/bin/netdisco-daemon +++ b/Netdisco/bin/netdisco-daemon @@ -1,88 +1,76 @@ #!/usr/bin/env perl -# fix for older Perl which warns about a bug in File::Slurp -BEGIN { - no warnings 'portable'; - use File::Slurp; -} - use FindBin; use lib "$FindBin::Bin/../lib"; use App::Netdisco; +# for netdisco app config use Dancer qw/:moose :script/; -use Dancer::Plugin::DBIC 'schema'; -use Daemon::Generic::While1; -use Parallel::Prefork; -use Net::Domain 'hostfqdn'; +# callbacks and local job queue management +use App::Netdisco::Daemon::Queue ':all'; + +use MCE; use Role::Tiny; -use Try::Tiny; +use Path::Class 'dir'; -# Parallel::Prefork instance -my $pp = undef; +my $mce = MCE->new( + tmp_dir => dir($ENV{HOME}, 'tmp'), + spawn_delay => 0.15, + job_delay => 0.15, + user_func => \&call_worker_body, + on_post_exit => \&restart_worker, + user_tasks => build_tasks_list(), +)->run(); -# track worker pids and their roles -my %workers = (); -my $next_role = undef; +sub call_worker_body { + my ($self) = @_; + $self->worker_body if $self->can('worker_body'); +} -# must come after globals initialization -newdaemon( - progname => 'netdisco-daemon', - ($> != 0 ? (pidbase => './') : ()), - logpriority => 'daemon.info', -); +sub restart_worker { + my ($self, $e) = @_; + reset_jobs($e->{wid}); + $self->restart_worker($e->{wid}); +} -sub gd_preconfig { - my $gd = shift; - - # used for locking jobs in central Pg queue - $gd->{nd_host} = hostfqdn; +sub build_tasks_list { + my $tasks = [{ + max_workers => 1, + user_begin => worker_factory('Manager'), + }]; set(daemon_pollers => 2) if !defined setting('daemon_pollers'); set(daemon_interactives => 2) if !defined setting('daemon_interactives'); - # need to do this after setting defaults - $pp = Parallel::Prefork->new( - max_workers => (1 + setting('daemon_pollers') - + setting('daemon_interactives')), - spawn_interval => 2, - before_fork => \&set_next_worker_role, - after_fork => \®ister_worker, - on_child_reap => \&unregister_worker, - trap_signals => { - TERM => 'TERM', - INT => 'TERM', - }, - ); + # XXX MCE does not like max_workers => 0 - # do not remove this line - required by Daemon::Generic - return (); + push @$tasks, { + max_workers => setting('daemon_pollers'), + user_begin => worker_factory('Poller'), + } if setting('daemon_pollers'); + + push @$tasks, { + max_workers => setting('daemon_interactives'), + user_begin => worker_factory('Interactive'), + } if setting('daemon_interactives'); + + return $tasks; } -# main loop -sub gd_run_body { - my $gd = shift; - - $gd->handle_term - if $pp->signal_received =~ m/^(?:TERM|INT)$/; - - $pp->start(sub { - print STDERR ">>> new $next_role worker starting...\n"; - with "App::Netdisco::Daemon::Worker::$next_role"; - $gd->worker_begin if $gd->can('worker_begin'); - $gd->worker_body; - $gd->worker_end if $gd->can('worker_end'); - }); - - # I don't think Parallel::Prefork ever returns from start() - # until a child exits. Not sure this is ever reached. - $gd->gd_sleep( setting('daemon_sleep_time') || 5 ) - if not $pp->signal_received; +sub worker_factory { + my $role = shift; + return sub { + my $self = shift; + # with "App::Netdisco::Daemon::Worker::$role"; + $self->worker_begin if $self->can('worker_begin'); + }; } +__END__ + sub register_worker { my (undef, $pid) = @_; $workers{$pid} = $next_role; @@ -102,44 +90,3 @@ sub unregister_worker { catch { warn "error reverting jobs for pid $pid: $_\n" }; } -sub set_next_worker_role { - my $pp = shift; - $next_role = _find_next_worker_role(); -} - -sub _find_next_worker_role { - my @cur = values %workers; - my $manager = scalar grep {$_ eq 'Manager'} @cur; - my $poller = scalar grep {$_ eq 'Poller'} @cur; - my $inter = scalar grep {$_ eq 'Interactive'} @cur; - - return 'Manager' if $manager < 1; - - my $need_poller = $poller < setting('daemon_pollers'); - my $need_inter = $inter < setting('daemon_interactives'); - - if ($need_poller and $need_inter) { - return (int(rand(2)) ? 'Interactive' : 'Poller'); - } - - return 'Interactive' if $need_inter; - return 'Poller' if $need_poller; -} - -sub handle_term { - my $gd = shift; - $pp->wait_all_children; - $gd->gd_quit_event -} - -# in case we screw up and die ourselves -END { - if (defined $pp) { - $pp->signal_all_children('TERM'); - $pp->wait_all_children; - } -} - -# nullify this to permit Parallel::Prefork to register handlers instead -sub gd_setup_signals {} - diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index 5aab62a6..3d4362a3 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -12,6 +12,13 @@ __PACKAGE__->add_columns( data_type => "integer", is_nullable => 0, }, + + "role", # Poller, Interactive, etc + { data_type => "text", is_nullable => 0 }, + + "wid", # worker ID, only assigned once taken + { data_type => "integer", is_nullable => 1 }, + "started", { data_type => "timestamp", is_nullable => 1 }, "finished", diff --git a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm new file mode 100644 index 00000000..9ff680a5 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm @@ -0,0 +1,82 @@ +package App::Netdisco::Daemon::Queue; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; +use Try::Tiny; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ add_jobs take_jobs reset_jobs /; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +{ + my $daemon = schema('daemon'); + + # deploy local db if not already done + try { + $daemon->storage->dbh_do(sub { + my ($storage, $dbh) = @_; + $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); + }); + } + catch { + $daemon->txn_do(sub { + $daemon->storage->disconnect; + $daemon->deploy; + }); + }; + + $daemon->storage->disconnect; + if ($daemon->get_db_version < $daemon->schema_version) { + $daemon->txn_do(sub { $daemon->upgrade }); + } + + # empty local db of any stale queued jobs + $daemon->resultset('Admin')->delete; +} + +sub add_jobs { + my ($jobs) = @_; + try { schema('daemon')->resultset('Admin')->populate($jobs) } + catch { warn "error adding jobs: $_\n" }; +} + +sub take_jobs { + my ($wid, $role, $max) = @_; + my $jobs = []; + + my $rs = schema('daemon')->resultset('Admin') + ->search({role => $role, status => 'queued'}); + + while (my $job = $rs->next) { + last if scalar $jobs eq $max; + + try { + schema('daemon')->txn_do(sub { + my $row = schema('daemon')->resultset('Admin')->find( + {job => $job->job}, + {for => 'update'} + ); + + if ($row->status eq 'queued') { + $row->update({status => 'taken', wid => $wid}); + push @$jobs, $row->get_columns; + } + }); + }; + } + + return $jobs; +} + +sub reset_jobs { + my ($wid) = @_; + try { + schema('daemon')->resultset('Admin') + ->search({wid => $wid}) + ->update({wid => undef, status => 'queued', started => undef}); + } + catch { warn "error resetting jobs for wid [$wid]: $_\n" }; +} + +1;