diff --git a/Netdisco/Makefile.PL b/Netdisco/Makefile.PL index 368abbd1..92226919 100644 --- a/Netdisco/Makefile.PL +++ b/Netdisco/Makefile.PL @@ -13,7 +13,6 @@ requires 'Archive::Extract' => 0; requires 'CGI::Expand' => 2.05; requires 'Data::Printer' => 0; requires 'DBD::Pg' => 0; -requires 'DBD::SQLite' => 1.37; requires 'DBIx::Class' => 0.08250; requires 'DBIx::Class::Helpers' => 2.018004; requires 'Daemon::Control' => 0.001000; @@ -32,7 +31,7 @@ requires 'MIME::Base64' => 3.13; requires 'Module::Find' => 0.12; requires 'Module::Load' => 0.32; requires 'Moo' => 1.001000; -requires 'MCE' => 1.408; +requires 'MCE' => 1.515; requires 'Net::Domain' => 1.23; requires 'Net::DNS' => 0.72; requires 'Net::LDAP' => 0; @@ -44,6 +43,7 @@ requires 'Plack' => 1.0023; requires 'Plack::Middleware::Expires' => 0.03; requires 'Plack::Middleware::ReverseProxy' => 0.15; requires 'Role::Tiny' => 1.002005; +requires 'Sereal' => 0; requires 'Socket6' => 0.23; requires 'Starman' => 0.4008; requires 'SNMP::Info' => 3.18; diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index d3585cf5..5cfcb5eb 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -24,16 +24,13 @@ use App::Netdisco; use Dancer qw/:moose :script/; warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD'); -# local job queue management -use App::Netdisco::Daemon::LocalQueue ':all'; use App::Netdisco::Util::Daemon; - -# needed to quench AF_INET6 symbol errors -use NetAddr::IP::Lite ':lower'; -use List::Util 'sum'; +use NetAddr::IP::Lite ':lower'; # to quench AF_INET6 symbol errors use Role::Tiny::With; + use MCE::Signal '-setpgrp'; -use MCE; +use MCE::Flow Sereal => 1; +use MCE::Queue; # set temporary MCE files' location in home directory my $home = ($ENV{NETDISCO_HOME} || $ENV{HOME}); @@ -43,78 +40,109 @@ mkdir $tmp_dir if ! -d $tmp_dir; setpgrp(0,0); # only portable variety of setpgrp prctl 'netdisco-daemon: master'; -my $mce = MCE->new( - spawn_delay => 0.15, - job_delay => 1.15, - tmp_dir => $tmp_dir, - user_func => sub { $_[0]->worker_body }, - on_post_exit => \&restart_this_worker, - user_tasks => build_tasks_list(), -)->run(); +# shared local job queue +my $Q = MCE::Queue->new; -sub build_tasks_list { - # NB MCE does not like max_workers => 0 - my $tasks = []; +mce_flow { + task_name => [qw/ scheduler manager poller /], + max_workers => [ 1, 1, 'AUTO * 2' ], # FIXME allow setting override + tmp_dir => $tmp_dir, + on_post_exit => sub { MCE->restart_worker() }, +}, _mk_wkr('Scheduler'), _mk_wkr('Manager'), _mk_wkr('Poller'); - push @$tasks, { - max_workers => 1, - user_begin => worker_factory('Manager'), - } if num_workers() > 0; - - push @$tasks, { - max_workers => 1, - user_begin => worker_factory('Scheduler'), - } if setting('schedule'); - - my @logmsg = (); - foreach my $key (keys %{setting('job_type_keys')}) { - my $val = setting('job_type_keys')->{$key}; - - setting('workers')->{$val} = 2 - if !defined setting('workers')->{$val}; - - push @logmsg, setting('workers')->{$val} ." $key"; - push @$tasks, { - max_workers => setting('workers')->{$val}, - user_begin => worker_factory($key), - } if setting('workers')->{$val}; - } - - info sprintf "MCE will load: %s Manager, %s Scheduler, %s", - (num_workers() ? 1 : 0), - (setting('schedule') ? 1 : 0), - (join ', ', @logmsg); - - return $tasks; -} - -sub num_workers { - return sum( 0, map { setting('workers')->{$_} } - values %{setting('job_type_keys')} ); -} - -sub worker_factory { +sub _mk_wkr { my $role = shift; return sub { my $self = shift; my $wid = $self->wid; + $self->{Q} = $Q; # FIXME make it a method + prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); info "applying role $role to worker $wid"; - # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); - Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); + # post-fork, become manager, scheduler, poller, etc + Role::Tiny->apply_roles_to_object( + $self => "App::Netdisco::Daemon::Worker::$role"); $self->worker_begin if $self->can('worker_begin'); + $self->worker_body; }; } -sub restart_this_worker { - my ($self, $e) = @_; - reset_jobs($e->{wid}); +#use List::Util 'sum'; +#sub num_workers { +# return sum( 0, map { setting('workers')->{$_} } +# values %{setting('job_type_keys')} ); +#} - debug "restarting worker $e->{wid}"; - $self->restart_worker($e->{wid}); -} +#sub restart_this_worker { +# my ($self, $e) = @_; +# reset_jobs($e->{wid}); +# +# debug "restarting worker ". MCE->wid(); +# MCE->restart_worker(); +# $self->restart_worker($e->{wid}); +#} + +#sub build_tasks_list { +# # NB MCE does not like max_workers => 0 +# my $tasks = []; +# +# push @$tasks, { +# max_workers => 1, +# user_begin => worker_factory('Manager'), +# } if num_workers() > 0; +# +# push @$tasks, { +# max_workers => 1, +# user_begin => worker_factory('Scheduler'), +# } if setting('schedule'); +# +# my @logmsg = (); +# foreach my $key (keys %{setting('job_type_keys')}) { +# my $val = setting('job_type_keys')->{$key}; +# +# setting('workers')->{$val} = 2 +# if !defined setting('workers')->{$val}; +# +# push @logmsg, setting('workers')->{$val} ." $key"; +# push @$tasks, { +# max_workers => setting('workers')->{$val}, +# user_begin => worker_factory($key), +# } if setting('workers')->{$val}; +# } +# +# info sprintf "MCE will load: %s Manager, %s Scheduler, %s", +# (num_workers() ? 1 : 0), +# (setting('schedule') ? 1 : 0), +# (join ', ', @logmsg); +# +# return $tasks; +#} + +#sub worker_factory { +# my $role = shift; +# return sub { +# my $self = shift; +# my $wid = $self->wid; +# prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); +# info "applying role $role to worker $wid"; +# +# # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); +# Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); +# +# $self->worker_begin if $self->can('worker_begin'); +# }; +#} + +#my $mce = MCE->new( +# spawn_delay => 0.15, +# job_delay => 1.15, +# tmp_dir => $tmp_dir, +# user_func => sub { $_[0]->worker_body }, +# on_post_exit => \&restart_this_worker, +# user_tasks => build_tasks_list(), +#)->run(); =head1 NAME diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index e73d981d..4a0c60c4 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -9,13 +9,6 @@ __PACKAGE__->table("admin"); __PACKAGE__->add_columns( "job", { data_type => "integer", is_nullable => 0 }, - - "type", # Poller, Interactive, etc - { data_type => "text", is_nullable => 0 }, - - "wid", # worker ID, only != 0 once taken - { data_type => "integer", is_nullable => 0, default_value => 0 }, - "entered", { data_type => "timestamp", is_nullable => 1 }, "started", diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index 55c7f00d..edc0bda5 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -1,6 +1,7 @@ package App::Netdisco::Daemon::Worker::Common; use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; use Try::Tiny; use App::Netdisco::Util::Daemon; @@ -8,7 +9,7 @@ use App::Netdisco::Util::Daemon; use Role::Tiny; use namespace::clean; -use App::Netdisco::JobQueue qw/jq_take jq_defer jq_complete/; +use App::Netdisco::JobQueue qw/jq_defer jq_complete/; sub worker_body { my $self = shift; @@ -19,9 +20,11 @@ sub worker_body { while (1) { prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type); - my $jobs = jq_take($self->wid, $type); + my $jobs = [ $self->{Q}->dequeue(1) ]; # FIXME multiple take, take type, thaw foreach my $job (@$jobs) { + next unless defined $job; + $job = schema('daemon')->dclone( $job ); my $target = $self->munge_action($job->action); try { diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 10f19928..ea1a9d04 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -9,6 +9,7 @@ use Role::Tiny; use namespace::clean; use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/; +use MCE::Util 'get_ncpu'; sub worker_begin { my $self = shift; @@ -26,7 +27,7 @@ sub worker_begin { if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - $self->do('add_jobs', @jobs); + $self->{Q}->enqueue(@jobs); # FIXME priority and freeze } } @@ -37,8 +38,8 @@ sub worker_body { return debug "mgr ($wid): no need for manager... quitting" if setting('workers')->{'no_manager'}; - my $num_slots = sum( 0, map { setting('workers')->{$_} } - values %{setting('job_type_keys')} ); + # FIXME really the best strategy? + my $num_slots = (MCE::Util::get_ncpu() * 2) - $self->{Q}->pending(); while (1) { debug "mgr ($wid): getting potential jobs for $num_slots workers"; @@ -48,25 +49,18 @@ sub worker_body { # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsome($num_slots) ) { - # check for available local capacity - my $job_type = setting('job_types')->{$job->action}; - next unless $job_type and $self->do('capacity_for', $job_type); - - debug sprintf "mgr (%s): processing node has capacity for job %s (%s)", - $wid, $job->id, $job->action; - # mark job as running next unless jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", $wid, $job->id; # copy job to local queue - $self->do('add_jobs', $job); + $self->{Q}->enqueue($job); # FIXME priority and freeze } debug "mgr ($wid): sleeping now..."; prctl sprintf 'netdisco-daemon: worker #%s manager: idle', $wid; - sleep( setting('workers')->{sleep_time} || 2 ); + sleep( setting('workers')->{sleep_time} || 1 ); } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm index f7252ca3..bf1c0795 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm @@ -13,6 +13,10 @@ use App::Netdisco::JobQueue qw/jq_insert/; sub worker_begin { my $self = shift; my $wid = $self->wid; + + return debug "mgr ($wid): no need for scheduler... skip begin" + unless setting('scheduler'); + debug "entering Scheduler ($wid) worker_begin()"; foreach my $action (keys %{ setting('schedule') }) { @@ -34,6 +38,9 @@ sub worker_body { my $self = shift; my $wid = $self->wid; + return debug "mgr ($wid): no need for scheduler... quitting" + unless setting('scheduler'); + while (1) { # sleep until some point in the next minute my $naptime = 60 - (time % 60) + int(rand(45)); diff --git a/Netdisco/lib/App/Netdisco/JobQueue.pm b/Netdisco/lib/App/Netdisco/JobQueue.pm index 6c72b2c5..da832c28 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue.pm @@ -14,7 +14,6 @@ our @EXPORT_OK = qw/ jq_queued jq_log jq_userlog - jq_take jq_lock jq_defer jq_complete @@ -64,12 +63,6 @@ Returns a list of jobs which have been entered into the queue by the passed C<$user>. Jobs are returned as objects which implement the Netdisco job instance interface (see below). -=head2 jq_take( $wid, $type, $max? ) - -Searches in the queue for jobs of type C<$type> and if up to C<$max> are -available, will book them out to the worker with ID C<$wid>. The default -number of booked jobs is 1. - =head2 jq_lock( $job ) Marks a job in the queue as booked out to this processing node (denoted by the diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm index dc66f6af..ae321e54 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -15,7 +15,6 @@ our @EXPORT_OK = qw/ jq_queued jq_log jq_userlog - jq_take jq_lock jq_defer jq_complete @@ -35,9 +34,8 @@ sub jq_getsome { ); while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); + ->new_result({ $job->get_columns }); } return @returned; } @@ -50,9 +48,8 @@ sub jq_locked { ->search({status => "queued-$fqdn"}); while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); + ->new_result({ $job->get_columns }); } return @returned; } @@ -76,9 +73,8 @@ sub jq_log { }); while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); + ->new_result({ $job->get_columns }); } return @returned; } @@ -93,28 +89,12 @@ sub jq_userlog { }); while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); + ->new_result({ $job->get_columns }); } return @returned; } -# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via -# the main daemon process. This is only used by daemon workers which can use -# MCE ->do() method. -sub jq_take { - my ($wid, $type) = @_; - Module::Load::load 'MCE'; - - # be polite to SQLite database (that is, local CPU) - debug "$type ($wid): sleeping now..."; - sleep(1); - - debug "$type ($wid): asking for a job"; - MCE->do('take_jobs', $wid, $type); -} - sub jq_lock { my $job = shift; my $fqdn = hostfqdn || 'localhost'; @@ -143,18 +123,11 @@ sub jq_lock { return $happy; } -# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via -# the main daemon process. This is only used by daemon workers which can use -# MCE ->do() method. sub jq_defer { my $job = shift; my $happy = false; try { - # other local workers are polling the central queue, so - # to prevent a race, first delete the job in our local queue - MCE->do('release_jobs', $job->id); - # lock db row and update to show job is available schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') diff --git a/Netdisco/share/config.yml b/Netdisco/share/config.yml index 1818672e..397a344f 100644 --- a/Netdisco/share/config.yml +++ b/Netdisco/share/config.yml @@ -172,7 +172,7 @@ port_control_reasons: workers: interactives: 1 pollers: 10 - sleep_time: 2 + sleep_time: 1 queue: PostgreSQL dns: