add manager and full worker support

This commit is contained in:
Oliver Gorwits
2012-12-15 00:52:42 +00:00
parent f9f7a223b8
commit 6d802f9b85
5 changed files with 230 additions and 49 deletions

View File

@@ -1,16 +1,19 @@
#!/usr/bin/env perl #!/usr/bin/env perl
use Dancer qw/:moose :script/; use Dancer qw/:moose :script/;
use Dancer::Plugin::DBIC 'schema';
use Daemon::Generic::While1; use Daemon::Generic::While1;
use Parallel::Prefork; use Parallel::Prefork;
use Sys::Hostname;
use Role::Tiny; use Role::Tiny;
use Try::Tiny;
# with 'Netdisco::Daemon::Manager';
my $pp = Parallel::Prefork->new( my $pp = Parallel::Prefork->new(
max_workers => (setting('daemon_workers') || 2), max_workers => (setting('daemon_workers') || 2),
spawn_interval => 2, spawn_interval => 1,
after_fork => \&register_worker, before_fork => \&next_worker_role,
after_fork => \&register_worker,
on_child_reap => \&unregister_worker, on_child_reap => \&unregister_worker,
trap_signals => { trap_signals => {
TERM => 'TERM', TERM => 'TERM',
@@ -30,7 +33,21 @@ newdaemon(
logpriority => 'daemon.info', logpriority => 'daemon.info',
); );
# main manager loop # deploy the daemon's local DB schema
sub gd_preconfig {
my $self = shift;
my $rs = schema('daemon')->resultset('Admin');
try { $rs->first }
catch { schema('daemon')->deploy };
$self->{nd_host} = hostname;
# do not remove this line - required by Daemon::Generic
return ();
}
# main loop
sub gd_run_body { sub gd_run_body {
my $self = shift; my $self = shift;
@@ -41,16 +58,15 @@ sub gd_run_body {
if $pp->signal_received eq 'HUP'; if $pp->signal_received eq 'HUP';
if ($pp->num_workers < $pp->max_workers) { if ($pp->num_workers < $pp->max_workers) {
$next_role = $self->next_worker_role or return;
$pp->start and return; $pp->start and return;
with "Netdisco::Daemon::Worker::$next_role"; with "Netdisco::Daemon::Worker::$next_role";
print STDERR ">>> I am a $next_role Worker\n";
$self->worker_body; $self->worker_body;
$pp->finish; $pp->finish;
} }
# check for new jobs, take one if available # I don't think Parallel::Prefork ever returns from start()
# $self->manager_body; # until a child exits. Not sure this is ever reached.
$self->gd_sleep( setting('daemon_sleep_time') || 5 ); $self->gd_sleep( setting('daemon_sleep_time') || 5 );
} }
@@ -63,25 +79,44 @@ sub unregister_worker {
my (undef, $pid, $status) = @_; my (undef, $pid, $status) = @_;
delete $workers{$pid}; delete $workers{$pid};
# also check for bad exit status? # also check for bad exit status?
# revert any running jobs (will be such if child died)
try {
schema('daemon')->resultset('Admin')
->search({status => "running-$pid"})
->update({status => 'queued', started => undef});
}
catch { warn "error reverting jobs for pid $pid: $_\n" };
} }
sub next_worker_role { sub next_worker_role {
my $self = shift; my $self = shift;
my @cur = values %workers; my @cur = values %workers;
my $poller = scalar grep {$_ eq 'Poller'} @cur; my $manager = scalar grep {$_ eq 'Manager'} @cur;
my $inter = scalar grep {$_ eq 'Interactive'} @cur; my $poller = scalar grep {$_ eq 'Poller'} @cur;
my $inter = scalar grep {$_ eq 'Interactive'} @cur;
my $need_poller = $poller < (setting('daemon_pollers') || 0); if ($manager < 1) {
my $need_inter = $inter < (setting('daemon_interactive') || 2); $next_role = 'Manager';
return;
if ($need_poller and $need_inter) {
return (int(rand(2)) ? 'Interactive' : 'Poller');
} }
return 'Interactive' if $need_inter; my $need_poller = $poller < (setting('daemon_pollers') || 0);
return 'Poller' if $need_poller; my $need_inter = $inter < (setting('daemon_interactive') || 2);
return undef;
if ($need_poller and $need_inter) {
$next_role = (int(rand(2)) ? 'Interactive' : 'Poller');
return;
}
$next_role = 'Interactive' if $need_inter;
$next_role = 'Poller' if $need_poller;
}
sub handle_hup {
my $self = shift;
print "HUP is not supported. Please <restart> instead.\n";
} }
sub handle_term { sub handle_term {
@@ -90,24 +125,12 @@ sub handle_term {
$self->gd_quit_event $self->gd_quit_event
} }
sub handle_hup { # in case we screw up and die ourselves
my $self = shift; END {
# clear signal
$pp->signal_received('');
# reload dancer config
%Dancer::Config::_LOADED = ();
Dancer::Config::load();
# kill workers (they will be restarted)
$pp->signal_all_children('TERM'); $pp->signal_all_children('TERM');
$pp->wait_all_children; $pp->wait_all_children;
$pp->{_no_adjust_until} = 0; # BUG in Prefork.pm
} }
# do not remove - must be redefined for Daemon::Generic
sub gd_preconfig { return () }
# nullify this to permit Parallel::Prefork to register handlers instead # nullify this to permit Parallel::Prefork to register handlers instead
sub gd_setup_signals {} sub gd_setup_signals {}

View File

@@ -0,0 +1,23 @@
use utf8;
package Netdisco::Daemon::DB;
use strict;
use warnings;
use base 'DBIx::Class::Schema';
__PACKAGE__->load_namespaces;
our $VERSION = 1; # schema version used for upgrades, keep as integer
use Path::Class;
use File::Basename;
my (undef, $libpath, undef) = fileparse( $INC{ 'Netdisco/Daemon/DB.pm' } );
our $schema_versions_dir = Path::Class::Dir->new($libpath)
->subdir("DB", "schema_versions")->stringify;
__PACKAGE__->load_components(qw/Schema::Versioned/);
__PACKAGE__->upgrade_directory($schema_versions_dir);
1;

View File

@@ -0,0 +1,37 @@
use utf8;
package Netdisco::Daemon::DB::Result::Admin;
use strict;
use warnings;
use base 'DBIx::Class::Core';
__PACKAGE__->table("admin");
__PACKAGE__->add_columns(
"job",
{
data_type => "integer",
is_nullable => 0,
},
"started",
{ data_type => "timestamp", is_nullable => 1 },
"finished",
{ data_type => "timestamp", is_nullable => 1 },
"device",
{ data_type => "inet", is_nullable => 1 },
"port",
{ data_type => "text", is_nullable => 1 },
"action",
{ data_type => "text", is_nullable => 1 },
"subaction",
{ data_type => "text", is_nullable => 1 },
"status",
{ data_type => "text", is_nullable => 1 },
"log",
{ data_type => "text", is_nullable => 1 },
"debug",
{ data_type => "boolean", is_nullable => 1 },
);
__PACKAGE__->set_primary_key("job");
1;

38
Netdisco/lib/Netdisco/Daemon/Worker/Interactive.pm Executable file → Normal file
View File

@@ -2,8 +2,6 @@ package Netdisco::Daemon::Worker::Interactive;
use Dancer qw/:moose :syntax :script/; use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema'; use Dancer::Plugin::DBIC 'schema';
use Netdisco::Util::DeviceProperties 'is_discoverable';
use Try::Tiny; use Try::Tiny;
use Role::Tiny; use Role::Tiny;
@@ -17,7 +15,7 @@ sub worker_body {
my $self = shift; my $self = shift;
# get all pending jobs # get all pending jobs
my $rs = schema('netdisco')->resultset('Admin')->search({ my $rs = schema('daemon')->resultset('Admin')->search({
action => [qw/location contact portcontrol portname vlan power/], action => [qw/location contact portcontrol portname vlan power/],
status => 'queued', status => 'queued',
}); });
@@ -27,9 +25,6 @@ sub worker_body {
my $target = 'set_'. $job->action; my $target = 'set_'. $job->action;
next unless $self->can($target); next unless $self->can($target);
# filter for discover_*
next unless is_discoverable($job->device);
# mark job as running # mark job as running
next unless $self->lock_job($job); next unless $self->lock_job($job);
@@ -49,6 +44,8 @@ sub worker_body {
$self->close_job($job->job, $status, $log); $self->close_job($job->job, $status, $log);
} }
} }
# reset iterator so ->next() triggers another DB query
$rs->reset; $rs->reset;
$self->gd_sleep( setting('daemon_sleep_time') || 5 ); $self->gd_sleep( setting('daemon_sleep_time') || 5 );
} }
@@ -56,35 +53,35 @@ sub worker_body {
sub lock_job { sub lock_job {
my ($self, $job) = @_; my ($self, $job) = @_;
my $happy = 1;
# lock db table, check job state is still queued, update to running # lock db table, check job state is still queued, update to running
try { try {
my $status_updated = schema('netdisco')->txn_do(sub { my $status_updated = schema('daemon')->txn_do(sub {
my $row = schema('netdisco')->resultset('Admin')->find( my $row = schema('daemon')->resultset('Admin')->find(
{job => $job->job}, {job => $job->job},
{for => 'update'} {for => 'update'}
); );
return 0 if $row->status ne 'queued'; $happy = 0 if $row->status ne 'queued';
$row->update({status => 'running', started => \'now()'}); $row->update({status => "running-$$", started => \"datetime('now')" });
return 1;
}); });
return 0 if not $status_updated; $happy = 0 if not $status_updated;
} }
catch { catch {
warn "error locking job: $_\n"; warn "error locking job: $_\n";
return 0; $happy = 0;
}; };
return 1; return $happy;
} }
sub revert_job { sub revert_job {
my ($self, $id) = @_; my ($self, $id) = @_;
try { try {
schema('netdisco')->resultset('Admin') schema('daemon')->resultset('Admin')
->find($id) ->find($id)
->update({status => 'queued', started => undef}); ->update({status => 'queued', started => undef});
} }
@@ -95,9 +92,18 @@ sub close_job {
my ($self, $id, $status, $log) = @_; my ($self, $id, $status, $log) = @_;
try { try {
my $local = schema('daemon')->resultset('Admin')->find($id);
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($id) ->find($id)
->update({status => $status, log => $log, finished => \'now()'}); ->update({
status => $status,
log => $log,
started => $local->started,
finished => \'now()',
});
$local->delete;
} }
catch { warn "error closing job: $_\n" }; catch { warn "error closing job: $_\n" };
} }

View File

@@ -0,0 +1,92 @@
package Netdisco::Daemon::Worker::Manager;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Netdisco::Util::DeviceProperties 'is_discoverable';
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
sub worker_body {
my $self = shift;
# get all pending jobs
my $rs = schema('netdisco')->resultset('Admin')
->search({status => 'queued'});
while (1) {
while (my $job = $rs->next) {
# filter for discover_*
next unless is_discoverable($job->device);
# mark job as running
next unless $self->lock_job($job);
# copy job to local queue
$self->copy_job($job)
or $self->revert_job($job->job);
}
# reset iterator so ->next() triggers another DB query
$rs->reset;
$self->gd_sleep( setting('daemon_sleep_time') || 5 );
}
}
sub lock_job {
my ($self, $job) = @_;
my $happy = 1;
# lock db table, check job state is still queued, update to running
try {
my $status_updated = schema('netdisco')->txn_do(sub {
my $row = schema('netdisco')->resultset('Admin')->find(
{job => $job->job},
{for => 'update'}
);
$happy = 0 if $row->status ne 'queued';
$row->update({
status => "running-$self->{nd_host}",
started => \'now()'
});
});
$happy = 0 if not $status_updated;
}
catch {
warn "error locking job: $_\n";
$happy = 0;
};
return $happy;
}
sub copy_job {
my ($self, $job) = @_;
try {
my %data = $job->get_columns;
delete $data{$_} for qw/entered username userip/;
schema('daemon')->resultset('Admin')->update_or_create({
%data, status => 'queued', started => undef,
});
}
catch { warn "error copying job: $_\n" };
}
sub revert_job {
my ($self, $id) = @_;
try {
schema('netdisco')->resultset('Admin')
->find($id)
->update({status => 'queued', started => undef});
}
catch { warn "error reverting job: $_\n" };
}
1;