diff --git a/Netdisco/Makefile.PL b/Netdisco/Makefile.PL index 368abbd1..92226919 100644 --- a/Netdisco/Makefile.PL +++ b/Netdisco/Makefile.PL @@ -13,7 +13,6 @@ requires 'Archive::Extract' => 0; requires 'CGI::Expand' => 2.05; requires 'Data::Printer' => 0; requires 'DBD::Pg' => 0; -requires 'DBD::SQLite' => 1.37; requires 'DBIx::Class' => 0.08250; requires 'DBIx::Class::Helpers' => 2.018004; requires 'Daemon::Control' => 0.001000; @@ -32,7 +31,7 @@ requires 'MIME::Base64' => 3.13; requires 'Module::Find' => 0.12; requires 'Module::Load' => 0.32; requires 'Moo' => 1.001000; -requires 'MCE' => 1.408; +requires 'MCE' => 1.515; requires 'Net::Domain' => 1.23; requires 'Net::DNS' => 0.72; requires 'Net::LDAP' => 0; @@ -44,6 +43,7 @@ requires 'Plack' => 1.0023; requires 'Plack::Middleware::Expires' => 0.03; requires 'Plack::Middleware::ReverseProxy' => 0.15; requires 'Role::Tiny' => 1.002005; +requires 'Sereal' => 0; requires 'Socket6' => 0.23; requires 'Starman' => 0.4008; requires 'SNMP::Info' => 3.18; diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index d3585cf5..6bcfb713 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -24,16 +24,17 @@ use App::Netdisco; use Dancer qw/:moose :script/; warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD'); -# local job queue management -use App::Netdisco::Daemon::LocalQueue ':all'; use App::Netdisco::Util::Daemon; - -# needed to quench AF_INET6 symbol errors -use NetAddr::IP::Lite ':lower'; -use List::Util 'sum'; +use NetAddr::IP::Lite ':lower'; # to quench AF_INET6 symbol errors use Role::Tiny::With; + +# preload all worker modules into shared memory +use Module::Find (); +Module::Find::useall 'App::Netdisco::Daemon'; + use MCE::Signal '-setpgrp'; -use MCE; +use MCE::Flow Sereal => 1; +use MCE::Queue; # set temporary MCE files' location in home directory my $home = ($ENV{NETDISCO_HOME} || $ENV{HOME}); @@ -43,79 +44,38 @@ mkdir $tmp_dir if 
! -d $tmp_dir; setpgrp(0,0); # only portable variety of setpgrp prctl 'netdisco-daemon: master'; -my $mce = MCE->new( - spawn_delay => 0.15, - job_delay => 1.15, - tmp_dir => $tmp_dir, - user_func => sub { $_[0]->worker_body }, - on_post_exit => \&restart_this_worker, - user_tasks => build_tasks_list(), -)->run(); +# shared local job queue +my $queue = MCE::Queue->new; -sub build_tasks_list { - # NB MCE does not like max_workers => 0 - my $tasks = []; +# support a scheduler-only node +setting('workers')->{'no_manager'} = 1 + if setting('workers')->{tasks} eq '0'; - push @$tasks, { - max_workers => 1, - user_begin => worker_factory('Manager'), - } if num_workers() > 0; +mce_flow { + task_name => [qw/ scheduler manager poller /], + max_workers => [ 1, 1, setting('workers')->{tasks} ], + tmp_dir => $tmp_dir, + on_post_exit => sub { MCE->restart_worker }, +}, _mk_wkr('Scheduler'), _mk_wkr('Manager'), _mk_wkr('Poller'); - push @$tasks, { - max_workers => 1, - user_begin => worker_factory('Scheduler'), - } if setting('schedule'); - - my @logmsg = (); - foreach my $key (keys %{setting('job_type_keys')}) { - my $val = setting('job_type_keys')->{$key}; - - setting('workers')->{$val} = 2 - if !defined setting('workers')->{$val}; - - push @logmsg, setting('workers')->{$val} ." $key"; - push @$tasks, { - max_workers => setting('workers')->{$val}, - user_begin => worker_factory($key), - } if setting('workers')->{$val}; - } - - info sprintf "MCE will load: %s Manager, %s Scheduler, %s", - (num_workers() ? 1 : 0), - (setting('schedule') ? 
1 : 0), - (join ', ', @logmsg); - - return $tasks; -} - -sub num_workers { - return sum( 0, map { setting('workers')->{$_} } - values %{setting('job_type_keys')} ); -} - -sub worker_factory { +sub _mk_wkr { my $role = shift; return sub { my $self = shift; - my $wid = $self->wid; - prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); - info "applying role $role to worker $wid"; + $self->{queue} = $queue; - # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); - Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); + prctl sprintf 'netdisco-daemon: worker #%s %s: init', MCE->wid, lc($role); + info sprintf 'applying role %s to worker %s', $role, MCE->wid; + + # post-fork, become manager, scheduler, poller, etc + Role::Tiny->apply_roles_to_object( + $self => "App::Netdisco::Daemon::Worker::$role"); $self->worker_begin if $self->can('worker_begin'); + $self->worker_body; }; } -sub restart_this_worker { - my ($self, $e) = @_; - reset_jobs($e->{wid}); - - debug "restarting worker $e->{wid}"; - $self->restart_worker($e->{wid}); -} - =head1 NAME netdisco-daemon-fg - Job Control for Netdisco diff --git a/Netdisco/bin/netdisco-do b/Netdisco/bin/netdisco-do index d8981d3a..4b2e0d53 100755 --- a/Netdisco/bin/netdisco-do +++ b/Netdisco/bin/netdisco-do @@ -35,8 +35,8 @@ BEGIN { # for netdisco app config use App::Netdisco; +use App::Netdisco::Daemon::Job; use Dancer qw/:moose :script/; -use Dancer::Plugin::DBIC 'schema'; info "App::Netdisco version $App::Netdisco::VERSION loaded."; @@ -73,9 +73,6 @@ $ENV{DBIC_TRACE} ||= $sqltrace; # reconfigure logging to force console output Dancer::Logger->init('console', $CONFIG); -# for the in-memory local job queue -schema('daemon')->deploy; - # get requested action my $action = shift @ARGV; @@ -143,7 +140,7 @@ if (not $worker->can( $action )) { } # what job are we asked to do? 
-my $job = schema('daemon')->resultset('Admin')->new_result({ +my $job = App::Netdisco::Daemon::Job->new({ job => 0, action => $action, device => $device, diff --git a/Netdisco/lib/App/Netdisco/Configuration.pm b/Netdisco/lib/App/Netdisco/Configuration.pm index 504c6aca..ae4f41ce 100644 --- a/Netdisco/lib/App/Netdisco/Configuration.pm +++ b/Netdisco/lib/App/Netdisco/Configuration.pm @@ -35,21 +35,18 @@ if (ref {} eq ref setting('database')) { } -# static configuration for the in-memory local job queue -setting('plugins')->{DBIC}->{daemon} = { - dsn => 'dbi:SQLite:dbname=:memory:', - options => { - AutoCommit => 1, - RaiseError => 1, - sqlite_use_immediate_transaction => 1, - }, - schema_class => 'App::Netdisco::Daemon::DB', -}; - # defaults for workers setting('workers')->{queue} ||= 'PostgreSQL'; -setting('workers')->{interactives} = 1 - if setting('workers') and not exists setting('workers')->{interactives}; +if (exists setting('workers')->{interactives} + or exists setting('workers')->{pollers}) { + + setting('workers')->{tasks} = + (setting('workers')->{pollers} || 0) + + (setting('workers')->{interactives} || 0); + + delete setting('workers')->{pollers}; + delete setting('workers')->{interactives}; +} # force skipped DNS resolution, if unset setting('dns')->{hosts_file} ||= '/etc/hosts'; diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB.pm b/Netdisco/lib/App/Netdisco/Daemon/DB.pm deleted file mode 100644 index 11c15293..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/DB.pm +++ /dev/null @@ -1,10 +0,0 @@ -use utf8; -package App::Netdisco::Daemon::DB; - -use strict; -use warnings; - -use base 'DBIx::Class::Schema'; -__PACKAGE__->load_namespaces; - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm deleted file mode 100644 index e73d981d..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ /dev/null @@ -1,90 +0,0 @@ -use utf8; -package 
App::Netdisco::Daemon::DB::Result::Admin; - -use strict; -use warnings; - -use base 'DBIx::Class::Core'; -__PACKAGE__->table("admin"); -__PACKAGE__->add_columns( - "job", - { data_type => "integer", is_nullable => 0 }, - - "type", # Poller, Interactive, etc - { data_type => "text", is_nullable => 0 }, - - "wid", # worker ID, only != 0 once taken - { data_type => "integer", is_nullable => 0, default_value => 0 }, - - "entered", - { data_type => "timestamp", is_nullable => 1 }, - "started", - { data_type => "timestamp", is_nullable => 1 }, - "finished", - { data_type => "timestamp", is_nullable => 1 }, - "device", - { data_type => "inet", is_nullable => 1 }, - "port", - { data_type => "text", is_nullable => 1 }, - "action", - { data_type => "text", is_nullable => 1 }, - "subaction", - { data_type => "text", is_nullable => 1 }, - "status", - { data_type => "text", is_nullable => 1 }, - "username", - { data_type => "text", is_nullable => 1 }, - "userip", - { data_type => "inet", is_nullable => 1 }, - "log", - { data_type => "text", is_nullable => 1 }, - "debug", - { data_type => "boolean", is_nullable => 1 }, -); - -__PACKAGE__->set_primary_key("job"); - -=head1 METHODS - -=head2 summary - -An attempt to make a meaningful statement about the job. - -=cut - -sub summary { - my $job = shift; - return join ' ', - $job->action, - ($job->device || ''), - ($job->port || ''); -# ($job->subaction ? (q{'}. $job->subaction .q{'}) : ''); -} - -=head1 ADDITIONAL COLUMNS - -=head2 extra - -Alias for the C<subaction> column. - -=cut - -sub extra { (shift)->subaction } - -=head2 entererd_stamp - -Formatted version of the C<entered> field, accurate to the minute. - -The format is somewhat like ISO 8601 or RFC3339 but without the middle C<T> -between the date stamp and time stamp.
That is: - - 2012-02-06 12:49 - -=cut - -sub entered_stamp { - (my $stamp = (shift)->entered) =~ s/\.\d+$//; - return $stamp; -} - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Job.pm b/Netdisco/lib/App/Netdisco/Daemon/Job.pm new file mode 100644 index 00000000..3c078fa8 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/Job.pm @@ -0,0 +1,54 @@ +package App::Netdisco::Daemon::Job; + +use Moo; +use namespace::clean; + +foreach my $slot (qw/ + job + entered + started + finished + device + port + action + subaction + status + username + userip + log + debug + /) { + + has $slot => ( + is => 'rw', + ); +} + +=head1 METHODS + +=head2 summary + +An attempt to make a meaningful statement about the job. + +=cut + +sub summary { + my $job = shift; + return join ' ', + $job->action, + ($job->device || ''), + ($job->port || ''); +# ($job->subaction ? (q{'}. $job->subaction .q{'}) : ''); +} + +=head1 ADDITIONAL COLUMNS + +=head2 extra + +Alias for the C<subaction> column. + +=cut + +sub extra { (shift)->subaction } + +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm deleted file mode 100644 index b3dbcb9a..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm +++ /dev/null @@ -1,69 +0,0 @@ -package App::Netdisco::Daemon::LocalQueue; - -use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use base 'Exporter'; -our @EXPORT = (); -our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs release_jobs /; -our %EXPORT_TAGS = ( all => \@EXPORT_OK ); - -schema('daemon')->deploy; -my $queue = schema('daemon')->resultset('Admin'); - -sub add_jobs { - my (@jobs) = @_; - info sprintf "adding %s jobs to local queue", scalar @jobs; - schema('daemon')->dclone($_)->insert for @jobs; -} - -sub capacity_for { - my ($type) = @_; - debug "checking local capacity for worker type $type"; - - my $setting = setting('workers')->{ setting('job_type_keys')->{$type} }; - my $current = $queue->search({type =>
$type})->count; - return ($current < $setting); -} - -sub take_jobs { - my ($wid, $type, $max) = @_; - return () unless $wid > 1; - $max ||= 1; - - debug "deleting completed jobs by worker $wid"; - $queue->search({wid => $wid})->delete; - - debug "searching for $max new jobs for worker $wid (type $type)"; - my $rs = $queue->search( - {type => $type, wid => 0}, - {rows => $max}, - ); - - my @rows = $rs->all; - return [] if scalar @rows == 0; - - debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid; - $queue->search({job => { -in => [map {$_->job} @rows] }}) - ->update({wid => $wid}); - - return \@rows; -} - -# not used by workers, only the daemon when reinitializing a worker -sub reset_jobs { - my ($wid) = @_; - debug "resetting jobs owned by worker $wid to be available"; - return unless $wid > 1; - $queue->search({wid => $wid}) - ->update({wid => 0}); -} - -# not used by workers, only the daemon when reinitializing a worker -sub release_jobs { - my ($jid) = @_; - debug "releasing local job ID $jid"; - $queue->search({job => $jid})->delete; -} - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index 55c7f00d..437be666 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -8,53 +8,47 @@ use App::Netdisco::Util::Daemon; use Role::Tiny; use namespace::clean; -use App::Netdisco::JobQueue qw/jq_take jq_defer jq_complete/; +use App::Netdisco::JobQueue qw/jq_defer jq_complete/; sub worker_body { my $self = shift; my $wid = $self->wid; - my $tag = $self->worker_tag; - my $type = $self->worker_type; - while (1) { - prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type); - my $jobs = jq_take($self->wid, $type); + prctl sprintf 'netdisco-daemon: worker #%s poller: idle', $wid; - foreach my $job (@$jobs) { - my $target = $self->munge_action($job->action); + my $job = $self->{queue}->dequeue(1); + next unless 
defined $job; + my $action = $job->action; - try { - $job->started(scalar localtime); - prctl sprintf 'netdisco-daemon: worker #%s %s: working on #%s: %s', - $wid, lc($type), $job->id, $job->summary; - info sprintf "$tag (%s): starting %s job(%s) at %s", - $wid, $target, $job->id, $job->started; - my ($status, $log) = $self->$target($job); - $job->status($status); - $job->log($log); - } - catch { - $job->status('error'); - $job->log("error running job: $_"); - $self->sendto('stderr', $job->log ."\n"); - }; - - $self->close_job($job); + try { + $job->started(scalar localtime); + prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s', + $wid, $job->job, $job->summary; + info sprintf "pol (%s): starting %s job(%s) at %s", + $wid, $action, $job->job, $job->started; + my ($status, $log) = $self->$action($job); + $job->status($status); + $job->log($log); } + catch { + $job->status('error'); + $job->log("error running job: $_"); + $self->sendto('stderr', $job->log ."\n"); + }; + + $self->close_job($job); } } sub close_job { my ($self, $job) = @_; - my $tag = $self->worker_tag; - my $type = $self->worker_type; my $now = scalar localtime; - prctl sprintf 'netdisco-daemon: worker #%s %s: wrapping up %s #%s: %s', - $self->wid, lc($type), $job->action, $job->id, $job->status; - info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s", - $self->wid, $job->action, $job->id, $job->status, $now; + prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s', + $self->wid, $job->action, $job->job, $job->status; + info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", + $self->wid, $job->action, $job->job, $job->status, $now; try { if ($job->status eq 'defer') { diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm deleted file mode 100644 index 1ae2aa2c..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm +++ /dev/null @@ -1,17 +0,0 @@ 
-package App::Netdisco::Daemon::Worker::Interactive; - -use Role::Tiny; -use namespace::clean; - -# main worker body -with 'App::Netdisco::Daemon::Worker::Common'; - -# add dispatch methods for interactive actions -with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', - 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; - -sub worker_tag { 'int' } -sub worker_type { 'Interactive' } -sub munge_action { 'set_' . $_[1] } - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm index af3e02a0..5aa2a9d4 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm @@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all'; use Role::Tiny; use namespace::clean; -sub set_location { +sub location { my ($self, $job) = @_; return _set_device_generic($job->device, 'location', $job->subaction); } -sub set_contact { +sub contact { my ($self, $job) = @_; return _set_device_generic($job->device, 'contact', $job->subaction); } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm index 7adf941c..ad2472ac 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm @@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all'; use Role::Tiny; use namespace::clean; -sub set_portname { +sub portname { my ($self, $job) = @_; return _set_port_generic($job, 'alias', 'name'); } -sub set_portcontrol { +sub portcontrol { my ($self, $job) = @_; my $port = get_port($job->device, $job->port) @@ -39,7 +39,7 @@ sub set_portcontrol { } } -sub set_vlan { +sub vlan { my ($self, $job) = @_; my $port = get_port($job->device, $job->port) @@ -97,7 +97,7 @@ sub _set_port_generic { return job_done("Updated [$pn] 
$slot status on [$ip] to [$data]"); } -sub set_power { +sub power { my ($self, $job) = @_; my $port = get_port($job->device, $job->port) diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 10f19928..d28f0ff1 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -8,7 +8,8 @@ use App::Netdisco::Util::Daemon; use Role::Tiny; use namespace::clean; -use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/; +use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock/; +use MCE::Util (); sub worker_begin { my $self = shift; @@ -26,7 +27,7 @@ sub worker_begin { if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - $self->do('add_jobs', @jobs); + $self->{queue}->enqueuep(100, @jobs); } } @@ -34,39 +35,54 @@ sub worker_body { my $self = shift; my $wid = $self->wid; - return debug "mgr ($wid): no need for manager... quitting" - if setting('workers')->{'no_manager'}; - - my $num_slots = sum( 0, map { setting('workers')->{$_} } - values %{setting('job_type_keys')} ); + if (setting('workers')->{'no_manager'}) { + prctl sprintf 'netdisco-daemon: worker #%s manager: inactive', $wid; + return debug "mgr ($wid): no need for manager... 
quitting" + } while (1) { - debug "mgr ($wid): getting potential jobs for $num_slots workers"; prctl sprintf 'netdisco-daemon: worker #%s manager: gathering', $wid; + my $num_slots = 0; - # get some pending jobs + $num_slots = + MCE::Util::_parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); + debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; + + # get some high priority jobs # TODO also check for stale jobs in Netdisco DB - foreach my $job ( jq_getsome($num_slots) ) { - - # check for available local capacity - my $job_type = setting('job_types')->{$job->action}; - next unless $job_type and $self->do('capacity_for', $job_type); - - debug sprintf "mgr (%s): processing node has capacity for job %s (%s)", - $wid, $job->id, $job->action; + foreach my $job ( jq_getsomep($num_slots) ) { # mark job as running next unless jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $job->id; + $wid, $job->job; # copy job to local queue - $self->do('add_jobs', $job); + $self->{queue}->enqueuep(100, $job); + } + + $num_slots = + MCE::Util::_parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); + debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; + + # get some normal priority jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsome($num_slots) ) { + + # mark job as running + next unless jq_lock($job); + info sprintf "mgr (%s): job %s booked out for this processing node", + $wid, $job->job; + + # copy job to local queue + $self->{queue}->enqueue($job); } debug "mgr ($wid): sleeping now..."; prctl sprintf 'netdisco-daemon: worker #%s manager: idle', $wid; - sleep( setting('workers')->{sleep_time} || 2 ); + sleep( setting('workers')->{sleep_time} || 1 ); } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm index 1350ba4e..001c327a 100644 --- 
a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm @@ -11,10 +11,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device', 'App::Netdisco::Daemon::Worker::Poller::Arpnip', 'App::Netdisco::Daemon::Worker::Poller::Macsuck', 'App::Netdisco::Daemon::Worker::Poller::Nbtstat', - 'App::Netdisco::Daemon::Worker::Poller::Expiry'; - -sub worker_tag { 'pol' } -sub worker_type { 'Poller' } -sub munge_action { $_[1] } + 'App::Netdisco::Daemon::Worker::Poller::Expiry', + 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', + 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm index f7252ca3..a622fdbf 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm @@ -13,6 +13,10 @@ use App::Netdisco::JobQueue qw/jq_insert/; sub worker_begin { my $self = shift; my $wid = $self->wid; + + return debug "sch ($wid): no need for scheduler... skip begin" + unless setting('schedule'); + debug "entering Scheduler ($wid) worker_begin()"; foreach my $action (keys %{ setting('schedule') }) { @@ -34,6 +38,11 @@ sub worker_body { my $self = shift; my $wid = $self->wid; + unless (setting('schedule')) { + prctl sprintf 'netdisco-daemon: worker #%s scheduler: inactive', $wid; + return debug "sch ($wid): no need for scheduler... 
quitting" + } + while (1) { # sleep until some point in the next minute my $naptime = 60 - (time % 60) + int(rand(45)); diff --git a/Netdisco/lib/App/Netdisco/JobQueue.pm b/Netdisco/lib/App/Netdisco/JobQueue.pm index 6c72b2c5..fd93e726 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue.pm @@ -10,11 +10,11 @@ use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ jq_getsome + jq_getsomep jq_locked jq_queued jq_log jq_userlog - jq_take jq_lock jq_defer jq_complete @@ -42,6 +42,10 @@ Returns a list of randomly selected queued jobs. Default is to return one job, unless C<$num> is provided. Jobs are returned as objects which implement the Netdisco job instance interface (see below). +=head2 jq_getsomep( $num? ) + +Same as C<jq_getsome> but for high priority jobs. + =head2 jq_locked() Returns the list of jobs currently booked out to this processing node (denoted @@ -64,12 +68,6 @@ Returns a list of jobs which have been entered into the queue by the passed C<$user>. Jobs are returned as objects which implement the Netdisco job instance interface (see below). -=head2 jq_take( $wid, $type, $max? ) - -Searches in the queue for jobs of type C<$type> and if up to C<$max> are -available, will book them out to the worker with ID C<$wid>. The default -number of booked jobs is 1. - =head2 jq_lock( $job ) Marks a job in the queue as booked out to this processing node (denoted by the @@ -108,10 +106,6 @@ jobs from the queue.
=head2 id (auto) -=head2 type (required) - -=head2 wid (required, default 0) - =head2 entered =head2 started diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm index dc66f6af..02d77d3a 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -3,6 +3,7 @@ package App::Netdisco::JobQueue::PostgreSQL; use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; +use App::Netdisco::Daemon::Job; use Net::Domain 'hostfqdn'; use Module::Load (); use Try::Tiny; @@ -11,11 +12,11 @@ use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ jq_getsome + jq_getsomep jq_locked jq_queued jq_log jq_userlog - jq_take jq_lock jq_defer jq_complete @@ -25,23 +26,26 @@ our @EXPORT_OK = qw/ our %EXPORT_TAGS = ( all => \@EXPORT_OK ); sub jq_getsome { - my $num_slots = shift; + my ($num_slots, $prio) = @_; + return () if defined $num_slots and $num_slots eq '0'; + $num_slots ||= 1; + $prio ||= 'normal'; my @returned = (); my $rs = schema('netdisco')->resultset('Admin') ->search( - {status => 'queued'}, + {status => 'queued', action => { -in => setting('job_prio')->{$prio} } }, {order_by => 'random()', rows => ($num_slots || 1)}, ); while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); + push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); } return @returned; } +sub jq_getsomep { return jq_getsome(shift, 'high') } + sub jq_locked { my $fqdn = hostfqdn || 'localhost'; my @returned = (); @@ -50,9 +54,7 @@ sub jq_locked { ->search({status => "queued-$fqdn"}); while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); + push @returned, 
App::Netdisco::Daemon::Job->new({ $job->get_columns }); } return @returned; } @@ -68,51 +70,18 @@ sub jq_queued { } sub jq_log { - my @returned = (); - - my $rs = schema('netdisco')->resultset('Admin')->search({}, { + return schema('netdisco')->resultset('Admin')->search({}, { order_by => { -desc => [qw/entered device action/] }, rows => 50, - }); - - while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); - } - return @returned; + })->with_times->hri->all; } sub jq_userlog { my $user = shift; - my @returned = (); - - my $rs = schema('netdisco')->resultset('Admin')->search({ + return schema('netdisco')->resultset('Admin')->search({ username => $user, finished => { '>' => \"(now() - interval '5 seconds')" }, - }); - - while (my $job = $rs->next) { - my $job_type = setting('job_types')->{$job->action} or next; - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns, type => $job_type }); - } - return @returned; -} - -# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via -# the main daemon process. This is only used by daemon workers which can use -# MCE ->do() method. 
-sub jq_take { - my ($wid, $type) = @_; - Module::Load::load 'MCE'; - - # be polite to SQLite database (that is, local CPU) - debug "$type ($wid): sleeping now..."; - sleep(1); - - debug "$type ($wid): asking for a job"; - MCE->do('take_jobs', $wid, $type); + })->with_times->hri->all; } sub jq_lock { @@ -124,7 +93,7 @@ sub jq_lock { try { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->find($job->id, {for => 'update'}) + ->find($job->job, {for => 'update'}) ->update({ status => "queued-$fqdn" }); # remove any duplicate jobs, needed because we have race conditions @@ -138,30 +107,29 @@ sub jq_lock { }, {for => 'update'})->delete(); }); $happy = true; + } + catch { + error $_; }; return $happy; } -# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via -# the main daemon process. This is only used by daemon workers which can use -# MCE ->do() method. sub jq_defer { my $job = shift; my $happy = false; try { - # other local workers are polling the central queue, so - # to prevent a race, first delete the job in our local queue - MCE->do('release_jobs', $job->id); - # lock db row and update to show job is available schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->find($job->id, {for => 'update'}) + ->find($job->job, {for => 'update'}) ->update({ status => 'queued', started => undef }); }); $happy = true; + } + catch { + error $_; }; return $happy; @@ -175,7 +143,7 @@ sub jq_complete { try { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->find($job->id, {for => 'update'})->update({ + ->find($job->job, {for => 'update'})->update({ status => $job->status, log => $job->log, started => $job->started, @@ -183,6 +151,9 @@ sub jq_complete { }); }); $happy = true; + } + catch { + error $_; }; return $happy; @@ -208,6 +179,9 @@ sub jq_insert { ]); }); $happy = true; + } + catch { + error $_; }; return $happy; diff --git a/Netdisco/lib/App/Netdisco/Manual/Configuration.pod 
b/Netdisco/lib/App/Netdisco/Manual/Configuration.pod index 45302073..e82228ec 100644 --- a/Netdisco/lib/App/Netdisco/Manual/Configuration.pod +++ b/Netdisco/lib/App/Netdisco/Manual/Configuration.pod @@ -879,15 +879,17 @@ or remove any entries. Value: Settings Tree. Default: workers: - interactives: 2 - pollers: 10 - sleep_time: 2 + tasks: 'AUTO * 2' + sleep_time: 1 Control the activity of the backend daemon with this configuration setting. -C<interactives> and C<pollers> sets how many workers are started for -interactive jobs (port control) and polling jobs (discover, macsuck, arpnip) -on this node, respectively. Other nodes can have different settings. +C<tasks> sets how many worker processes are started for interactive jobs (port +control) and polling jobs (discover, macsuck, arpnip) on this node. Other +nodes can have different settings. + +"C<AUTO>" is the number of CPU cores. Set C<tasks> to "C<0>" to disable all +workers (which allows you to have a scheduler-only node). C<sleep_time> is the number of seconds between polling the database to find new jobs. This is a balance between responsiveness and database load. @@ -919,9 +921,9 @@ If set, then this node's backend daemon will schedule polling jobs (discover, macsuck, arpnip, etc) in the central database. It's fine to have multiple nodes scheduling work for redundancy (but make sure they all have good NTP). -Note that this is independent of the Pollers configured in C<workers>. It's +Note that this is independent of the Tasks configured in C<workers>. It's okay to have this node schedule schedule but not do any of the polling -itself (C<pollers: 0>). +itself (C<tasks: 0>). Work can be scheduled using C<cron> style notation, or a simple weekday and hour fields (which accept same types as C<cron> notation).
For example: diff --git a/Netdisco/lib/App/Netdisco/Manual/Developing.pod b/Netdisco/lib/App/Netdisco/Manual/Developing.pod index db993165..61c63a13 100644 --- a/Netdisco/lib/App/Netdisco/Manual/Developing.pod +++ b/Netdisco/lib/App/Netdisco/Manual/Developing.pod @@ -54,9 +54,6 @@ For the daemon, it's very similar: DBIC_TRACE=1 ~/bin/localenv bin/netdisco-daemon-fg -Don't be alarmed by the high rate of database queries in the daemon - most of -them are communicating only with a local in-memory SQLite database. - Happy hacking! =head1 Introduction @@ -418,24 +415,12 @@ the central Netdisco database and booking out jobs which it's able to service according to the local configuration settings. Jobs are "locked" in the central queue and then copied to a local job queue within the daemon. -Along with the Manager we start zero or more of two other types of worker. -Some jobs such as port control are "interactive" and the user typically wants -quick feedback on the results. Others such as polling are background tasks -which can take more time and are less schedule sensitive. So as not to starve -the "interactive" jobs of workers we have two types of worker. - -The Interactive worker picks jobs from the local job queue relating to device -and port reconfiguration only. It submits results directly back to the central -Netdisco database. The Poller worker similarly picks job from the local queue, -this time relating to device discovery and polling. - There is support in the daemon for the workers to pick more than one job at a time from the local queue, in case we decide this is worth doing. However the Manager won't ever book out more jobs from the central Netdisco job queue than it has workers available (so as not to hog jobs for itself against other daemons on other servers). The user is free to configure the number of -Interactive and Poller workers in their C file (zero or more of -each). +workers in their C file (zero or more). 
The fourth kind of worker is called the Scheduler and takes care of adding discover, macsuck, arpnip, and nbtstat jobs to the queue (which are in turn @@ -458,17 +443,6 @@ Configuration for SNMP::Info comes from the YAML files, of course. This means that our C and C settings are now in YAML format. In particular, the C list is a real list within the configuration. -=head2 DBIx::Class Layer - -The local job queue for each Job Daemon is actually an SQLite database running -in memory. This makes the queue management code a little more elegant. The -schema for this is of course DBIx::Class using Dancer connection management, -and lives in L. - -There is currently only one table, the port control job queue, in -L. It's likely this name will change -in the future. - =head1 Other Noteable Technology =head2 C diff --git a/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod b/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod index d928a64d..5bb9fd94 100644 --- a/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod +++ b/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod @@ -36,6 +36,19 @@ but they are backwards compatible. =back +=head1 2.029002 + +=head2 General Notices + +The backend polling daemon has been rewritten and as a result your +configuration can be simplified. Some keys have also been renamed. Our advice +is to remove (or comment out) the complete C<workers> configuration which +enables auto-tuning. 
If you do wish to control the number of worker +processes, follow this pattern: + + workers: + tasks: 'AUTO * 2' # this is the default, twice the number of CPUs + =head1 2.029001 =head2 Health Advice diff --git a/Netdisco/lib/App/Netdisco/Web/AdminTask.pm b/Netdisco/lib/App/Netdisco/Web/AdminTask.pm index 7134d86d..e0986398 100644 --- a/Netdisco/lib/App/Netdisco/Web/AdminTask.pm +++ b/Netdisco/lib/App/Netdisco/Web/AdminTask.pm @@ -25,7 +25,9 @@ sub add_job { }); } -foreach my $action (keys %{ setting('job_types') }) { +foreach my $action (@{ setting('job_prio')->{high} }, + @{ setting('job_prio')->{normal} }) { + ajax "/ajax/control/admin/$action" => require_role admin => sub { add_job($action, param('device'), param('extra')); }; diff --git a/Netdisco/share/config.yml b/Netdisco/share/config.yml index 1818672e..55d5a453 100644 --- a/Netdisco/share/config.yml +++ b/Netdisco/share/config.yml @@ -108,7 +108,11 @@ macsuck_no: [] macsuck_only: [] macsuck_all_vlans: false macsuck_no_unnamed: false -macsuck_no_vlan: [fddi-default,token-ring-default,fddinet-default,trnet-default] +macsuck_no_vlan: + - 'fddi-default' + - 'token-ring-default' + - 'fddinet-default' + - 'trnet-default' macsuck_no_devicevlan: [] macsuck_bleed: false macsuck_min_age: 0 @@ -170,9 +174,8 @@ port_control_reasons: # -------------- workers: - interactives: 1 - pollers: 10 - sleep_time: 2 + tasks: 'AUTO * 2' + sleep_time: 1 queue: PostgreSQL dns: @@ -194,26 +197,24 @@ dns: # expire: # when: '20 23 * * *' -job_types: - discoverall: Poller - discover: Poller - arpwalk: Poller - arpnip: Poller - macwalk: Poller - macsuck: Poller - nbtwalk: Poller - nbtstat: Poller - expire: Poller - location: Interactive - contact: Interactive - portcontrol: Interactive - portname: Interactive - vlan: Interactive - power: Interactive - -job_type_keys: - Poller: pollers - Interactive: interactives +job_prio: + high: + - location + - contact + - portcontrol + - portname + - vlan + - power + normal: + - discoverall + - 
discover + - arpwalk + - arpnip + - macwalk + - macsuck + - nbtwalk + - nbtstat + - expire # --------------- # GraphViz Export diff --git a/Netdisco/share/environments/deployment.yml b/Netdisco/share/environments/deployment.yml index 0d7f9814..501caac5 100644 --- a/Netdisco/share/environments/deployment.yml +++ b/Netdisco/share/environments/deployment.yml @@ -31,17 +31,17 @@ safe_password_store: true # SNMP community string(s) # ```````````````````````` snmp_auth: - - tag: 'v2default' + - tag: 'default_v2_readonly' community: 'public' read: true write: false -# - tag: 'v2default_w' +# - tag: 'default_v2_for_write' # community: 'private' # read: false # write: true -# daemon will keep netdisco up to date on this schedule -# ````````````````````````````````````````````````````` +# this is the schedule for automatically keeping netdisco up-to-date +# `````````````````````````````````````````````````````````````````` #schedule: # discoverall: # when: '5 7 * * *' @@ -56,16 +56,6 @@ snmp_auth: # expire: # when: '20 23 * * *' -# number of SNMP pollers to run in parallel (in netdisco-daemon) -# `````````````````````````````````````````````````````````````` -workers: - pollers: 10 - -# amount parallel DNS resolution for node names -# ````````````````````````````````````````````` -dns: - max_outstanding: 50 - # do not discover IP Phones or Wireless Access Points. # usually these are visible as device neighbors but don't support # SNMP, which just clogs up the job queue. @@ -74,6 +64,18 @@ discover_no_type: - '(?i)phone' - '(?i)(?:wap|wireless)' +# number of SNMP workers to run in parallel (in netdisco-daemon). +# the default is twice the number of CPU cores. increase this if +# your system has few cores and the schedule is taking too long. 
+# ``````````````````````````````````````````````````````````````` +#workers: +# tasks: 'AUTO * 2' + +# number of parallel DNS queries for node names +# ````````````````````````````````````````````` +#dns: +# max_outstanding: 50 + # set to true to globally disable authentication/login. # create a user called "guest" if you want to assign port/admin rights. # `````````````````````````````````````````````````````````````````````