#!/usr/bin/env perl use Dancer qw/:moose :script/; use Dancer::Plugin::DBIC 'schema'; use Daemon::Generic::While1; use Parallel::Prefork; use Net::Domain 'hostfqdn'; use Role::Tiny; use Try::Tiny; # Parallel::Prefork instance my $pp = undef; # track worker pids and their roles my %workers = (); my $next_role = undef; # must come after globals initialization newdaemon( progname => 'netdisco-daemon', ($> != 0 ? (pidbase => './') : ()), logpriority => 'daemon.info', ); sub gd_preconfig { my $gd = shift; # used for locking jobs in central Pg queue $gd->{nd_host} = hostfqdn; # deploy the daemon's local DB schema $gd->deploy_daemon_db; # sync with netdisco Pg DB $gd->netdisco_db_sync; # init Parallel::Prefork $gd->bootstrap_prefork; # do not remove this line - required by Daemon::Generic return (); } # main loop sub gd_run_body { my $gd = shift; $gd->handle_term if $pp->signal_received =~ m/^(?:TERM|INT)$/; $pp->start(sub { with "Netdisco::Daemon::Worker::$next_role"; print STDERR ">>> new $next_role worker started.\n"; $gd->worker_body; }); # I don't think Parallel::Prefork ever returns from start() # until a child exits. Not sure this is ever reached. $gd->gd_sleep( setting('daemon_sleep_time') || 5 ) if not $pp->signal_received; } sub register_worker { my (undef, $pid) = @_; $workers{$pid} = $next_role; } sub unregister_worker { my (undef, $pid, $status) = @_; delete $workers{$pid}; # also check for bad exit status? # revert any running jobs (will be such if child died) try { schema('daemon')->resultset('Admin') ->search({status => "running-$pid"}) ->update({status => 'queued', started => undef}); } catch { warn "error reverting jobs for pid $pid: $_\n" }; } sub set_next_worker_role { my $pp = shift; $next_role = _find_next_worker_role(); } sub _find_next_worker_role { my @cur = values %workers; my $manager = scalar grep {$_ eq 'Manager'} @cur; my $poller = scalar grep {$_ eq 'Poller'} @cur; my $inter = scalar grep {$_ eq 'Interactive'} @cur; return 'Manager' if $manager < 1; my $need_poller = $poller < setting('daemon_pollers'); my $need_inter = $inter < setting('daemon_interactives'); if ($need_poller and $need_inter) { return (int(rand(2)) ? 'Interactive' : 'Poller'); } return 'Interactive' if $need_inter; return 'Poller' if $need_poller; } sub handle_term { my $gd = shift; $pp->wait_all_children; $gd->gd_quit_event } # in case we screw up and die ourselves END { $pp->signal_all_children('TERM'); $pp->wait_all_children; } sub deploy_daemon_db { my $gd = shift; my $daemon = schema('daemon'); try { $daemon->storage->dbh_do(sub { my ($storage, $dbh) = @_; $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); }); } catch { $daemon->deploy }; $daemon->storage->disconnect; if ($daemon->get_db_version < $daemon->schema_version) { $daemon->upgrade; } } sub netdisco_db_sync { my $gd = shift; my $netdisco = schema('netdisco'); my $daemon = schema('daemon'); # on start, any jobs previously grabbed by a daemon on this host # will be reset to "queued", which is the simplest way to restart them. my $rs = $netdisco->resultset('Admin')->search({ status => "running-$gd->{nd_host}" }); if ($rs->count > 0) { $daemon->resultset('Admin')->delete; $rs->update({status => 'queued', started => undef}); } } sub bootstrap_prefork { my $gd = shift; # set defaults set(daemon_pollers => 2) if !defined setting('daemon_pollers'); set(daemon_interactives => 2) if !defined setting('daemon_interactives'); # need to do this after setting defaults $pp = Parallel::Prefork->new( max_workers => (1 + setting('daemon_pollers') + setting('daemon_interactives')), spawn_interval => 2, before_fork => \&set_next_worker_role, after_fork => \®ister_worker, on_child_reap => \&unregister_worker, trap_signals => { TERM => 'TERM', INT => 'TERM', }, ); } # nullify this to permit Parallel::Prefork to register handlers instead sub gd_setup_signals {}