diff --git a/Netdisco/bin/netdisco-daemon b/Netdisco/bin/netdisco-daemon index c6986057..e8112538 100755 --- a/Netdisco/bin/netdisco-daemon +++ b/Netdisco/bin/netdisco-daemon @@ -29,14 +29,24 @@ sub gd_preconfig { # used for locking jobs in central Pg queue $gd->{nd_host} = hostfqdn; - # deploy the daemon's local DB schema - $gd->deploy_daemon_db; + set(daemon_pollers => 2) + if !defined setting('daemon_pollers'); + set(daemon_interactives => 2) + if !defined setting('daemon_interactives'); - # sync with netdisco Pg DB - $gd->netdisco_db_sync; - - # init Parallel::Prefork - $gd->bootstrap_prefork; + # need to do this after setting defaults + $pp = Parallel::Prefork->new( + max_workers => (1 + setting('daemon_pollers') + + setting('daemon_interactives')), + spawn_interval => 2, + before_fork => \&set_next_worker_role, + after_fork => \®ister_worker, + on_child_reap => \&unregister_worker, + trap_signals => { + TERM => 'TERM', + INT => 'TERM', + }, + ); # do not remove this line - required by Daemon::Generic return (); @@ -50,9 +60,11 @@ sub gd_run_body { if $pp->signal_received =~ m/^(?:TERM|INT)$/; $pp->start(sub { + print STDERR ">>> new $next_role worker starting...\n"; with "Netdisco::Daemon::Worker::$next_role"; - print STDERR ">>> new $next_role worker started.\n"; + $gd->worker_begin if $gd->can('worker_begin'); $gd->worker_body; + $gd->worker_end if $gd->can('worker_end'); }); # I don't think Parallel::Prefork ever returns from start() @@ -116,63 +128,6 @@ END { $pp->wait_all_children; } -sub deploy_daemon_db { - my $gd = shift; - my $daemon = schema('daemon'); - - try { - $daemon->storage->dbh_do(sub { - my ($storage, $dbh) = @_; - $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); - }); - } - catch { $daemon->deploy }; - - $daemon->storage->disconnect; - if ($daemon->get_db_version < $daemon->schema_version) { - $daemon->upgrade; - } -} - -sub netdisco_db_sync { - my $gd = shift; - my $netdisco = schema('netdisco'); - my $daemon = schema('daemon'); - - # on start, any jobs previously grabbed by a daemon on this host - # will be reset to "queued", which is the simplest way to restart them. - - my $rs = $netdisco->resultset('Admin')->search({ - status => "running-$gd->{nd_host}" - }); - - if ($rs->count > 0) { - $daemon->resultset('Admin')->delete; - $rs->update({status => 'queued', started => undef}); - } -} - -sub bootstrap_prefork { - my $gd = shift; - - # set defaults - set(daemon_pollers => 2) if !defined setting('daemon_pollers'); - set(daemon_interactives => 2) if !defined setting('daemon_interactives'); - - # need to do this after setting defaults - $pp = Parallel::Prefork->new( - max_workers => (1 + setting('daemon_pollers') + setting('daemon_interactives')), - spawn_interval => 2, - before_fork => \&set_next_worker_role, - after_fork => \®ister_worker, - on_child_reap => \&unregister_worker, - trap_signals => { - TERM => 'TERM', - INT => 'TERM', - }, - ); -} - # nullify this to permit Parallel::Prefork to register handlers instead sub gd_setup_signals {} diff --git a/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm index d0522b28..12bcbf07 100644 --- a/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/Netdisco/Daemon/Worker/Manager.pm @@ -9,6 +9,37 @@ use Try::Tiny; use Role::Tiny; use namespace::clean; +sub worker_begin { + my $self = shift; + my $daemon = schema('daemon'); + + # deploy local db if not already done + try { + $daemon->storage->dbh_do(sub { + my ($storage, $dbh) = @_; + $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); + }); + } + catch { $daemon->deploy }; + + $daemon->storage->disconnect; + if ($daemon->get_db_version < $daemon->schema_version) { + $daemon->upgrade; + } + + # on start, any jobs previously grabbed by a daemon on this host + # will be reset to "queued", which is the simplest way to restart them. + + my $rs = schema('netdisco')->resultset('Admin')->search({ + status => "running-$self->{nd_host}" + }); + + if ($rs->count > 0) { + $daemon->resultset('Admin')->delete; + $rs->update({status => 'queued', started => undef}); + } +} + sub worker_body { my $self = shift;