diff --git a/Netdisco/Makefile.PL b/Netdisco/Makefile.PL index 4e2e7d6c..fa9678b1 100644 --- a/Netdisco/Makefile.PL +++ b/Netdisco/Makefile.PL @@ -27,6 +27,7 @@ requires 'HTTP::Tiny' => 0.029; requires 'JSON' => 0; requires 'List::MoreUtils' => 0.33; requires 'MIME::Base64' => 3.13; +requires 'Module::Load' => 0.32; requires 'Moo' => 1.001000; requires 'MCE' => 1.408; requires 'Net::Domain' => 1.23; diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index ef46edda..6b763ab6 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -18,22 +18,16 @@ use App::Netdisco; use Dancer qw/:moose :script/; warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD'); -# callbacks and local job queue management -use App::Netdisco::Daemon::Queue ':all'; +# local job queue management +use App::Netdisco::Daemon::LocalQueue ':all'; # needed to quench AF_INET6 symbol errors use NetAddr::IP::Lite ':lower'; - +use List::Util 'sum'; use Role::Tiny::With; use MCE::Signal '-setpgrp'; use MCE; -# set defaults for AnyEvent::DNS -local $ENV{'PERL_ANYEVENT_MAX_OUTSTANDING_DNS'} - = setting('dns')->{max_outstanding} || 10; -local $ENV{'PERL_ANYEVENT_HOSTS'} - = setting('dns')->{hosts_file} || '/etc/hosts'; - # set temporary MCE files' location in home directory my $home = ($ENV{NETDISCO_HOME} || $ENV{HOME}); my $tmp_dir = ($ENV{NETDISCO_TEMP} || dir($home, 'tmp')); @@ -52,42 +46,41 @@ sub build_tasks_list { # NB MCE does not like max_workers => 0 my $tasks = []; - setting('workers')->{pollers} = 2 - if !defined setting('workers')->{pollers}; - setting('workers')->{interactives} = 2 - if !defined setting('workers')->{interactives}; - push @$tasks, { max_workers => 1, user_begin => worker_factory('Manager'), - } if setting('workers')->{pollers} or setting('workers')->{interactives}; + } if num_workers() > 0; push @$tasks, { max_workers => 1, user_begin => worker_factory('Scheduler'), - } if setting('housekeeping'); + } if setting('schedule'); - push @$tasks, { - max_workers => setting('workers')->{pollers}, - user_begin => worker_factory('Poller'), - } if setting('workers')->{pollers}; + my @logmsg = (); + foreach my $key (keys %{setting('job_type_keys')}) { + my $val = setting('job_type_keys')->{$key}; - push @$tasks, { - max_workers => setting('workers')->{interactives}, - user_begin => worker_factory('Interactive'), - } if setting('workers')->{interactives}; + setting('workers')->{$val} = 2 + if !defined setting('workers')->{$val}; - info sprintf "MCE will load: %s Manager, %s Scheduler, %s Poller, %s Interactive", - ((setting('workers')->{pollers} or setting('workers')->{interactives}) ? 1 : 0), - (setting('housekeeping') ? 1 : 0), - (setting('workers')->{pollers} || 0), - (setting('workers')->{interactives} || 0); + push @logmsg, setting('workers')->{$val} ." $key"; + push @$tasks, { + max_workers => setting('workers')->{$val}, + user_begin => worker_factory($key), + } if setting('workers')->{$val}; + } + + info sprintf "MCE will load: %s Manager, %s Scheduler, %s", + (num_workers() ? 1 : 0), + (setting('schedule') ? 1 : 0), + (join ', ', @logmsg); return $tasks; } sub num_workers { - return (setting('workers')->{pollers} + setting('workers')->{interactives}); + return sum( 0, map { setting('workers')->{$_} } + values %{setting('job_type_keys')} ); } sub worker_factory { diff --git a/Netdisco/bin/netdisco-do b/Netdisco/bin/netdisco-do index 8b633d5f..85feea73 100755 --- a/Netdisco/bin/netdisco-do +++ b/Netdisco/bin/netdisco-do @@ -59,12 +59,6 @@ $CONFIG->{log} = ($debug ? 'debug' : 'info'); # reconfigure logging to force console output Dancer::Logger->init('console', $CONFIG); -# set max outstanding requests for AnyEvent::DNS -local $ENV{'PERL_ANYEVENT_MAX_OUTSTANDING_DNS'} - = setting('dns')->{max_outstanding} || 10; -local $ENV{'PERL_ANYEVENT_HOSTS'} - = setting('dns')->{hosts_file} || '/etc/hosts'; - # for the in-memory local job queue schema('daemon')->deploy; diff --git a/Netdisco/lib/App/Netdisco.pm b/Netdisco/lib/App/Netdisco.pm index e24ff071..f4b25b5e 100644 --- a/Netdisco/lib/App/Netdisco.pm +++ b/Netdisco/lib/App/Netdisco.pm @@ -5,54 +5,7 @@ use warnings; use 5.010_000; our $VERSION = '2.027004'; - -use App::Netdisco::Environment; -use Dancer ':script'; - -# set up database schema config from simple config vars -if (ref {} eq ref setting('database')) { - my $name = (setting('database')->{name} || 'netdisco'); - my $host = setting('database')->{host}; - my $user = setting('database')->{user}; - my $pass = setting('database')->{pass}; - - my $dsn = "dbi:Pg:dbname=${name}"; - $dsn .= ";host=${host}" if $host; - - # set up the netdisco schema now we have access to the config - # but only if it doesn't exist from an earlier config style - setting('plugins')->{DBIC}->{netdisco} ||= { - dsn => $dsn, - user => $user, - password => $pass, - options => { - AutoCommit => 1, - RaiseError => 1, - auto_savepoint => 1, - }, - schema_class => 'App::Netdisco::DB', - }; - -} - -# static configuration for the in-memory local job queue -setting('plugins')->{DBIC}->{daemon} = { - dsn => 'dbi:SQLite:dbname=:memory:', - options => { - AutoCommit => 1, - RaiseError => 1, - sqlite_use_immediate_transaction => 1, - }, - schema_class => 'App::Netdisco::Daemon::DB', -}; - -# force skipped DNS resolution, if unset -setting('dns')->{no} ||= ['fe80::/64','169.254.0.0/16']; -setting('dns')->{hosts_file} ||= '/etc/hosts'; - -# housekeeping expire used to be called expiry -setting('housekeeping')->{expire} ||= setting('housekeeping')->{expiry} - if setting('housekeeping') and exists setting('housekeeping')->{expiry}; +use App::Netdisco::Configuration; =head1 NAME @@ -196,7 +149,7 @@ In the same file uncomment and edit the C setting to be appropriate for your local site. Change the C string setting if your site has different values, and -uncomment the C setting to enable SNMP data gathering from +uncomment the C setting to enable SNMP data gathering from devices (this replaces cron jobs in Netdisco 1). Have a quick read of the other settings to make sure you're happy, then move diff --git a/Netdisco/lib/App/Netdisco/Configuration.pm b/Netdisco/lib/App/Netdisco/Configuration.pm new file mode 100644 index 00000000..e9ecbc5c --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Configuration.pm @@ -0,0 +1,60 @@ +package App::Netdisco::Configuration; + +use App::Netdisco::Environment; +use Dancer ':script'; + +# set up database schema config from simple config vars +if (ref {} eq ref setting('database')) { + my $name = (setting('database')->{name} || 'netdisco'); + my $host = setting('database')->{host}; + my $user = setting('database')->{user}; + my $pass = setting('database')->{pass}; + + my $dsn = "dbi:Pg:dbname=${name}"; + $dsn .= ";host=${host}" if $host; + + # set up the netdisco schema now we have access to the config + # but only if it doesn't exist from an earlier config style + setting('plugins')->{DBIC}->{netdisco} ||= { + dsn => $dsn, + user => $user, + password => $pass, + options => { + AutoCommit => 1, + RaiseError => 1, + auto_savepoint => 1, + }, + schema_class => 'App::Netdisco::DB', + }; + +} + +# static configuration for the in-memory local job queue +setting('plugins')->{DBIC}->{daemon} = { + dsn => 'dbi:SQLite:dbname=:memory:', + options => { + AutoCommit => 1, + RaiseError => 1, + sqlite_use_immediate_transaction => 1, + }, + schema_class => 'App::Netdisco::Daemon::DB', +}; + +# default queue model is Pg +setting('workers')->{queue} ||= 'PostgreSQL'; + +# force skipped DNS resolution, if unset +setting('dns')->{hosts_file} ||= '/etc/hosts'; +setting('dns')->{no} ||= ['fe80::/64','169.254.0.0/16']; + +# schedule expire used to be called expiry +setting('schedule')->{expire} ||= setting('schedule')->{expiry} + if setting('schedule') and exists setting('schedule')->{expiry}; + +# set max outstanding requests for AnyEvent::DNS +$ENV{'PERL_ANYEVENT_MAX_OUTSTANDING_DNS'} + = setting('dns')->{max_outstanding} || 50; +$ENV{'PERL_ANYEVENT_HOSTS'} + = setting('dns')->{hosts_file} || '/etc/hosts'; + +true; diff --git a/Netdisco/lib/App/Netdisco/Core/Discover.pm b/Netdisco/lib/App/Netdisco/Core/Discover.pm index 1b8fa01e..364b8ca9 100644 --- a/Netdisco/lib/App/Netdisco/Core/Discover.pm +++ b/Netdisco/lib/App/Netdisco/Core/Discover.pm @@ -5,6 +5,7 @@ use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::Device qw/get_device is_discoverable/; use App::Netdisco::Util::DNS ':all'; +use App::Netdisco::JobQueue qw/jq_queued jq_insert/; use NetAddr::IP::Lite ':lower'; use List::MoreUtils (); use Encode; @@ -900,20 +901,12 @@ sub discover_new_neighbors { next; } - # Don't queued if job already exists - my $is_queued = schema('netdisco')->resultset('Admin')->search( - { device => $ip, + # Don't queue if job already exists + if (List::MoreUtils::none {$_ eq $ip} jq_queued('discover')) { + jq_insert({ + device => $ip, action => 'discover', - status => { -like => 'queued%' }, - } - )->single; - unless ($is_queued) { - schema('netdisco')->resultset('Admin')->create( - { device => $ip, - action => 'discover', - status => 'queued', - } - ); + }); } } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index 2806b8c4..8ad79af8 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -8,12 +8,9 @@ use base 'DBIx::Class::Core'; __PACKAGE__->table("admin"); __PACKAGE__->add_columns( "job", - { - data_type => "integer", - is_nullable => 0, - }, + { data_type => "integer", is_nullable => 0 }, - "role", # Poller, Interactive, etc + "type", # Poller, Interactive, etc { data_type => "text", is_nullable => 0 }, "wid", # worker ID, only != 0 once taken @@ -47,4 +44,11 @@ __PACKAGE__->add_columns( __PACKAGE__->set_primary_key("job"); +sub extra { (shift)->subaction } + +sub entered_stamp { + (my $stamp = (shift)->entered) =~ s/\.\d+$//; + return $stamp; +} + 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm new file mode 100644 index 00000000..097582d5 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm @@ -0,0 +1,19 @@ +package App::Netdisco::Daemon::JobQueue; + +use Role::Tiny; +use namespace::clean; + +use Module::Load (); +Module::Load::load_remote 'JobQueue' => 'App::Netdisco::JobQueue' => ':all'; + +# central queue +sub jq_getsome { shift and JobQueue::jq_getsome(@_) } +sub jq_locked { shift and JobQueue::jq_locked(@_) } +sub jq_queued { shift and JobQueue::jq_queued(@_) } +sub jq_take { goto \&JobQueue::jq_take } +sub jq_lock { shift and JobQueue::jq_lock(@_) } +sub jq_defer { shift and JobQueue::jq_defer(@_) } +sub jq_complete { shift and JobQueue::jq_complete(@_) } +sub jq_insert { shift and JobQueue::jq_insert(@_) } + +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm new file mode 100644 index 00000000..13834a3f --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm @@ -0,0 +1,62 @@ +package App::Netdisco::Daemon::LocalQueue; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +schema('daemon')->deploy; +my $queue = schema('daemon')->resultset('Admin'); + +sub add_jobs { + my (@jobs) = @_; + info sprintf "adding %s jobs to local queue", scalar @jobs; + schema('daemon')->dclone($_)->insert for @jobs; +} + +sub capacity_for { + my ($type) = @_; + debug "checking local capacity for worker type $type"; + + my $setting = setting('workers')->{ setting('job_type_keys')->{$type} }; + my $current = $queue->search({type => $type})->count; + return ($current < $setting); +} + +sub take_jobs { + my ($wid, $type, $max) = @_; + return () unless $wid > 1; + $max ||= 1; + + debug "deleting completed jobs by worker $wid"; + $queue->search({wid => $wid})->delete; + + debug "searching for $max new jobs for worker $wid (type $type)"; + my $rs = $queue->search( + {type => $type, wid => 0}, + {rows => $max}, + ); + + my @rows = $rs->all; + return [] if scalar @rows == 0; + + debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid; + $queue->search({job => { -in => [map {$_->job} @rows] }}) + ->update({wid => $wid}); + + return \@rows; +} + +# not used by workers, only the daemon when reinitializing a worker +sub reset_jobs { + my ($wid) = @_; + debug "resetting jobs owned by worker $wid to be available"; + return unless $wid > 1; + $queue->search({wid => $wid}) + ->update({wid => 0}); +} + +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm deleted file mode 100644 index b64d2915..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm +++ /dev/null @@ -1,87 +0,0 @@ -package App::Netdisco::Daemon::Queue; - -use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use base 'Exporter'; -our @EXPORT = (); -our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs scrub_jobs /; -our %EXPORT_TAGS = ( all => \@EXPORT_OK ); - -schema('daemon')->deploy; -my $queue = schema('daemon')->resultset('Admin'); - -sub add_jobs { - my ($jobs) = @_; - info sprintf "adding %s jobs to local queue", scalar @$jobs; - $queue->populate($jobs); -} - -sub capacity_for { - my ($action) = @_; - debug "checking local capacity for action $action"; - - my $action_map = { - Poller => [ - qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/ - ], - Interactive => [qw/location contact portcontrol portname vlan power/], - }; - - my $role_map = { - (map {$_ => 'Poller'} @{ $action_map->{Poller} }), - (map {$_ => 'Interactive'} @{ $action_map->{Interactive} }) - }; - - my $setting_map = { - Poller => 'pollers', - Interactive => 'interactives', - }; - - my $role = $role_map->{$action}; - my $setting = $setting_map->{$role}; - - my $current = $queue->search({role => $role})->count; - - return ($current < setting('workers')->{$setting}); -} - -sub take_jobs { - my ($wid, $role, $max) = @_; - $max ||= 1; - - # asking for more jobs means the current ones are done - debug "removing complete jobs for worker $wid from local queue"; - $queue->search({wid => $wid})->delete; - - debug "searching for $max new jobs for worker $wid (role $role)"; - my $rs = $queue->search( - {role => $role, wid => 0}, - {rows => $max}, - ); - - my @rows = $rs->all; - return [] if scalar @rows == 0; - - debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid; - $rs->update({wid => $wid}); - - return [ map {{$_->get_columns}} @rows ]; -} - -sub reset_jobs { - my ($wid) = @_; - debug "resetting jobs owned by worker $wid to be available"; - return unless $wid > 1; - $queue->search({wid => $wid}) - ->update({wid => 0}); -} - -sub scrub_jobs { - my ($wid) = @_; - debug "deleting jobs owned by worker $wid"; - return unless $wid > 1; - $queue->search({wid => $wid})->delete; -} - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index 64292713..10baf3cf 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -1,87 +1,61 @@ package App::Netdisco::Daemon::Worker::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use Try::Tiny; use Role::Tiny; use namespace::clean; -requires qw/worker_type worker_name munge_action/; +with 'App::Netdisco::Daemon::JobQueue'; sub worker_body { my $self = shift; my $wid = $self->wid; + my $tag = $self->worker_tag; my $type = $self->worker_type; - my $name = $self->worker_name; while (1) { - debug "$type ($wid): asking for a job"; - my $jobs = $self->do('take_jobs', $self->wid, $name); - - foreach my $candidate (@$jobs) { - # create a row object so we can use column accessors - # use the local db schema in case it is accidentally 'stored' - # (will throw an exception) - my $job = schema('daemon')->resultset('Admin') - ->new_result($candidate); - my $jid = $job->job; + my $jobs = $self->jq_take($self->wid, $type); + foreach my $job (@$jobs) { my $target = $self->munge_action($job->action); - next unless $self->can($target); - debug "$type ($wid): can ${target}() for job $jid"; - # do job - my ($status, $log); try { $job->started(scalar localtime); - info sprintf "$type (%s): starting %s job(%s) at %s", - $wid, $target, $jid, $job->started; - ($status, $log) = $self->$target($job); + info sprintf "$tag (%s): starting %s job(%s) at %s", + $wid, $target, $job->id, $job->started; + my ($status, $log) = $self->$target($job); + $job->status($status); + $job->log($log); } catch { - $status = 'error'; - $log = "error running job: $_"; - $self->sendto('stderr', $log ."\n"); + $job->status('error'); + $job->log("error running job: $_"); + $self->sendto('stderr', $job->log ."\n"); }; - $self->close_job($job, $status, $log); + $self->close_job($job); } - - debug "$type ($wid): sleeping now..."; - sleep(1); } } sub close_job { - my ($self, $job, $status, $log) = @_; - my $type = $self->worker_type; + my ($self, $job) = @_; + my $tag = $self->worker_tag; my $now = scalar localtime; - info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s", - $self->wid, $job->action, $job->job, $status, $now; + info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s", + $self->wid, $job->action, $job->id, $job->status, $now; - # lock db row and either defer or complete the job try { - if ($status eq 'defer') { - schema('netdisco')->resultset('Admin') - ->find($job->job, {for => 'update'}) - ->update({ status => 'queued' }); + if ($job->status eq 'defer') { + $self->jq_defer($job); } else { - schema('netdisco')->resultset('Admin') - ->find($job->job, {for => 'update'}) - ->update({ - status => $status, - log => $log, - started => $job->started, - finished => $now, - }); + $job->finished($now); + $self->jq_complete($job); } - - # remove job from local queue - $self->do('scrub_jobs', $self->wid); } catch { $self->sendto('stderr', "error closing job: $_\n") }; } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm index f0b13ec4..1ae2aa2c 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm @@ -10,8 +10,8 @@ with 'App::Netdisco::Daemon::Worker::Common'; with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; -sub worker_type { 'int' } -sub worker_name { 'Interactive' } +sub worker_tag { 'int' } +sub worker_type { 'Interactive' } sub munge_action { 'set_' . $_[1] } 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 134f94f1..d77c6952 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -1,104 +1,67 @@ package App::Netdisco::Daemon::Worker::Manager; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use Net::Domain 'hostfqdn'; -use Try::Tiny; use Role::Tiny; use namespace::clean; -my $fqdn = hostfqdn || 'localhost'; - -my $role_map = { - (map {$_ => 'Poller'} - qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/), - (map {$_ => 'Interactive'} - qw/location contact portcontrol portname vlan power/) -}; +use List::Util 'sum'; +with 'App::Netdisco::Daemon::JobQueue'; sub worker_begin { my $self = shift; my $wid = $self->wid; debug "entering Manager ($wid) worker_begin()"; + if (setting('workers')->{'no_manager'}) { + return debug "mgr ($wid): no need for manager... skip begin"; + } + # requeue jobs locally debug "mgr ($wid): searching for jobs booked to this processing node"; - my $rs = schema('netdisco')->resultset('Admin') - ->search({status => "queued-$fqdn"}); - - my @jobs = map {{$_->get_columns}} $rs->all; + my @jobs = $self->jq_locked; if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - map { $_->{role} = $role_map->{$_->{action}} } @jobs; - - $self->do('add_jobs', \@jobs); + $self->do('add_jobs', @jobs); } } sub worker_body { my $self = shift; my $wid = $self->wid; - my $num_slots = $self->do('num_workers') - or return debug "mgr ($wid): this node has no workers... quitting manager"; - # get some pending jobs - my $rs = schema('netdisco')->resultset('Admin') - ->search( - {status => 'queued'}, - {order_by => 'random()', rows => $num_slots}, - ); + return debug "mgr ($wid): no need for manager... quitting" + if setting('workers')->{'no_manager'}; + + my $num_slots = sum( 0, map { setting('workers')->{$_} } + values %{setting('job_type_keys')} ); while (1) { debug "mgr ($wid): getting potential jobs for $num_slots workers"; - while (my $job = $rs->next) { - my $jid = $job->job; + + # get some pending jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( $self->jq_getsome($num_slots) ) { # check for available local capacity - next unless $self->do('capacity_for', $job->action); + my $job_type = setting('job_types')->{$job->action}; + next unless $job_type and $self->do('capacity_for', $job_type); debug sprintf "mgr (%s): processing node has capacity for job %s (%s)", - $wid, $jid, $job->action; + $wid, $job->id, $job->action; # mark job as running - next unless $self->lock_job($job); + next unless $self->jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $jid; - - my $local_job = { $job->get_columns }; - $local_job->{role} = $role_map->{$job->action}; + $wid, $job->id; # copy job to local queue - $self->do('add_jobs', [$local_job]); + $self->do('add_jobs', $job); } - # reset iterator so ->next() triggers another DB query - $rs->reset; - - # TODO also check for stale jobs in Netdisco DB - debug "mgr ($wid): sleeping now..."; sleep( setting('workers')->{sleep_time} || 2 ); } } -sub lock_job { - my ($self, $job) = @_; - my $happy = 0; - - # lock db row and update to show job has been picked - try { - schema('netdisco')->txn_do(sub { - schema('netdisco')->resultset('Admin')->find( - {job => $job->job, status => 'queued'}, - {for => 'update'} - )->update({ status => "queued-$fqdn" }); - }); - $happy = 1; - }; - - return $happy; -} - 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm index 34559016..1350ba4e 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm @@ -13,8 +13,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device', 'App::Netdisco::Daemon::Worker::Poller::Nbtstat', 'App::Netdisco::Daemon::Worker::Poller::Expiry'; -sub worker_type { 'pol' } -sub worker_name { 'Poller' } +sub worker_tag { 'pol' } +sub worker_type { 'Poller' } sub munge_action { $_[1] } 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm index 9f3f5cd2..95a2ba59 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm @@ -1,11 +1,11 @@ package App::Netdisco::Daemon::Worker::Poller::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::SNMP 'snmp_connect'; use App::Netdisco::Util::Device 'get_device'; use App::Netdisco::Daemon::Util ':all'; +use Dancer::Plugin::DBIC 'schema'; use NetAddr::IP::Lite ':lower'; @@ -16,36 +16,22 @@ use namespace::clean; sub _walk_body { my ($self, $job_type, $job) = @_; - my $action_method = $job_type .'_action'; - my $job_action = $self->$action_method; - my $layer_method = $job_type .'_layer'; my $job_layer = $self->$layer_method; - my $jobqueue = schema('netdisco')->resultset('Admin'); + my %queued = map {$_ => 1} $self->jq_queued($job_type); my @devices = schema('netdisco')->resultset('Device') - ->search({ip => { -not_in => - $jobqueue->search({ - device => { '!=' => undef}, - action => $job_type, - status => { -like => 'queued%' }, - })->get_column('device')->as_query - }})->has_layer($job_layer)->get_column('ip')->all; + ->has_layer($job_layer)->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; - my $filter_method = $job_type .'_filter'; - my $job_filter = $self->$filter_method; - - my @filtered_devices = grep {$job_filter->($_)} @devices; - - schema('netdisco')->resultset('Admin')->txn_do_locked(sub { - $jobqueue->populate([ + $self->jq_insert([ map {{ device => $_, action => $job_type, - status => 'queued', + username => $job->username, + userip => $job->userip, }} (@filtered_devices) - ]); - }); + ]); return job_done("Queued $job_type job for all devices"); } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm index dcb42ec1..39742143 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm @@ -1,12 +1,12 @@ package App::Netdisco::Daemon::Worker::Poller::Device; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::SNMP 'snmp_connect'; use App::Netdisco::Util::Device qw/get_device is_discoverable/; use App::Netdisco::Core::Discover ':all'; use App::Netdisco::Daemon::Util ':all'; +use Dancer::Plugin::DBIC 'schema'; use NetAddr::IP::Lite ':lower'; @@ -17,27 +17,19 @@ use namespace::clean; sub discoverall { my ($self, $job) = @_; - my $jobqueue = schema('netdisco')->resultset('Admin'); - my $devices = schema('netdisco')->resultset('Device') - ->search({ip => { -not_in => - $jobqueue->search({ - device => { '!=' => undef}, - action => 'discover', - status => { -like => 'queued%' }, - })->get_column('device')->as_query - }})->get_column('ip'); + my %queued = map {$_ => 1} $self->jq_queued('discover'); + my @devices = schema('netdisco')->resultset('Device') + ->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; - schema('netdisco')->resultset('Admin')->txn_do_locked(sub { - $jobqueue->populate([ + $self->jq_insert([ map {{ device => $_, action => 'discover', - status => 'queued', username => $job->username, userip => $job->userip, - }} ($devices->all) - ]); - }); + }} (@filtered_devices) + ]); return job_done("Queued discover job for all devices"); } @@ -48,7 +40,6 @@ sub discover { my $host = NetAddr::IP::Lite->new($job->device); my $device = get_device($host->addr); - my $jobqueue = schema('netdisco')->resultset('Admin'); if ($device->ip eq '0.0.0.0') { return job_error("discover failed: no device param (need -d ?)"); @@ -80,26 +71,20 @@ sub discover { # if requested, and the device has not yet been arpniped/macsucked, queue now if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') { if (!defined $device->last_macsuck) { - schema('netdisco')->txn_do(sub { - $jobqueue->create({ + $self->jq_insert({ device => $device->ip, action => 'macsuck', - status => 'queued', username => $job->username, userip => $job->userip, - }); }); } if (!defined $device->last_arpnip) { - schema('netdisco')->txn_do(sub { - $jobqueue->create({ + $self->jq_insert({ device => $device->ip, action => 'arpnip', - status => 'queued', username => $job->username, userip => $job->userip, - }); }); } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm index f6f24f2d..218798ac 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm @@ -1,38 +1,23 @@ package App::Netdisco::Daemon::Worker::Scheduler; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - use Algorithm::Cron; -use Try::Tiny; use Role::Tiny; use namespace::clean; -my $jobactions = { - map {$_ => undef} qw/ - discoverall - arpwalk - macwalk - nbtwalk - expire - / -# saveconfigs -# backup -}; +with 'App::Netdisco::Daemon::JobQueue'; sub worker_begin { my $self = shift; my $wid = $self->wid; debug "entering Scheduler ($wid) worker_begin()"; - foreach my $a (keys %$jobactions) { - next unless setting('housekeeping') - and exists setting('housekeeping')->{$a}; - my $config = setting('housekeeping')->{$a}; + foreach my $action (keys %{ setting('schedule') }) { + my $config = setting('schedule')->{$action}; # accept either single crontab format, or individual time fields - my $cron = Algorithm::Cron->new( + $config->{when} = Algorithm::Cron->new( base => 'local', %{ (ref {} eq ref $config->{when}) @@ -40,9 +25,6 @@ sub worker_begin { : {crontab => $config->{when}} } ); - - $jobactions->{$a} = $config; - $jobactions->{$a}->{when} = $cron; } } @@ -61,30 +43,21 @@ sub worker_body { my $win_end = $win_start + 60; # if any job is due, add it to the queue - foreach my $a (keys %$jobactions) { - next unless defined $jobactions->{$a}; - my $sched = $jobactions->{$a}; + foreach my $action (keys %{ setting('schedule') }) { + my $sched = setting('schedule')->{$action}; # next occurence of job must be in this minute's window - debug sprintf "sched ($wid): $a: win_start: %s, win_end: %s, next: %s", + debug sprintf "sched ($wid): $action: win_start: %s, win_end: %s, next: %s", $win_start, $win_end, $sched->{when}->next_time($win_start); next unless $sched->{when}->next_time($win_start) <= $win_end; # queue it! - # due to a table constraint, this will (intentionally) fail if a - # similar job is already queued. - try { - info "sched ($wid): queueing $a job"; - schema('netdisco')->resultset('Admin')->create({ - action => $a, - device => ($sched->{device} || undef), - subaction => ($sched->{extra} || undef), - status => 'queued', - }); - } - catch { - debug "sched ($wid): action $a was not queued (dupe?)"; - }; + info "sched ($wid): queueing $action job"; + $self->jq_insert({ + action => $action, + device => $sched->{device}, + extra => $sched->{extra}, + }); } } } diff --git a/Netdisco/lib/App/Netdisco/JobQueue.pm b/Netdisco/lib/App/Netdisco/JobQueue.pm new file mode 100644 index 00000000..6c72b2c5 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/JobQueue.pm @@ -0,0 +1,141 @@ +package App::Netdisco::JobQueue; + +use Dancer qw/:moose :syntax :script/; + +use Module::Load (); +Module::Load::load + 'App::Netdisco::JobQueue::' . setting('workers')->{queue} => ':all'; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ + jq_getsome + jq_locked + jq_queued + jq_log + jq_userlog + jq_take + jq_lock + jq_defer + jq_complete + jq_insert + jq_delete +/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +=head1 NAME + +App::Netdisco::JobQueue + +=head1 DESCRIPTION + +Interface for Netdisco job queue. + +There are no default exports, however the C<:all> tag will export all +subroutines. + +=head1 EXPORT_OK + +=head2 jq_getsome( $num? ) + +Returns a list of randomly selected queued jobs. Default is to return one job, +unless C<$num> is provided. Jobs are returned as objects which implement the +Netdisco job instance interface (see below). + +=head2 jq_locked() + +Returns the list of jobs currently booked out to this processing node (denoted +by the local hostname). Jobs are returned as objects which implement the +Netdisco job instance interface (see below). + +=head2 jq_queued( $job_type ) + +Returns a list of IP addresses of devices which currently have a job of the +given C<$job_type> queued (e.g. C, C, etc). + +=head2 jq_log() + +Returns a list of the most recent 50 jobs in the queue. Jobs are returned as +objects which implement the Netdisco job instance interface (see below). + +=head2 jq_userlog( $user ) + +Returns a list of jobs which have been entered into the queue by the passed +C<$user>. Jobs are returned as objects which implement the Netdisco job +instance interface (see below). + +=head2 jq_take( $wid, $type, $max? ) + +Searches in the queue for jobs of type C<$type> and if up to C<$max> are +available, will book them out to the worker with ID C<$wid>. The default +number of booked jobs is 1. + +=head2 jq_lock( $job ) + +Marks a job in the queue as booked out to this processing node (denoted by the +local hostname). The C<$job> parameter must be an object which implements the +Netdisco job instance interface (see below). + +Returns true if successful else returns false. + +=head2 jq_defer( $job ) + +Marks a job in the queue as available for taking. This is usually done after a +job is booked but the processing node changes its mind and decides to return +the job to the queue. The C<$job> parameter must be an object which implements +the Netdisco job instance interface (see below). + +Returns true if successful else returns false. + +=head2 jq_complete( $job ) + +Marks a job as complete. The C<$job> parameter must be an object which +implements the Netdisco job instance interface (see below). The queue item's +status, log and finished fields will be updated from the passed C<$job>. + +Returns true if successful else returns false. + +=head2 jq_insert( \%job | [ \%job, \%job ...] ) + +Adds the passed jobs to the queue. + +=head2 jq_delete( $id? ) + +If passed the ID of a job, deletes it from the queue. Otherwise deletes ALL +jobs from the queue. + +=head1 Job Instance Interface + +=head2 id (auto) + +=head2 type (required) + +=head2 wid (required, default 0) + +=head2 entered + +=head2 started + +=head2 finished + +=head2 device + +=head2 port + +=head2 action + +=head2 subaction or extra + +=head2 status + +=head2 username + +=head2 userip + +=head2 log + +=head2 debug + +=cut + +true; diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm new file mode 100644 index 00000000..f8f81a69 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -0,0 +1,221 @@ +package App::Netdisco::JobQueue::PostgreSQL; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use Net::Domain 'hostfqdn'; +use Try::Tiny; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ + jq_getsome + jq_locked + jq_queued + jq_log + jq_userlog + jq_take + jq_lock + jq_defer + jq_complete + jq_insert + jq_delete +/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +sub jq_getsome { + my $num_slots = shift; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin') + ->search( + {status => 'queued'}, + {order_by => 'random()', rows => ($num_slots || 1)}, + ); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +sub jq_locked { + my $fqdn = hostfqdn || 'localhost'; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin') + ->search({status => "queued-$fqdn"}); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +sub jq_queued { + my $job_type = shift; + + return schema('netdisco')->resultset('Admin')->search({ + device => { '!=' => undef}, + action => $job_type, + status => { -like => 'queued%' }, + })->get_column('device')->all; +} + +sub jq_log { + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin')->search({}, { + order_by => { -desc => [qw/entered device action/] }, + rows => 50, + }); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +sub jq_userlog { + my $user = shift; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin')->search({ + username => $user, + finished => { '>' => \"(now() - interval '5 seconds')" }, + }); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via +# the main daemon process. This is only used by daemon workers which can use +# MCE ->do() method. +sub jq_take { + my ($self, $wid, $type) = @_; + + # be polite to SQLite database (that is, local CPU) + debug "$type ($wid): sleeping now..."; + sleep(1); + + debug "$type ($wid): asking for a job"; + $self->do('take_jobs', $wid, $type); +} + +sub jq_lock { + my $job = shift; + my $fqdn = hostfqdn || 'localhost'; + my $happy = false; + + # lock db row and update to show job has been picked + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'}) + ->update({ status => "queued-$fqdn" }); + + # remove any duplicate jobs, needed because we have race conditions + # when queueing jobs of a type for all devices + schema('netdisco')->resultset('Admin')->search({ + status => 'queued', + device => $job->device, + port => $job->port, + action => $job->action, + subaction => $job->subaction, + }, {for => 'update'})->delete(); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_defer { + my $job = shift; + my $happy = false; + + # lock db row and update to show job is available + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'}) + ->update({ status => 'queued' }); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_complete { + my $job = shift; + my $happy = false; + + # lock db row and update to show job is done/error + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'})->update({ + status => $job->status, + log => $job->log, + finished => $job->finished, + }); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_insert { + my $jobs = shift; + $jobs = [$jobs] if ref [] ne ref $jobs; + my $happy = false; + + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->populate([ + map {{ + device => $_->{device}, + port => $_->{port}, + action => $_->{action}, + subaction => ($_->{extra} || $_->{subaction}), + username => $_->{username}, + userip => $_->{userip}, + status => 'queued', + }} @$jobs + ]); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_delete { + my $id = shift; + + if ($id) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->find($id)->delete(); + }); + } + else { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->delete(); + }); + } +} + +true; diff --git a/Netdisco/lib/App/Netdisco/Manual/Configuration.pod b/Netdisco/lib/App/Netdisco/Manual/Configuration.pod index f8599c7e..d691f6da 100644 --- a/Netdisco/lib/App/Netdisco/Manual/Configuration.pod +++ b/Netdisco/lib/App/Netdisco/Manual/Configuration.pod @@ -870,7 +870,7 @@ library default of 10. C is a list of IP addresses or CIDR ranges to excluded from DNS resolution. Link local addresses are excluded by default. -=head3 C +=head3 C Value: Settings Tree. Default: None. @@ -879,13 +879,13 @@ macsuck, arpnip, etc) in the central database. It's fine to have multiple nodes scheduling work for redundancy (but make sure they all have good NTP). Note that this is independent of the Pollers configured in C. It's -okay to have this node schedule housekeeping but not do any of the polling +okay to have this node schedule schedule but not do any of the polling itself (C). Work can be scheduled using C style notation, or a simple weekday and hour fields (which accept same types as C notation). For example: - housekeeping: + schedule: discoverall: when: '0 9 * * *' arpwalk: diff --git a/Netdisco/lib/App/Netdisco/Manual/Developing.pod b/Netdisco/lib/App/Netdisco/Manual/Developing.pod index e40eace7..dea39857 100644 --- a/Netdisco/lib/App/Netdisco/Manual/Developing.pod +++ b/Netdisco/lib/App/Netdisco/Manual/Developing.pod @@ -440,7 +440,7 @@ each). The fourth kind of worker is called the Scheduler and takes care of adding discover, macsuck, arpnip, and nbtstat jobs to the queue (which are in turn handled by the Poller worker). This worker is automatically started only if -the user has enabled the "C" section of their +the user has enabled the "C" section of their C site config. =head2 SNMP::Info diff --git a/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod b/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod index afaa18ee..2115019a 100644 --- a/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod +++ b/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod @@ -36,6 +36,14 @@ but they are backwards compatible. =back +=head1 2.028000 + +=head2 General Changes + +The configuration item C has been renamed to C. Old +configuration will continue to work, but we recommend you rename this key in +your configuration anyway. + =head1 2.025001 =head2 General Changes diff --git a/Netdisco/lib/App/Netdisco/Web.pm b/Netdisco/lib/App/Netdisco/Web.pm index c39a6ce1..dab64938 100644 --- a/Netdisco/lib/App/Netdisco/Web.pm +++ b/Netdisco/lib/App/Netdisco/Web.pm @@ -10,6 +10,7 @@ use Socket6 (); # to ensure dependency is met use HTML::Entities (); # to ensure dependency is met use URI::QueryParam (); # part of URI, to add helper methods use Path::Class 'dir'; +use Module::Load (); use App::Netdisco::Util::Web 'interval_to_daterange'; use App::Netdisco::Web::AuthN; @@ -34,8 +35,7 @@ sub _load_web_plugins { $plugin =~ s/^\+//; debug "loading Netdisco plugin $plugin"; - eval "require $plugin"; - error $@ if $@; + Module::Load::load $plugin; } } diff --git a/Netdisco/lib/App/Netdisco/Web/AdminTask.pm b/Netdisco/lib/App/Netdisco/Web/AdminTask.pm index 2f13803f..7134d86d 100644 --- a/Netdisco/lib/App/Netdisco/Web/AdminTask.pm +++ b/Netdisco/lib/App/Netdisco/Web/AdminTask.pm @@ -5,25 +5,10 @@ use Dancer::Plugin::Ajax; use Dancer::Plugin::DBIC; use Dancer::Plugin::Auth::Extensible; -use Try::Tiny; - -# we have a separate list for jobs needing a device to avoid queueing -# such a job when there's no device param (it could still be duff, tho). -my %jobs = map { $_ => 1} qw/ - discover - macsuck - arpnip - nbtstat -/; -my %jobs_all = map {$_ => 1} qw/ - discoverall - macwalk - arpwalk - nbtwalk -/; +use App::Netdisco::JobQueue 'jq_insert'; sub add_job { - my ($jobtype, $device, $subaction) = @_; + my ($action, $device, $subaction) = @_; if ($device) { $device = NetAddr::IP::Lite->new($device); @@ -31,32 +16,22 @@ sub add_job { if ! $device or $device->addr eq '0.0.0.0'; } - # job might already be in the queue, so this could die - try { - schema('netdisco')->resultset('Admin')->create({ - ($device ? (device => $device->addr) : ()), - action => $jobtype, - ($subaction ? (subaction => $subaction) : ()), - status => 'queued', - (exists $jobs{$jobtype} ? (username => session('logged_in_user')) : ()), - userip => request->remote_address, - }); - }; + jq_insert({ + ($device ? (device => $device->addr) : ()), + action => $action, + ($subaction ? (subaction => $subaction) : ()), + username => session('logged_in_user'), + userip => request->remote_address, + }); } -foreach my $jobtype (keys %jobs_all, keys %jobs) { - ajax "/ajax/control/admin/$jobtype" => require_role admin => sub { - send_error('Missing device', 400) - if exists $jobs{$jobtype} and not param('device'); - - add_job($jobtype, param('device'), param('extra')); +foreach my $action (keys %{ setting('job_types') }) { + ajax "/ajax/control/admin/$action" => require_role admin => sub { + add_job($action, param('device'), param('extra')); }; - post "/admin/$jobtype" => require_role admin => sub { - send_error('Missing device', 400) - if exists $jobs{$jobtype} and not param('device'); - - add_job($jobtype, param('device'), param('extra')); + post "/admin/$action" => require_role admin => sub { + add_job($action, param('device'), param('extra')); redirect uri_for('/admin/jobqueue')->path; }; } diff --git a/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm b/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm index 27767ce9..5cf97cac 100644 --- a/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm @@ -6,6 +6,7 @@ use Dancer::Plugin::DBIC; use Dancer::Plugin::Auth::Extensible; use App::Netdisco::Web::Plugin; +use App::Netdisco::JobQueue qw/jq_log jq_delete/; register_admin_task({ tag => 'jobqueue', @@ -14,30 +15,17 @@ register_admin_task({ ajax '/ajax/control/admin/jobqueue/del' => require_role admin => sub { send_error('Missing job', 400) unless param('job'); - - schema('netdisco')->txn_do(sub { - my $device = schema('netdisco')->resultset('Admin') - ->search({job => param('job')})->delete; - }); + jq_delete( param('job') ); }; ajax '/ajax/control/admin/jobqueue/delall' => require_role admin => sub { - schema('netdisco')->txn_do(sub { - my $device = schema('netdisco')->resultset('Admin')->delete; - }); + jq_delete(); }; ajax '/ajax/content/admin/jobqueue' => require_role admin => sub { - my $set = schema('netdisco')->resultset('Admin') - ->with_times - ->search({}, { - order_by => { -desc => [qw/entered device action/] }, - rows => 50, - }); - content_type('text/html'); template 'ajax/admintask/jobqueue.tt', { - results => $set, + results => [ jq_log ], }, { layout => undef }; }; diff --git a/Netdisco/lib/App/Netdisco/Web/PortControl.pm b/Netdisco/lib/App/Netdisco/Web/PortControl.pm index b362fb70..d4a182e8 100644 --- a/Netdisco/lib/App/Netdisco/Web/PortControl.pm +++ b/Netdisco/lib/App/Netdisco/Web/PortControl.pm @@ -5,6 +5,8 @@ use Dancer::Plugin::Ajax; use Dancer::Plugin::DBIC; use Dancer::Plugin::Auth::Extensible; +use App::Netdisco::JobQueue qw/jq_insert jq_userlog/; + ajax '/ajax/portcontrol' => require_role port_control => sub { send_error('No device/port/field', 400) unless param('device') and (param('port') or param('field')); @@ -44,12 +46,11 @@ ajax '/ajax/portcontrol' => require_role port_control => sub { }); } - schema('netdisco')->resultset('Admin')->create({ + jq_insert({ device => param('device'), port => param('port'), action => $action, subaction => $subaction, - status => 'queued', username => session('logged_in_user'), userip => request->remote_address, log => $log, @@ -61,21 +62,20 @@ ajax '/ajax/portcontrol' => require_role port_control => sub { }; ajax '/ajax/userlog' => require_login sub { - my $rs = schema('netdisco')->resultset('Admin')->search({ - username => session('logged_in_user'), - action => [qw/location contact portcontrol portname vlan power - discover macsuck arpnip/], - finished => { '>' => \"(now() - interval '5 seconds')" }, - }); + my @jobs = jq_userlog( session('logged_in_user') ); my %status = ( - 'done' => [ - map {s/\[\]/<empty>/; $_} - $rs->search({status => 'done'})->get_column('log')->all + 'done' => [ + map {s/\[\]/<empty>/; $_} + map { $_->log } + grep { $_->status eq 'done' } + @jobs ], 'error' => [ - map {s/\[\]/<empty>/; $_} - $rs->search({status => 'error'})->get_column('log')->all + map {s/\[\]/<empty>/; $_} + map { $_->log } + grep { $_->status eq 'error' } + @jobs ], ); diff --git a/Netdisco/share/config.yml b/Netdisco/share/config.yml index 5fffcfee..18fc4f94 100644 --- a/Netdisco/share/config.yml +++ b/Netdisco/share/config.yml @@ -167,13 +167,14 @@ workers: interactives: 2 pollers: 10 sleep_time: 2 + queue: PostgreSQL dns: max_outstanding: 50 hosts_file: '/etc/hosts' no: ['fe80::/64','169.254.0.0/16'] -#housekeeping: +#schedule: # discoverall: # when: '5 7 * * *' # macwalk: @@ -187,6 +188,27 @@ dns: # expire: # when: '20 23 * * *' +job_types: + discoverall: Poller + discover: Poller + arpwalk: Poller + arpnip: Poller + macwalk: Poller + macsuck: Poller + nbtwalk: Poller + nbtstat: Poller + expire: Poller + location: Interactive + contact: Interactive + portcontrol: Interactive + portname: Interactive + vlan: Interactive + power: Interactive + +job_type_keys: + Poller: pollers + Interactive: interactives + # --------------- # DANCER INTERNAL # --------------- diff --git a/Netdisco/share/environments/deployment.yml b/Netdisco/share/environments/deployment.yml index 14d5d763..fc7f795d 100644 --- a/Netdisco/share/environments/deployment.yml +++ b/Netdisco/share/environments/deployment.yml @@ -42,7 +42,7 @@ snmp_auth: # daemon will keep netdisco up to date on this schedule # ````````````````````````````````````````````````````` -#housekeeping: +#schedule: # discoverall: # when: '5 7 * * *' # macwalk: diff --git a/Netdisco/share/views/ajax/admintask/jobqueue.tt b/Netdisco/share/views/ajax/admintask/jobqueue.tt index 77fd4076..aa034549 100644 --- a/Netdisco/share/views/ajax/admintask/jobqueue.tt +++ b/Netdisco/share/views/ajax/admintask/jobqueue.tt @@ -1,4 +1,4 @@ -[% IF NOT results.has_rows %] +[% IF NOT results.size %]
The job queue is empty.
[% ELSE %] @@ -17,7 +17,7 @@ - [% WHILE (row = results.next) %] + [% FOREACH row IN results %]