From 9685eb182a9e390679ff63d47e7734d71b23f870 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Wed, 21 May 2014 21:20:27 +0100 Subject: [PATCH] Merge pluggable job queue branch. Squashed commit of the following: commit e2ca15c0f8f3a45026ff8b9f78673f97aefbcd87 Merge: 0a90308 ffcf6ed Author: Oliver Gorwits Date: Wed May 21 21:18:58 2014 +0100 Merge branch 'master' into og-pluggable-daemon commit 0a90308ecf893b4fa09e139fae534a7b5fecd015 Merge: e80c575 ee398fc Author: Oliver Gorwits Date: Sat May 17 22:20:40 2014 +0100 Merge branch 'master' into og-pluggable-daemon Conflicts: Netdisco/lib/App/Netdisco.pm commit e80c575c576fc1fb12ce850e4ff22a75fdb7859d Author: Oliver Gorwits Date: Sat May 17 22:14:44 2014 +0100 move worker sleep into jobqueue commit c83b9995977b486daeb806b1d09a33deabccf714 Author: Oliver Gorwits Date: Sat May 17 22:01:43 2014 +0100 support disable manager from jobqueue dynamic code commit 4792b0dc49ceee847b82bf891b372c3ecdf96f0e Author: Oliver Gorwits Date: Sat May 17 21:34:28 2014 +0100 fix pod name commit 187fc849375ff27c644ce05a014b8c0e8ab8d5b8 Author: Oliver Gorwits Date: Sat May 17 21:22:06 2014 +0100 better naming commit 1c43aaa0f4a0a3940964b647883a5ec067afc8e0 Author: Oliver Gorwits Date: Sat May 17 21:18:49 2014 +0100 make worker use only JobQueue not LocalQueue directly commit 5316058ba817ef00e98d392328e7c9c9391df47c Author: Oliver Gorwits Date: Sat May 17 20:42:19 2014 +0100 remove unecessary scrub subroutine commit 8077e3de9dc40a67e496761b0f996ad3b46b65ec Author: Oliver Gorwits Date: Sat May 17 20:31:18 2014 +0100 remove any duplicate jobs when locking commit d4b5e4e6cdd4a79fa32a616a79e05908011512aa Author: Oliver Gorwits Date: Sat May 17 20:20:32 2014 +0100 rename DefaultSettings to Configuration commit aacb149d09261c045fd539aa1269a5fc2e6620ca Author: Oliver Gorwits Date: Sat May 17 19:57:45 2014 +0100 no need to check - mgr is not started if 0 workers commit 46ebe4cd6abd2d9dbae2a7c1851b5f235ba02dfc Author: Oliver Gorwits Date: Sat May 17 19:50:37 2014 +0100 remove unecessary job scrub commit 60522fe555f65d4d2bf621bb479439308cac2274 Author: Oliver Gorwits Date: Sat May 17 19:27:53 2014 +0100 fixes for DefaultSettings commit 2c6f0dd0f7d66759d11bab6f71defb0b7d2b4ea1 Author: Oliver Gorwits Date: Sat May 17 19:11:50 2014 +0100 rename housekeeping to schedule commit c12034d2b0d213f38bc7c74b86f2ddf4bbe0175e Author: Oliver Gorwits Date: Sat May 17 19:06:22 2014 +0100 new DefaultSettings package, and mv queue to be key of workers commit 49e9079f9a9f6130f75a40290e2bd9fc1eef7afa Merge: ec8ad3b 213f44e Author: Oliver Gorwits Date: Sat May 17 08:00:02 2014 +0100 Merge branch 'master' into og-pluggable-daemon commit ec8ad3b2d8721a036280cd485b1ca48870a97528 Author: Oliver Gorwits Date: Sun May 11 01:18:21 2014 +0100 fix entered_stamp commit 471724dd89717021775c83563e8b746ce077366a Author: Oliver Gorwits Date: Sat May 10 23:44:14 2014 +0100 fix auto hack commit 4620deff33c1cf7b12273abb05f5443da106e7a6 Author: Oliver Gorwits Date: Sat May 10 23:27:11 2014 +0100 final migration commit 5413e34e832c6a7f7df49ea9a41f670c95956c60 Author: Oliver Gorwits Date: Sat May 10 23:18:12 2014 +0100 more JobQueue migration commit 9569bda4d876aa7b43a2ff126c8b4481d1f640e0 Author: Oliver Gorwits Date: Sat May 10 22:44:20 2014 +0100 migrate to JobQueue :) commit 41ee8f91f2fead03b4ecdd315b747f599153577e Author: Oliver Gorwits Date: Sat May 10 22:38:20 2014 +0100 simplify again commit 58cba4da24d9f14bce8a6beeaf5cb7a5975911a2 Author: Oliver Gorwits Date: Sat May 10 22:06:41 2014 +0100 add POD for JobQueue commit c9afbab26b80e85a373330c5c3f1a3297b7f6b00 Author: Oliver Gorwits Date: Sat May 10 21:36:01 2014 +0100 use Module::Load tricks to avoid some other mess commit 50c72c1d648d241af058352b2e9160856d918401 Author: Oliver Gorwits Date: Sat May 10 21:12:52 2014 +0100 use Module::Load for dynamic loading commit 54510a1560cba514b2e97d22808dfa699e157f9d Author: Oliver Gorwits Date: Thu May 8 22:05:10 2014 +0100 hack to make functional and OO interface commit b8c706a2e749cc16eefc6f8bc419e2a90a2444c2 Author: Oliver Gorwits Date: Thu May 8 21:29:31 2014 +0100 simplify role apply for jobqueue commit 8a816b97645f0a4cb44a18e4f3ba69000136eb1f Author: Oliver Gorwits Date: Tue May 6 22:20:50 2014 +0100 remove debug print commit f3131adfc884eb2eee658c4b5426c84a42beb6f5 Author: Oliver Gorwits Date: Tue May 6 21:47:30 2014 +0100 big patch to remove knowledge of DB from most worker code commit 39a0efb3c35faade8303a4c96a45ac40d8633f90 Author: Oliver Gorwits Date: Mon Apr 28 23:46:10 2014 +0100 port Worker Common to pluggable jobqueue commit 8c0614357a7b5253a68f50e48ec1078e6f70ccfb Author: Oliver Gorwits Date: Mon Apr 28 23:04:13 2014 +0100 port Scheduler to pluggable jobqueue commit 3882c157ec89d564df41db84f9fe125a17edd8a2 Merge: 44e6c49 2480646 Author: Oliver Gorwits Date: Mon Apr 28 22:36:57 2014 +0100 Merge branch 'master' into og-pluggable-daemon commit 44e6c49419e4a66a4cd338b356bb9e8063656753 Merge: fdeeffc 5fc6209 Author: Oliver Gorwits Date: Mon Apr 28 22:35:53 2014 +0100 Merge branch 'master' into og-pluggable-daemon commit 5fc62090e21e5380c68d32cee2fc8d8485f98fa9 Author: Oliver Gorwits Date: Mon Apr 28 22:15:07 2014 +0100 edge topology 17 * Use commit fdeeffcbe4f6f4b997d22d8aaa60fe0070c723e5 Author: Oliver Gorwits Date: Thu Apr 24 23:13:20 2014 +0100 book specifically same jobs which were seen commit 0d97c2b819cd2563c459040b19affcf0ee8c3429 Author: Oliver Gorwits Date: Thu Apr 24 22:57:37 2014 +0100 fix typos commit 47265a5292b538b19c2560b52fade98e06d37971 Author: Oliver Gorwits Date: Thu Apr 24 21:56:52 2014 +0100 rename file to follow name change commit fd169149c4bbe23a3cac52746d1c9871e2a822a7 Author: Oliver Gorwits Date: Thu Apr 24 21:52:57 2014 +0100 remove job types from web code commit 319489ae00604e1b1ca9f0dc908ff74bad3a9baf Author: Oliver Gorwits Date: Thu Apr 24 21:46:30 2014 +0100 remove job types from scheduler commit ccdeca600cd3d8576dc999d458179b834dd6380e Author: Oliver Gorwits Date: Thu Apr 24 21:33:01 2014 +0100 remove job types from netdisco-daemon-fg commit 349bddf609090fe19831c51709fdff03258132a6 Author: Oliver Gorwits Date: Thu Apr 24 21:05:42 2014 +0100 move default env settings to Netdisco.pm commit b4b5cce00a04235fbd2e36cd5a823b81f5dccc38 Author: Oliver Gorwits Date: Thu Apr 24 21:01:26 2014 +0100 remove job type knowledge from code into config --- Netdisco/Makefile.PL | 1 + Netdisco/bin/netdisco-daemon-fg | 53 ++--- Netdisco/bin/netdisco-do | 6 - Netdisco/lib/App/Netdisco.pm | 51 +--- Netdisco/lib/App/Netdisco/Configuration.pm | 60 +++++ Netdisco/lib/App/Netdisco/Core/Discover.pm | 19 +- .../App/Netdisco/Daemon/DB/Result/Admin.pm | 14 +- Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm | 19 ++ .../lib/App/Netdisco/Daemon/LocalQueue.pm | 62 +++++ Netdisco/lib/App/Netdisco/Daemon/Queue.pm | 87 ------- .../lib/App/Netdisco/Daemon/Worker/Common.pm | 68 ++---- .../App/Netdisco/Daemon/Worker/Interactive.pm | 4 +- .../lib/App/Netdisco/Daemon/Worker/Manager.pm | 83 ++----- .../lib/App/Netdisco/Daemon/Worker/Poller.pm | 4 +- .../Netdisco/Daemon/Worker/Poller/Common.pm | 30 +-- .../Netdisco/Daemon/Worker/Poller/Device.pm | 35 +-- .../App/Netdisco/Daemon/Worker/Scheduler.pm | 53 ++--- Netdisco/lib/App/Netdisco/JobQueue.pm | 141 +++++++++++ .../lib/App/Netdisco/JobQueue/PostgreSQL.pm | 221 ++++++++++++++++++ .../lib/App/Netdisco/Manual/Configuration.pod | 6 +- .../lib/App/Netdisco/Manual/Developing.pod | 2 +- .../lib/App/Netdisco/Manual/ReleaseNotes.pod | 8 + Netdisco/lib/App/Netdisco/Web.pm | 4 +- Netdisco/lib/App/Netdisco/Web/AdminTask.pm | 53 ++--- .../Netdisco/Web/Plugin/AdminTask/JobQueue.pm | 20 +- Netdisco/lib/App/Netdisco/Web/PortControl.pm | 26 +-- Netdisco/share/config.yml | 24 +- Netdisco/share/environments/deployment.yml | 2 +- .../share/views/ajax/admintask/jobqueue.tt | 4 +- 29 files changed, 694 insertions(+), 466 deletions(-) create mode 100644 Netdisco/lib/App/Netdisco/Configuration.pm create mode 100644 Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm create mode 100644 Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm delete mode 100644 Netdisco/lib/App/Netdisco/Daemon/Queue.pm create mode 100644 Netdisco/lib/App/Netdisco/JobQueue.pm create mode 100644 Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm diff --git a/Netdisco/Makefile.PL b/Netdisco/Makefile.PL index 4e2e7d6c..fa9678b1 100644 --- a/Netdisco/Makefile.PL +++ b/Netdisco/Makefile.PL @@ -27,6 +27,7 @@ requires 'HTTP::Tiny' => 0.029; requires 'JSON' => 0; requires 'List::MoreUtils' => 0.33; requires 'MIME::Base64' => 3.13; +requires 'Module::Load' => 0.32; requires 'Moo' => 1.001000; requires 'MCE' => 1.408; requires 'Net::Domain' => 1.23; diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index ef46edda..6b763ab6 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -18,22 +18,16 @@ use App::Netdisco; use Dancer qw/:moose :script/; warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD'); -# callbacks and local job queue management -use App::Netdisco::Daemon::Queue ':all'; +# local job queue management +use App::Netdisco::Daemon::LocalQueue ':all'; # needed to quench AF_INET6 symbol errors use NetAddr::IP::Lite ':lower'; - +use List::Util 'sum'; use Role::Tiny::With; use MCE::Signal '-setpgrp'; use MCE; -# set defaults for AnyEvent::DNS -local $ENV{'PERL_ANYEVENT_MAX_OUTSTANDING_DNS'} - = setting('dns')->{max_outstanding} || 10; -local $ENV{'PERL_ANYEVENT_HOSTS'} - = setting('dns')->{hosts_file} || '/etc/hosts'; - # set temporary MCE files' location in home directory my $home = ($ENV{NETDISCO_HOME} || $ENV{HOME}); my $tmp_dir = ($ENV{NETDISCO_TEMP} || dir($home, 'tmp')); @@ -52,42 +46,41 @@ sub build_tasks_list { # NB MCE does not like max_workers => 0 my $tasks = []; - setting('workers')->{pollers} = 2 - if !defined setting('workers')->{pollers}; - setting('workers')->{interactives} = 2 - if !defined setting('workers')->{interactives}; - push @$tasks, { max_workers => 1, user_begin => worker_factory('Manager'), - } if setting('workers')->{pollers} or setting('workers')->{interactives}; + } if num_workers() > 0; push @$tasks, { max_workers => 1, user_begin => worker_factory('Scheduler'), - } if setting('housekeeping'); + } if setting('schedule'); - push @$tasks, { - max_workers => setting('workers')->{pollers}, - user_begin => worker_factory('Poller'), - } if setting('workers')->{pollers}; + my @logmsg = (); + foreach my $key (keys %{setting('job_type_keys')}) { + my $val = setting('job_type_keys')->{$key}; - push @$tasks, { - max_workers => setting('workers')->{interactives}, - user_begin => worker_factory('Interactive'), - } if setting('workers')->{interactives}; + setting('workers')->{$val} = 2 + if !defined setting('workers')->{$val}; - info sprintf "MCE will load: %s Manager, %s Scheduler, %s Poller, %s Interactive", - ((setting('workers')->{pollers} or setting('workers')->{interactives}) ? 1 : 0), - (setting('housekeeping') ? 1 : 0), - (setting('workers')->{pollers} || 0), - (setting('workers')->{interactives} || 0); + push @logmsg, setting('workers')->{$val} ." $key"; + push @$tasks, { + max_workers => setting('workers')->{$val}, + user_begin => worker_factory($key), + } if setting('workers')->{$val}; + } + + info sprintf "MCE will load: %s Manager, %s Scheduler, %s", + (num_workers() ? 1 : 0), + (setting('schedule') ? 1 : 0), + (join ', ', @logmsg); return $tasks; } sub num_workers { - return (setting('workers')->{pollers} + setting('workers')->{interactives}); + return sum( 0, map { setting('workers')->{$_} } + values %{setting('job_type_keys')} ); } sub worker_factory { diff --git a/Netdisco/bin/netdisco-do b/Netdisco/bin/netdisco-do index 8b633d5f..85feea73 100755 --- a/Netdisco/bin/netdisco-do +++ b/Netdisco/bin/netdisco-do @@ -59,12 +59,6 @@ $CONFIG->{log} = ($debug ? 'debug' : 'info'); # reconfigure logging to force console output Dancer::Logger->init('console', $CONFIG); -# set max outstanding requests for AnyEvent::DNS -local $ENV{'PERL_ANYEVENT_MAX_OUTSTANDING_DNS'} - = setting('dns')->{max_outstanding} || 10; -local $ENV{'PERL_ANYEVENT_HOSTS'} - = setting('dns')->{hosts_file} || '/etc/hosts'; - # for the in-memory local job queue schema('daemon')->deploy; diff --git a/Netdisco/lib/App/Netdisco.pm b/Netdisco/lib/App/Netdisco.pm index e24ff071..f4b25b5e 100644 --- a/Netdisco/lib/App/Netdisco.pm +++ b/Netdisco/lib/App/Netdisco.pm @@ -5,54 +5,7 @@ use warnings; use 5.010_000; our $VERSION = '2.027004'; - -use App::Netdisco::Environment; -use Dancer ':script'; - -# set up database schema config from simple config vars -if (ref {} eq ref setting('database')) { - my $name = (setting('database')->{name} || 'netdisco'); - my $host = setting('database')->{host}; - my $user = setting('database')->{user}; - my $pass = setting('database')->{pass}; - - my $dsn = "dbi:Pg:dbname=${name}"; - $dsn .= ";host=${host}" if $host; - - # set up the netdisco schema now we have access to the config - # but only if it doesn't exist from an earlier config style - setting('plugins')->{DBIC}->{netdisco} ||= { - dsn => $dsn, - user => $user, - password => $pass, - options => { - AutoCommit => 1, - RaiseError => 1, - auto_savepoint => 1, - }, - schema_class => 'App::Netdisco::DB', - }; - -} - -# static configuration for the in-memory local job queue -setting('plugins')->{DBIC}->{daemon} = { - dsn => 'dbi:SQLite:dbname=:memory:', - options => { - AutoCommit => 1, - RaiseError => 1, - sqlite_use_immediate_transaction => 1, - }, - schema_class => 'App::Netdisco::Daemon::DB', -}; - -# force skipped DNS resolution, if unset -setting('dns')->{no} ||= ['fe80::/64','169.254.0.0/16']; -setting('dns')->{hosts_file} ||= '/etc/hosts'; - -# housekeeping expire used to be called expiry -setting('housekeeping')->{expire} ||= setting('housekeeping')->{expiry} - if setting('housekeeping') and exists setting('housekeeping')->{expiry}; +use App::Netdisco::Configuration; =head1 NAME @@ -196,7 +149,7 @@ In the same file uncomment and edit the C setting to be appropriate for your local site. Change the C string setting if your site has different values, and -uncomment the C setting to enable SNMP data gathering from +uncomment the C setting to enable SNMP data gathering from devices (this replaces cron jobs in Netdisco 1). Have a quick read of the other settings to make sure you're happy, then move diff --git a/Netdisco/lib/App/Netdisco/Configuration.pm b/Netdisco/lib/App/Netdisco/Configuration.pm new file mode 100644 index 00000000..e9ecbc5c --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Configuration.pm @@ -0,0 +1,60 @@ +package App::Netdisco::Configuration; + +use App::Netdisco::Environment; +use Dancer ':script'; + +# set up database schema config from simple config vars +if (ref {} eq ref setting('database')) { + my $name = (setting('database')->{name} || 'netdisco'); + my $host = setting('database')->{host}; + my $user = setting('database')->{user}; + my $pass = setting('database')->{pass}; + + my $dsn = "dbi:Pg:dbname=${name}"; + $dsn .= ";host=${host}" if $host; + + # set up the netdisco schema now we have access to the config + # but only if it doesn't exist from an earlier config style + setting('plugins')->{DBIC}->{netdisco} ||= { + dsn => $dsn, + user => $user, + password => $pass, + options => { + AutoCommit => 1, + RaiseError => 1, + auto_savepoint => 1, + }, + schema_class => 'App::Netdisco::DB', + }; + +} + +# static configuration for the in-memory local job queue +setting('plugins')->{DBIC}->{daemon} = { + dsn => 'dbi:SQLite:dbname=:memory:', + options => { + AutoCommit => 1, + RaiseError => 1, + sqlite_use_immediate_transaction => 1, + }, + schema_class => 'App::Netdisco::Daemon::DB', +}; + +# default queue model is Pg +setting('workers')->{queue} ||= 'PostgreSQL'; + +# force skipped DNS resolution, if unset +setting('dns')->{hosts_file} ||= '/etc/hosts'; +setting('dns')->{no} ||= ['fe80::/64','169.254.0.0/16']; + +# schedule expire used to be called expiry +setting('schedule')->{expire} ||= setting('schedule')->{expiry} + if setting('schedule') and exists setting('schedule')->{expiry}; + +# set max outstanding requests for AnyEvent::DNS +$ENV{'PERL_ANYEVENT_MAX_OUTSTANDING_DNS'} + = setting('dns')->{max_outstanding} || 50; +$ENV{'PERL_ANYEVENT_HOSTS'} + = setting('dns')->{hosts_file} || '/etc/hosts'; + +true; diff --git a/Netdisco/lib/App/Netdisco/Core/Discover.pm b/Netdisco/lib/App/Netdisco/Core/Discover.pm index 1b8fa01e..364b8ca9 100644 --- a/Netdisco/lib/App/Netdisco/Core/Discover.pm +++ b/Netdisco/lib/App/Netdisco/Core/Discover.pm @@ -5,6 +5,7 @@ use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::Device qw/get_device is_discoverable/; use App::Netdisco::Util::DNS ':all'; +use App::Netdisco::JobQueue qw/jq_queued jq_insert/; use NetAddr::IP::Lite ':lower'; use List::MoreUtils (); use Encode; @@ -900,20 +901,12 @@ sub discover_new_neighbors { next; } - # Don't queued if job already exists - my $is_queued = schema('netdisco')->resultset('Admin')->search( - { device => $ip, + # Don't queue if job already exists + if (List::MoreUtils::none {$_ eq $ip} jq_queued('discover')) { + jq_insert({ + device => $ip, action => 'discover', - status => { -like => 'queued%' }, - } - )->single; - unless ($is_queued) { - schema('netdisco')->resultset('Admin')->create( - { device => $ip, - action => 'discover', - status => 'queued', - } - ); + }); } } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index 2806b8c4..8ad79af8 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -8,12 +8,9 @@ use base 'DBIx::Class::Core'; __PACKAGE__->table("admin"); __PACKAGE__->add_columns( "job", - { - data_type => "integer", - is_nullable => 0, - }, + { data_type => "integer", is_nullable => 0 }, - "role", # Poller, Interactive, etc + "type", # Poller, Interactive, etc { data_type => "text", is_nullable => 0 }, "wid", # worker ID, only != 0 once taken @@ -47,4 +44,11 @@ __PACKAGE__->add_columns( __PACKAGE__->set_primary_key("job"); +sub extra { (shift)->subaction } + +sub entered_stamp { + (my $stamp = (shift)->entered) =~ s/\.\d+$//; + return $stamp; +} + 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm new file mode 100644 index 00000000..097582d5 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/JobQueue.pm @@ -0,0 +1,19 @@ +package App::Netdisco::Daemon::JobQueue; + +use Role::Tiny; +use namespace::clean; + +use Module::Load (); +Module::Load::load_remote 'JobQueue' => 'App::Netdisco::JobQueue' => ':all'; + +# central queue +sub jq_getsome { shift and JobQueue::jq_getsome(@_) } +sub jq_locked { shift and JobQueue::jq_locked(@_) } +sub jq_queued { shift and JobQueue::jq_queued(@_) } +sub jq_take { goto \&JobQueue::jq_take } +sub jq_lock { shift and JobQueue::jq_lock(@_) } +sub jq_defer { shift and JobQueue::jq_defer(@_) } +sub jq_complete { shift and JobQueue::jq_complete(@_) } +sub jq_insert { shift and JobQueue::jq_insert(@_) } + +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm new file mode 100644 index 00000000..13834a3f --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm @@ -0,0 +1,62 @@ +package App::Netdisco::Daemon::LocalQueue; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +schema('daemon')->deploy; +my $queue = schema('daemon')->resultset('Admin'); + +sub add_jobs { + my (@jobs) = @_; + info sprintf "adding %s jobs to local queue", scalar @jobs; + schema('daemon')->dclone($_)->insert for @jobs; +} + +sub capacity_for { + my ($type) = @_; + debug "checking local capacity for worker type $type"; + + my $setting = setting('workers')->{ setting('job_type_keys')->{$type} }; + my $current = $queue->search({type => $type})->count; + return ($current < $setting); +} + +sub take_jobs { + my ($wid, $type, $max) = @_; + return () unless $wid > 1; + $max ||= 1; + + debug "deleting completed jobs by worker $wid"; + $queue->search({wid => $wid})->delete; + + debug "searching for $max new jobs for worker $wid (type $type)"; + my $rs = $queue->search( + {type => $type, wid => 0}, + {rows => $max}, + ); + + my @rows = $rs->all; + return [] if scalar @rows == 0; + + debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid; + $queue->search({job => { -in => [map {$_->job} @rows] }}) + ->update({wid => $wid}); + + return \@rows; +} + +# not used by workers, only the daemon when reinitializing a worker +sub reset_jobs { + my ($wid) = @_; + debug "resetting jobs owned by worker $wid to be available"; + return unless $wid > 1; + $queue->search({wid => $wid}) + ->update({wid => 0}); +} + +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm deleted file mode 100644 index b64d2915..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm +++ /dev/null @@ -1,87 +0,0 @@ -package App::Netdisco::Daemon::Queue; - -use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use base 'Exporter'; -our @EXPORT = (); -our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs scrub_jobs /; -our %EXPORT_TAGS = ( all => \@EXPORT_OK ); - -schema('daemon')->deploy; -my $queue = schema('daemon')->resultset('Admin'); - -sub add_jobs { - my ($jobs) = @_; - info sprintf "adding %s jobs to local queue", scalar @$jobs; - $queue->populate($jobs); -} - -sub capacity_for { - my ($action) = @_; - debug "checking local capacity for action $action"; - - my $action_map = { - Poller => [ - qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/ - ], - Interactive => [qw/location contact portcontrol portname vlan power/], - }; - - my $role_map = { - (map {$_ => 'Poller'} @{ $action_map->{Poller} }), - (map {$_ => 'Interactive'} @{ $action_map->{Interactive} }) - }; - - my $setting_map = { - Poller => 'pollers', - Interactive => 'interactives', - }; - - my $role = $role_map->{$action}; - my $setting = $setting_map->{$role}; - - my $current = $queue->search({role => $role})->count; - - return ($current < setting('workers')->{$setting}); -} - -sub take_jobs { - my ($wid, $role, $max) = @_; - $max ||= 1; - - # asking for more jobs means the current ones are done - debug "removing complete jobs for worker $wid from local queue"; - $queue->search({wid => $wid})->delete; - - debug "searching for $max new jobs for worker $wid (role $role)"; - my $rs = $queue->search( - {role => $role, wid => 0}, - {rows => $max}, - ); - - my @rows = $rs->all; - return [] if scalar @rows == 0; - - debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid; - $rs->update({wid => $wid}); - - return [ map {{$_->get_columns}} @rows ]; -} - -sub reset_jobs { - my ($wid) = @_; - debug "resetting jobs owned by worker $wid to be available"; - return unless $wid > 1; - $queue->search({wid => $wid}) - ->update({wid => 0}); -} - -sub scrub_jobs { - my ($wid) = @_; - debug "deleting jobs owned by worker $wid"; - return unless $wid > 1; - $queue->search({wid => $wid})->delete; -} - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index 64292713..10baf3cf 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -1,87 +1,61 @@ package App::Netdisco::Daemon::Worker::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use Try::Tiny; use Role::Tiny; use namespace::clean; -requires qw/worker_type worker_name munge_action/; +with 'App::Netdisco::Daemon::JobQueue'; sub worker_body { my $self = shift; my $wid = $self->wid; + my $tag = $self->worker_tag; my $type = $self->worker_type; - my $name = $self->worker_name; while (1) { - debug "$type ($wid): asking for a job"; - my $jobs = $self->do('take_jobs', $self->wid, $name); - - foreach my $candidate (@$jobs) { - # create a row object so we can use column accessors - # use the local db schema in case it is accidentally 'stored' - # (will throw an exception) - my $job = schema('daemon')->resultset('Admin') - ->new_result($candidate); - my $jid = $job->job; + my $jobs = $self->jq_take($self->wid, $type); + foreach my $job (@$jobs) { my $target = $self->munge_action($job->action); - next unless $self->can($target); - debug "$type ($wid): can ${target}() for job $jid"; - # do job - my ($status, $log); try { $job->started(scalar localtime); - info sprintf "$type (%s): starting %s job(%s) at %s", - $wid, $target, $jid, $job->started; - ($status, $log) = $self->$target($job); + info sprintf "$tag (%s): starting %s job(%s) at %s", + $wid, $target, $job->id, $job->started; + my ($status, $log) = $self->$target($job); + $job->status($status); + $job->log($log); } catch { - $status = 'error'; - $log = "error running job: $_"; - $self->sendto('stderr', $log ."\n"); + $job->status('error'); + $job->log("error running job: $_"); + $self->sendto('stderr', $job->log ."\n"); }; - $self->close_job($job, $status, $log); + $self->close_job($job); } - - debug "$type ($wid): sleeping now..."; - sleep(1); } } sub close_job { - my ($self, $job, $status, $log) = @_; - my $type = $self->worker_type; + my ($self, $job) = @_; + my $tag = $self->worker_tag; my $now = scalar localtime; - info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s", - $self->wid, $job->action, $job->job, $status, $now; + info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s", + $self->wid, $job->action, $job->id, $job->status, $now; - # lock db row and either defer or complete the job try { - if ($status eq 'defer') { - schema('netdisco')->resultset('Admin') - ->find($job->job, {for => 'update'}) - ->update({ status => 'queued' }); + if ($job->status eq 'defer') { + $self->jq_defer($job); } else { - schema('netdisco')->resultset('Admin') - ->find($job->job, {for => 'update'}) - ->update({ - status => $status, - log => $log, - started => $job->started, - finished => $now, - }); + $job->finished($now); + $self->jq_complete($job); } - - # remove job from local queue - $self->do('scrub_jobs', $self->wid); } catch { $self->sendto('stderr', "error closing job: $_\n") }; } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm index f0b13ec4..1ae2aa2c 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm @@ -10,8 +10,8 @@ with 'App::Netdisco::Daemon::Worker::Common'; with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; -sub worker_type { 'int' } -sub worker_name { 'Interactive' } +sub worker_tag { 'int' } +sub worker_type { 'Interactive' } sub munge_action { 'set_' . $_[1] } 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 134f94f1..d77c6952 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -1,104 +1,67 @@ package App::Netdisco::Daemon::Worker::Manager; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use Net::Domain 'hostfqdn'; -use Try::Tiny; use Role::Tiny; use namespace::clean; -my $fqdn = hostfqdn || 'localhost'; - -my $role_map = { - (map {$_ => 'Poller'} - qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/), - (map {$_ => 'Interactive'} - qw/location contact portcontrol portname vlan power/) -}; +use List::Util 'sum'; +with 'App::Netdisco::Daemon::JobQueue'; sub worker_begin { my $self = shift; my $wid = $self->wid; debug "entering Manager ($wid) worker_begin()"; + if (setting('workers')->{'no_manager'}) { + return debug "mgr ($wid): no need for manager... skip begin"; + } + # requeue jobs locally debug "mgr ($wid): searching for jobs booked to this processing node"; - my $rs = schema('netdisco')->resultset('Admin') - ->search({status => "queued-$fqdn"}); - - my @jobs = map {{$_->get_columns}} $rs->all; + my @jobs = $self->jq_locked; if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - map { $_->{role} = $role_map->{$_->{action}} } @jobs; - - $self->do('add_jobs', \@jobs); + $self->do('add_jobs', @jobs); } } sub worker_body { my $self = shift; my $wid = $self->wid; - my $num_slots = $self->do('num_workers') - or return debug "mgr ($wid): this node has no workers... quitting manager"; - # get some pending jobs - my $rs = schema('netdisco')->resultset('Admin') - ->search( - {status => 'queued'}, - {order_by => 'random()', rows => $num_slots}, - ); + return debug "mgr ($wid): no need for manager... quitting" + if setting('workers')->{'no_manager'}; + + my $num_slots = sum( 0, map { setting('workers')->{$_} } + values %{setting('job_type_keys')} ); while (1) { debug "mgr ($wid): getting potential jobs for $num_slots workers"; - while (my $job = $rs->next) { - my $jid = $job->job; + + # get some pending jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( $self->jq_getsome($num_slots) ) { # check for available local capacity - next unless $self->do('capacity_for', $job->action); + my $job_type = setting('job_types')->{$job->action}; + next unless $job_type and $self->do('capacity_for', $job_type); debug sprintf "mgr (%s): processing node has capacity for job %s (%s)", - $wid, $jid, $job->action; + $wid, $job->id, $job->action; # mark job as running - next unless $self->lock_job($job); + next unless $self->jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $jid; - - my $local_job = { $job->get_columns }; - $local_job->{role} = $role_map->{$job->action}; + $wid, $job->id; # copy job to local queue - $self->do('add_jobs', [$local_job]); + $self->do('add_jobs', $job); } - # reset iterator so ->next() triggers another DB query - $rs->reset; - - # TODO also check for stale jobs in Netdisco DB - debug "mgr ($wid): sleeping now..."; sleep( setting('workers')->{sleep_time} || 2 ); } } -sub lock_job { - my ($self, $job) = @_; - my $happy = 0; - - # lock db row and update to show job has been picked - try { - schema('netdisco')->txn_do(sub { - schema('netdisco')->resultset('Admin')->find( - {job => $job->job, status => 'queued'}, - {for => 'update'} - )->update({ status => "queued-$fqdn" }); - }); - $happy = 1; - }; - - return $happy; -} - 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm index 34559016..1350ba4e 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm @@ -13,8 +13,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device', 'App::Netdisco::Daemon::Worker::Poller::Nbtstat', 'App::Netdisco::Daemon::Worker::Poller::Expiry'; -sub worker_type { 'pol' } -sub worker_name { 'Poller' } +sub worker_tag { 'pol' } +sub worker_type { 'Poller' } sub munge_action { $_[1] } 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm index 9f3f5cd2..95a2ba59 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm @@ -1,11 +1,11 @@ package App::Netdisco::Daemon::Worker::Poller::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::SNMP 'snmp_connect'; use App::Netdisco::Util::Device 'get_device'; use App::Netdisco::Daemon::Util ':all'; +use Dancer::Plugin::DBIC 'schema'; use NetAddr::IP::Lite ':lower'; @@ -16,36 +16,22 @@ use namespace::clean; sub _walk_body { my ($self, $job_type, $job) = @_; - my $action_method = $job_type .'_action'; - my $job_action = $self->$action_method; - my $layer_method = $job_type .'_layer'; my $job_layer = $self->$layer_method; - my $jobqueue = schema('netdisco')->resultset('Admin'); + my %queued = map {$_ => 1} $self->jq_queued($job_type); my @devices = schema('netdisco')->resultset('Device') - ->search({ip => { -not_in => - $jobqueue->search({ - device => { '!=' => undef}, - action => $job_type, - status => { -like => 'queued%' }, - })->get_column('device')->as_query - }})->has_layer($job_layer)->get_column('ip')->all; + ->has_layer($job_layer)->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; - my $filter_method = $job_type .'_filter'; - my $job_filter = $self->$filter_method; - - my @filtered_devices = grep {$job_filter->($_)} @devices; - - schema('netdisco')->resultset('Admin')->txn_do_locked(sub { - $jobqueue->populate([ + $self->jq_insert([ map {{ device => $_, action => $job_type, - status => 'queued', + username => $job->username, + userip => $job->userip, }} (@filtered_devices) - ]); - }); + ]); return job_done("Queued $job_type job for all devices"); } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm index dcb42ec1..39742143 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm @@ -1,12 +1,12 @@ package App::Netdisco::Daemon::Worker::Poller::Device; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::SNMP 'snmp_connect'; use App::Netdisco::Util::Device qw/get_device is_discoverable/; use App::Netdisco::Core::Discover ':all'; use App::Netdisco::Daemon::Util ':all'; +use Dancer::Plugin::DBIC 'schema'; use NetAddr::IP::Lite ':lower'; @@ -17,27 +17,19 @@ use namespace::clean; sub discoverall { my ($self, $job) = @_; - my $jobqueue = schema('netdisco')->resultset('Admin'); - my $devices = schema('netdisco')->resultset('Device') - ->search({ip => { -not_in => - $jobqueue->search({ - device => { '!=' => undef}, - action => 'discover', - status => { -like => 'queued%' }, - })->get_column('device')->as_query - }})->get_column('ip'); + my %queued = map {$_ => 1} $self->jq_queued('discover'); + my @devices = schema('netdisco')->resultset('Device') + ->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; - schema('netdisco')->resultset('Admin')->txn_do_locked(sub { - $jobqueue->populate([ + $self->jq_insert([ map {{ device => $_, action => 'discover', - status => 'queued', username => $job->username, userip => $job->userip, - }} ($devices->all) - ]); - }); + }} (@filtered_devices) + ]); return job_done("Queued discover job for all devices"); } @@ -48,7 +40,6 @@ sub discover { my $host = NetAddr::IP::Lite->new($job->device); my $device = get_device($host->addr); - my $jobqueue = schema('netdisco')->resultset('Admin'); if ($device->ip eq '0.0.0.0') { return job_error("discover failed: no device param (need -d ?)"); @@ -80,26 +71,20 @@ sub discover { # if requested, and the device has not yet been arpniped/macsucked, queue now if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') { if (!defined $device->last_macsuck) { - schema('netdisco')->txn_do(sub { - $jobqueue->create({ + $self->jq_insert({ device => $device->ip, action => 'macsuck', - status => 'queued', username => $job->username, userip => $job->userip, - }); }); } if (!defined $device->last_arpnip) { - schema('netdisco')->txn_do(sub { - $jobqueue->create({ + $self->jq_insert({ device => $device->ip, action => 'arpnip', - status => 'queued', username => $job->username, userip => $job->userip, - }); }); } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm index f6f24f2d..218798ac 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm @@ -1,38 +1,23 @@ package App::Netdisco::Daemon::Worker::Scheduler; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - use Algorithm::Cron; -use Try::Tiny; use Role::Tiny; use namespace::clean; -my $jobactions = { - map {$_ => undef} qw/ - discoverall - arpwalk - macwalk - nbtwalk - expire - / -# saveconfigs -# backup -}; +with 'App::Netdisco::Daemon::JobQueue'; sub worker_begin { my $self = shift; my $wid = $self->wid; debug "entering Scheduler ($wid) worker_begin()"; - foreach my $a (keys %$jobactions) { - next unless setting('housekeeping') - and exists setting('housekeeping')->{$a}; - my $config = setting('housekeeping')->{$a}; + foreach my $action (keys %{ setting('schedule') }) { + my $config = setting('schedule')->{$action}; # accept either single crontab format, or individual time fields - my $cron = Algorithm::Cron->new( + $config->{when} = Algorithm::Cron->new( base => 'local', %{ (ref {} eq ref $config->{when}) @@ -40,9 +25,6 @@ sub worker_begin { : {crontab => $config->{when}} } ); - - $jobactions->{$a} = $config; - $jobactions->{$a}->{when} = $cron; } } @@ -61,30 +43,21 @@ sub worker_body { my $win_end = $win_start + 60; # if any job is due, add it to the queue - foreach my $a (keys %$jobactions) { - next unless defined $jobactions->{$a}; - my $sched = $jobactions->{$a}; + foreach my $action (keys %{ setting('schedule') }) { + my $sched = setting('schedule')->{$action}; # next occurence of job must be in this minute's window - debug sprintf "sched ($wid): $a: win_start: %s, win_end: %s, next: %s", + debug sprintf "sched ($wid): $action: win_start: %s, win_end: %s, next: %s", $win_start, $win_end, $sched->{when}->next_time($win_start); next unless $sched->{when}->next_time($win_start) <= $win_end; # queue it! - # due to a table constraint, this will (intentionally) fail if a - # similar job is already queued. - try { - info "sched ($wid): queueing $a job"; - schema('netdisco')->resultset('Admin')->create({ - action => $a, - device => ($sched->{device} || undef), - subaction => ($sched->{extra} || undef), - status => 'queued', - }); - } - catch { - debug "sched ($wid): action $a was not queued (dupe?)"; - }; + info "sched ($wid): queueing $action job"; + $self->jq_insert({ + action => $action, + device => $sched->{device}, + extra => $sched->{extra}, + }); } } } diff --git a/Netdisco/lib/App/Netdisco/JobQueue.pm b/Netdisco/lib/App/Netdisco/JobQueue.pm new file mode 100644 index 00000000..6c72b2c5 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/JobQueue.pm @@ -0,0 +1,141 @@ +package App::Netdisco::JobQueue; + +use Dancer qw/:moose :syntax :script/; + +use Module::Load (); +Module::Load::load + 'App::Netdisco::JobQueue::' . setting('workers')->{queue} => ':all'; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ + jq_getsome + jq_locked + jq_queued + jq_log + jq_userlog + jq_take + jq_lock + jq_defer + jq_complete + jq_insert + jq_delete +/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +=head1 NAME + +App::Netdisco::JobQueue + +=head1 DESCRIPTION + +Interface for Netdisco job queue. + +There are no default exports, however the C<:all> tag will export all +subroutines. + +=head1 EXPORT_OK + +=head2 jq_getsome( $num? ) + +Returns a list of randomly selected queued jobs. Default is to return one job, +unless C<$num> is provided. Jobs are returned as objects which implement the +Netdisco job instance interface (see below). + +=head2 jq_locked() + +Returns the list of jobs currently booked out to this processing node (denoted +by the local hostname). Jobs are returned as objects which implement the +Netdisco job instance interface (see below). + +=head2 jq_queued( $job_type ) + +Returns a list of IP addresses of devices which currently have a job of the +given C<$job_type> queued (e.g. C, C, etc). + +=head2 jq_log() + +Returns a list of the most recent 50 jobs in the queue. Jobs are returned as +objects which implement the Netdisco job instance interface (see below). + +=head2 jq_userlog( $user ) + +Returns a list of jobs which have been entered into the queue by the passed +C<$user>. Jobs are returned as objects which implement the Netdisco job +instance interface (see below). + +=head2 jq_take( $wid, $type, $max? ) + +Searches in the queue for jobs of type C<$type> and if up to C<$max> are +available, will book them out to the worker with ID C<$wid>. The default +number of booked jobs is 1. + +=head2 jq_lock( $job ) + +Marks a job in the queue as booked out to this processing node (denoted by the +local hostname). The C<$job> parameter must be an object which implements the +Netdisco job instance interface (see below). + +Returns true if successful else returns false. + +=head2 jq_defer( $job ) + +Marks a job in the queue as available for taking. This is usually done after a +job is booked but the processing node changes its mind and decides to return +the job to the queue. The C<$job> parameter must be an object which implements +the Netdisco job instance interface (see below). + +Returns true if successful else returns false. + +=head2 jq_complete( $job ) + +Marks a job as complete. The C<$job> parameter must be an object which +implements the Netdisco job instance interface (see below). The queue item's +status, log and finished fields will be updated from the passed C<$job>. + +Returns true if successful else returns false. + +=head2 jq_insert( \%job | [ \%job, \%job ...] ) + +Adds the passed jobs to the queue. + +=head2 jq_delete( $id? ) + +If passed the ID of a job, deletes it from the queue. Otherwise deletes ALL +jobs from the queue. + +=head1 Job Instance Interface + +=head2 id (auto) + +=head2 type (required) + +=head2 wid (required, default 0) + +=head2 entered + +=head2 started + +=head2 finished + +=head2 device + +=head2 port + +=head2 action + +=head2 subaction or extra + +=head2 status + +=head2 username + +=head2 userip + +=head2 log + +=head2 debug + +=cut + +true; diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm new file mode 100644 index 00000000..f8f81a69 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -0,0 +1,221 @@ +package App::Netdisco::JobQueue::PostgreSQL; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use Net::Domain 'hostfqdn'; +use Try::Tiny; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ + jq_getsome + jq_locked + jq_queued + jq_log + jq_userlog + jq_take + jq_lock + jq_defer + jq_complete + jq_insert + jq_delete +/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +sub jq_getsome { + my $num_slots = shift; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin') + ->search( + {status => 'queued'}, + {order_by => 'random()', rows => ($num_slots || 1)}, + ); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +sub jq_locked { + my $fqdn = hostfqdn || 'localhost'; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin') + ->search({status => "queued-$fqdn"}); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +sub jq_queued { + my $job_type = shift; + + return schema('netdisco')->resultset('Admin')->search({ + device => { '!=' => undef}, + action => $job_type, + status => { -like => 'queued%' }, + })->get_column('device')->all; +} + +sub jq_log { + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin')->search({}, { + order_by => { -desc => [qw/entered device action/] }, + rows => 50, + }); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +sub jq_userlog { + my $user = shift; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin')->search({ + username => $user, + finished => { '>' => \"(now() - interval '5 seconds')" }, + }); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + return @returned; +} + +# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via +# the main daemon process. This is only used by daemon workers which can use +# MCE ->do() method. +sub jq_take { + my ($self, $wid, $type) = @_; + + # be polite to SQLite database (that is, local CPU) + debug "$type ($wid): sleeping now..."; + sleep(1); + + debug "$type ($wid): asking for a job"; + $self->do('take_jobs', $wid, $type); +} + +sub jq_lock { + my $job = shift; + my $fqdn = hostfqdn || 'localhost'; + my $happy = false; + + # lock db row and update to show job has been picked + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'}) + ->update({ status => "queued-$fqdn" }); + + # remove any duplicate jobs, needed because we have race conditions + # when queueing jobs of a type for all devices + schema('netdisco')->resultset('Admin')->search({ + status => 'queued', + device => $job->device, + port => $job->port, + action => $job->action, + subaction => $job->subaction, + }, {for => 'update'})->delete(); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_defer { + my $job = shift; + my $happy = false; + + # lock db row and update to show job is available + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'}) + ->update({ status => 'queued' }); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_complete { + my $job = shift; + my $happy = false; + + # lock db row and update to show job is done/error + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'})->update({ + status => $job->status, + log => $job->log, + finished => $job->finished, + }); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_insert { + my $jobs = shift; + $jobs = [$jobs] if ref [] ne ref $jobs; + my $happy = false; + + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->populate([ + map {{ + device => $_->{device}, + port => $_->{port}, + action => $_->{action}, + subaction => ($_->{extra} || $_->{subaction}), + username => $_->{username}, + userip => $_->{userip}, + status => 'queued', + }} @$jobs + ]); + }); + $happy = true; + }; + + return $happy; +} + +sub jq_delete { + my $id = shift; + + if ($id) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->find($id)->delete(); + }); + } + else { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->delete(); + }); + } +} + +true; diff --git a/Netdisco/lib/App/Netdisco/Manual/Configuration.pod b/Netdisco/lib/App/Netdisco/Manual/Configuration.pod index f8599c7e..d691f6da 100644 --- a/Netdisco/lib/App/Netdisco/Manual/Configuration.pod +++ b/Netdisco/lib/App/Netdisco/Manual/Configuration.pod @@ -870,7 +870,7 @@ library default of 10. C is a list of IP addresses or CIDR ranges to excluded from DNS resolution. Link local addresses are excluded by default. -=head3 C +=head3 C Value: Settings Tree. Default: None. @@ -879,13 +879,13 @@ macsuck, arpnip, etc) in the central database. It's fine to have multiple nodes scheduling work for redundancy (but make sure they all have good NTP). Note that this is independent of the Pollers configured in C. It's -okay to have this node schedule housekeeping but not do any of the polling +okay to have this node schedule schedule but not do any of the polling itself (C). Work can be scheduled using C style notation, or a simple weekday and hour fields (which accept same types as C notation). For example: - housekeeping: + schedule: discoverall: when: '0 9 * * *' arpwalk: diff --git a/Netdisco/lib/App/Netdisco/Manual/Developing.pod b/Netdisco/lib/App/Netdisco/Manual/Developing.pod index e40eace7..dea39857 100644 --- a/Netdisco/lib/App/Netdisco/Manual/Developing.pod +++ b/Netdisco/lib/App/Netdisco/Manual/Developing.pod @@ -440,7 +440,7 @@ each). The fourth kind of worker is called the Scheduler and takes care of adding discover, macsuck, arpnip, and nbtstat jobs to the queue (which are in turn handled by the Poller worker). This worker is automatically started only if -the user has enabled the "C" section of their +the user has enabled the "C" section of their C site config. =head2 SNMP::Info diff --git a/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod b/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod index afaa18ee..2115019a 100644 --- a/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod +++ b/Netdisco/lib/App/Netdisco/Manual/ReleaseNotes.pod @@ -36,6 +36,14 @@ but they are backwards compatible. =back +=head1 2.028000 + +=head2 General Changes + +The configuration item C has been renamed to C. Old +configuration will continue to work, but we recommend you rename this key in +your configuration anyway. + =head1 2.025001 =head2 General Changes diff --git a/Netdisco/lib/App/Netdisco/Web.pm b/Netdisco/lib/App/Netdisco/Web.pm index c39a6ce1..dab64938 100644 --- a/Netdisco/lib/App/Netdisco/Web.pm +++ b/Netdisco/lib/App/Netdisco/Web.pm @@ -10,6 +10,7 @@ use Socket6 (); # to ensure dependency is met use HTML::Entities (); # to ensure dependency is met use URI::QueryParam (); # part of URI, to add helper methods use Path::Class 'dir'; +use Module::Load (); use App::Netdisco::Util::Web 'interval_to_daterange'; use App::Netdisco::Web::AuthN; @@ -34,8 +35,7 @@ sub _load_web_plugins { $plugin =~ s/^\+//; debug "loading Netdisco plugin $plugin"; - eval "require $plugin"; - error $@ if $@; + Module::Load::load $plugin; } } diff --git a/Netdisco/lib/App/Netdisco/Web/AdminTask.pm b/Netdisco/lib/App/Netdisco/Web/AdminTask.pm index 2f13803f..7134d86d 100644 --- a/Netdisco/lib/App/Netdisco/Web/AdminTask.pm +++ b/Netdisco/lib/App/Netdisco/Web/AdminTask.pm @@ -5,25 +5,10 @@ use Dancer::Plugin::Ajax; use Dancer::Plugin::DBIC; use Dancer::Plugin::Auth::Extensible; -use Try::Tiny; - -# we have a separate list for jobs needing a device to avoid queueing -# such a job when there's no device param (it could still be duff, tho). -my %jobs = map { $_ => 1} qw/ - discover - macsuck - arpnip - nbtstat -/; -my %jobs_all = map {$_ => 1} qw/ - discoverall - macwalk - arpwalk - nbtwalk -/; +use App::Netdisco::JobQueue 'jq_insert'; sub add_job { - my ($jobtype, $device, $subaction) = @_; + my ($action, $device, $subaction) = @_; if ($device) { $device = NetAddr::IP::Lite->new($device); @@ -31,32 +16,22 @@ sub add_job { if ! $device or $device->addr eq '0.0.0.0'; } - # job might already be in the queue, so this could die - try { - schema('netdisco')->resultset('Admin')->create({ - ($device ? (device => $device->addr) : ()), - action => $jobtype, - ($subaction ? (subaction => $subaction) : ()), - status => 'queued', - (exists $jobs{$jobtype} ? (username => session('logged_in_user')) : ()), - userip => request->remote_address, - }); - }; + jq_insert({ + ($device ? (device => $device->addr) : ()), + action => $action, + ($subaction ? (subaction => $subaction) : ()), + username => session('logged_in_user'), + userip => request->remote_address, + }); } -foreach my $jobtype (keys %jobs_all, keys %jobs) { - ajax "/ajax/control/admin/$jobtype" => require_role admin => sub { - send_error('Missing device', 400) - if exists $jobs{$jobtype} and not param('device'); - - add_job($jobtype, param('device'), param('extra')); +foreach my $action (keys %{ setting('job_types') }) { + ajax "/ajax/control/admin/$action" => require_role admin => sub { + add_job($action, param('device'), param('extra')); }; - post "/admin/$jobtype" => require_role admin => sub { - send_error('Missing device', 400) - if exists $jobs{$jobtype} and not param('device'); - - add_job($jobtype, param('device'), param('extra')); + post "/admin/$action" => require_role admin => sub { + add_job($action, param('device'), param('extra')); redirect uri_for('/admin/jobqueue')->path; }; } diff --git a/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm b/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm index 27767ce9..5cf97cac 100644 --- a/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm +++ b/Netdisco/lib/App/Netdisco/Web/Plugin/AdminTask/JobQueue.pm @@ -6,6 +6,7 @@ use Dancer::Plugin::DBIC; use Dancer::Plugin::Auth::Extensible; use App::Netdisco::Web::Plugin; +use App::Netdisco::JobQueue qw/jq_log jq_delete/; register_admin_task({ tag => 'jobqueue', @@ -14,30 +15,17 @@ register_admin_task({ ajax '/ajax/control/admin/jobqueue/del' => require_role admin => sub { send_error('Missing job', 400) unless param('job'); - - schema('netdisco')->txn_do(sub { - my $device = schema('netdisco')->resultset('Admin') - ->search({job => param('job')})->delete; - }); + jq_delete( param('job') ); }; ajax '/ajax/control/admin/jobqueue/delall' => require_role admin => sub { - schema('netdisco')->txn_do(sub { - my $device = schema('netdisco')->resultset('Admin')->delete; - }); + jq_delete(); }; ajax '/ajax/content/admin/jobqueue' => require_role admin => sub { - my $set = schema('netdisco')->resultset('Admin') - ->with_times - ->search({}, { - order_by => { -desc => [qw/entered device action/] }, - rows => 50, - }); - content_type('text/html'); template 'ajax/admintask/jobqueue.tt', { - results => $set, + results => [ jq_log ], }, { layout => undef }; }; diff --git a/Netdisco/lib/App/Netdisco/Web/PortControl.pm b/Netdisco/lib/App/Netdisco/Web/PortControl.pm index b362fb70..d4a182e8 100644 --- a/Netdisco/lib/App/Netdisco/Web/PortControl.pm +++ b/Netdisco/lib/App/Netdisco/Web/PortControl.pm @@ -5,6 +5,8 @@ use Dancer::Plugin::Ajax; use Dancer::Plugin::DBIC; use Dancer::Plugin::Auth::Extensible; +use App::Netdisco::JobQueue qw/jq_insert jq_userlog/; + ajax '/ajax/portcontrol' => require_role port_control => sub { send_error('No device/port/field', 400) unless param('device') and (param('port') or param('field')); @@ -44,12 +46,11 @@ ajax '/ajax/portcontrol' => require_role port_control => sub { }); } - schema('netdisco')->resultset('Admin')->create({ + jq_insert({ device => param('device'), port => param('port'), action => $action, subaction => $subaction, - status => 'queued', username => session('logged_in_user'), userip => request->remote_address, log => $log, @@ -61,21 +62,20 @@ ajax '/ajax/portcontrol' => require_role port_control => sub { }; ajax '/ajax/userlog' => require_login sub { - my $rs = schema('netdisco')->resultset('Admin')->search({ - username => session('logged_in_user'), - action => [qw/location contact portcontrol portname vlan power - discover macsuck arpnip/], - finished => { '>' => \"(now() - interval '5 seconds')" }, - }); + my @jobs = jq_userlog( session('logged_in_user') ); my %status = ( - 'done' => [ - map {s/\[\]/<empty>/; $_} - $rs->search({status => 'done'})->get_column('log')->all + 'done' => [ + map {s/\[\]/<empty>/; $_} + map { $_->log } + grep { $_->status eq 'done' } + @jobs ], 'error' => [ - map {s/\[\]/<empty>/; $_} - $rs->search({status => 'error'})->get_column('log')->all + map {s/\[\]/<empty>/; $_} + map { $_->log } + grep { $_->status eq 'error' } + @jobs ], ); diff --git a/Netdisco/share/config.yml b/Netdisco/share/config.yml index 5fffcfee..18fc4f94 100644 --- a/Netdisco/share/config.yml +++ b/Netdisco/share/config.yml @@ -167,13 +167,14 @@ workers: interactives: 2 pollers: 10 sleep_time: 2 + queue: PostgreSQL dns: max_outstanding: 50 hosts_file: '/etc/hosts' no: ['fe80::/64','169.254.0.0/16'] -#housekeeping: +#schedule: # discoverall: # when: '5 7 * * *' # macwalk: @@ -187,6 +188,27 @@ dns: # expire: # when: '20 23 * * *' +job_types: + discoverall: Poller + discover: Poller + arpwalk: Poller + arpnip: Poller + macwalk: Poller + macsuck: Poller + nbtwalk: Poller + nbtstat: Poller + expire: Poller + location: Interactive + contact: Interactive + portcontrol: Interactive + portname: Interactive + vlan: Interactive + power: Interactive + +job_type_keys: + Poller: pollers + Interactive: interactives + # --------------- # DANCER INTERNAL # --------------- diff --git a/Netdisco/share/environments/deployment.yml b/Netdisco/share/environments/deployment.yml index 14d5d763..fc7f795d 100644 --- a/Netdisco/share/environments/deployment.yml +++ b/Netdisco/share/environments/deployment.yml @@ -42,7 +42,7 @@ snmp_auth: # daemon will keep netdisco up to date on this schedule # ````````````````````````````````````````````````````` -#housekeeping: +#schedule: # discoverall: # when: '5 7 * * *' # macwalk: diff --git a/Netdisco/share/views/ajax/admintask/jobqueue.tt b/Netdisco/share/views/ajax/admintask/jobqueue.tt index 77fd4076..aa034549 100644 --- a/Netdisco/share/views/ajax/admintask/jobqueue.tt +++ b/Netdisco/share/views/ajax/admintask/jobqueue.tt @@ -1,4 +1,4 @@ -[% IF NOT results.has_rows %] +[% IF NOT results.size %]
The job queue is empty.
[% ELSE %] @@ -17,7 +17,7 @@ - [% WHILE (row = results.next) %] + [% FOREACH row IN results %]