implement begin/end for workers

This commit is contained in:
Oliver Gorwits
2012-12-16 16:08:27 +00:00
parent a852b9c093
commit 07aed68ae4
2 changed files with 51 additions and 65 deletions

View File

@@ -29,14 +29,24 @@ sub gd_preconfig {
# used for locking jobs in central Pg queue
$gd->{nd_host} = hostfqdn;
# deploy the daemon's local DB schema
$gd->deploy_daemon_db;
set(daemon_pollers => 2)
if !defined setting('daemon_pollers');
set(daemon_interactives => 2)
if !defined setting('daemon_interactives');
# sync with netdisco Pg DB
$gd->netdisco_db_sync;
# init Parallel::Prefork
$gd->bootstrap_prefork;
# need to do this after setting defaults
$pp = Parallel::Prefork->new(
max_workers => (1 + setting('daemon_pollers')
+ setting('daemon_interactives')),
spawn_interval => 2,
before_fork => \&set_next_worker_role,
after_fork => \&register_worker,
on_child_reap => \&unregister_worker,
trap_signals => {
TERM => 'TERM',
INT => 'TERM',
},
);
# do not remove this line - required by Daemon::Generic
return ();
@@ -50,9 +60,11 @@ sub gd_run_body {
if $pp->signal_received =~ m/^(?:TERM|INT)$/;
$pp->start(sub {
print STDERR ">>> new $next_role worker starting...\n";
with "Netdisco::Daemon::Worker::$next_role";
print STDERR ">>> new $next_role worker started.\n";
$gd->worker_begin if $gd->can('worker_begin');
$gd->worker_body;
$gd->worker_end if $gd->can('worker_end');
});
# I don't think Parallel::Prefork ever returns from start()
@@ -116,63 +128,6 @@ END {
$pp->wait_all_children;
}
sub deploy_daemon_db {
my $gd = shift;
my $daemon = schema('daemon');
try {
$daemon->storage->dbh_do(sub {
my ($storage, $dbh) = @_;
$dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1");
});
}
catch { $daemon->deploy };
$daemon->storage->disconnect;
if ($daemon->get_db_version < $daemon->schema_version) {
$daemon->upgrade;
}
}
sub netdisco_db_sync {
my $gd = shift;
my $netdisco = schema('netdisco');
my $daemon = schema('daemon');
# on start, any jobs previously grabbed by a daemon on this host
# will be reset to "queued", which is the simplest way to restart them.
my $rs = $netdisco->resultset('Admin')->search({
status => "running-$gd->{nd_host}"
});
if ($rs->count > 0) {
$daemon->resultset('Admin')->delete;
$rs->update({status => 'queued', started => undef});
}
}
sub bootstrap_prefork {
my $gd = shift;
# set defaults
set(daemon_pollers => 2) if !defined setting('daemon_pollers');
set(daemon_interactives => 2) if !defined setting('daemon_interactives');
# need to do this after setting defaults
$pp = Parallel::Prefork->new(
max_workers => (1 + setting('daemon_pollers') + setting('daemon_interactives')),
spawn_interval => 2,
before_fork => \&set_next_worker_role,
after_fork => \&register_worker,
on_child_reap => \&unregister_worker,
trap_signals => {
TERM => 'TERM',
INT => 'TERM',
},
);
}
# nullify this to permit Parallel::Prefork to register handlers instead
sub gd_setup_signals {}

View File

@@ -9,6 +9,37 @@ use Try::Tiny;
use Role::Tiny;
use namespace::clean;
sub worker_begin {
my $self = shift;
my $daemon = schema('daemon');
# deploy local db if not already done
try {
$daemon->storage->dbh_do(sub {
my ($storage, $dbh) = @_;
$dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1");
});
}
catch { $daemon->deploy };
$daemon->storage->disconnect;
if ($daemon->get_db_version < $daemon->schema_version) {
$daemon->upgrade;
}
# on start, any jobs previously grabbed by a daemon on this host
# will be reset to "queued", which is the simplest way to restart them.
my $rs = schema('netdisco')->resultset('Admin')->search({
status => "running-$self->{nd_host}"
});
if ($rs->count > 0) {
$daemon->resultset('Admin')->delete;
$rs->update({status => 'queued', started => undef});
}
}
sub worker_body {
my $self = shift;