diff --git a/bin/netdisco-do b/bin/netdisco-do index 9be32cd4..71de8748 100755 --- a/bin/netdisco-do +++ b/bin/netdisco-do @@ -35,7 +35,7 @@ BEGIN { # for netdisco app config use App::Netdisco; -use App::Netdisco::Daemon::Job; +use App::Netdisco::Backend::Job; use Dancer qw/:moose :script/; info "App::Netdisco version $App::Netdisco::VERSION loaded."; @@ -104,13 +104,13 @@ unless ($action) { use App::Netdisco::Util::Device qw/get_device delete_device renumber_device/; - with 'App::Netdisco::Daemon::Worker::Poller::Device'; - with 'App::Netdisco::Daemon::Worker::Poller::Arpnip'; - with 'App::Netdisco::Daemon::Worker::Poller::Macsuck'; - with 'App::Netdisco::Daemon::Worker::Poller::Nbtstat'; - with 'App::Netdisco::Daemon::Worker::Poller::Expiry'; - with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions'; - with 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; + with 'App::Netdisco::Backend::Worker::Poller::Device'; + with 'App::Netdisco::Backend::Worker::Poller::Arpnip'; + with 'App::Netdisco::Backend::Worker::Poller::Macsuck'; + with 'App::Netdisco::Backend::Worker::Poller::Nbtstat'; + with 'App::Netdisco::Backend::Worker::Poller::Expiry'; + with 'App::Netdisco::Backend::Worker::Interactive::DeviceActions'; + with 'App::Netdisco::Backend::Worker::Interactive::PortActions'; eval { Module::Load::load 'App::Netdisco::Util::Graph' }; sub graph { @@ -233,7 +233,7 @@ foreach my $host (@hostlist) { } # what job are we asked to do? - my $job = App::Netdisco::Daemon::Job->new({ + my $job = App::Netdisco::Backend::Job->new({ job => 0, action => $action, device => $dev, diff --git a/bin/netdisco-do-e b/bin/netdisco-do-e new file mode 100755 index 00000000..9be32cd4 --- /dev/null +++ b/bin/netdisco-do-e @@ -0,0 +1,462 @@ +#!/usr/bin/env perl + +use strict; +use warnings; + +our $home; + +BEGIN { + use FindBin; + FindBin::again(); + + $home = ($ENV{NETDISCO_HOME} || $ENV{HOME}); + + # try to find a localenv if one isn't already in place. + if (!exists $ENV{PERL_LOCAL_LIB_ROOT}) { + use File::Spec; + my $localenv = File::Spec->catfile($FindBin::RealBin, 'localenv'); + exec($localenv, $0, @ARGV) if -f $localenv; + $localenv = File::Spec->catfile($home, 'perl5', 'bin', 'localenv'); + exec($localenv, $0, @ARGV) if -f $localenv; + + die "Sorry, can't find libs required for App::Netdisco.\n" + if !exists $ENV{PERLBREW_PERL}; + } +} + +BEGIN { + use Path::Class; + + # stuff useful locations into @INC and $PATH + unshift @INC, + dir($FindBin::RealBin)->parent->subdir('lib')->stringify, + dir($FindBin::RealBin, 'lib')->stringify; +} + +# for netdisco app config +use App::Netdisco; +use App::Netdisco::Daemon::Job; +use Dancer qw/:moose :script/; + +info "App::Netdisco version $App::Netdisco::VERSION loaded."; + +use NetAddr::IP qw/:rfc3021 :lower/; +use App::Netdisco::Util::Device 'get_device'; + +use Try::Tiny; +use Pod::Usage; +use Scalar::Util 'blessed'; +use Getopt::Long; +Getopt::Long::Configure ("bundling"); + +my ($device, $port, $extra, $debug); +my ($infotrace, $snmptrace, $sqltrace) = (0, 0, 0); + +my $result = GetOptions( + 'device|d=s' => \$device, + 'port|p=s' => \$port, + 'extra|e=s' => \$extra, + 'debug|D' => \$debug, + 'infotrace|I+' => \$infotrace, + 'snmptrace|S+' => \$snmptrace, + 'sqltrace|Q+' => \$sqltrace, +) or pod2usage( + -msg => 'error: bad options', + -verbose => 0, + -exitval => 1, +); + +my $CONFIG = config(); +$CONFIG->{logger} = 'console'; +$CONFIG->{log} = ($debug ? 'debug' : 'info'); + +$ENV{INFO_TRACE} ||= $infotrace; +$ENV{SNMP_TRACE} ||= $snmptrace; +$ENV{DBIC_TRACE} ||= $sqltrace; + +# reconfigure logging to force console output +Dancer::Logger->init('console', $CONFIG); + +# get requested action +(my $action = shift @ARGV) =~ s/^set_// + if scalar @ARGV; + +unless ($action) { + pod2usage( + -msg => 'error: missing action!', + -verbose => 2, + -exitval => 2, + ); +} + +# create worker (placeholder object for the role methods) +{ + package MyWorker; + + use Moo; + use Module::Load (); + use Data::Printer (); + use Scalar::Util 'blessed'; + use NetAddr::IP qw/:rfc3021 :lower/; + use Dancer ':script'; + + use App::Netdisco::Util::SNMP (); + use App::Netdisco::Util::Device + qw/get_device delete_device renumber_device/; + + with 'App::Netdisco::Daemon::Worker::Poller::Device'; + with 'App::Netdisco::Daemon::Worker::Poller::Arpnip'; + with 'App::Netdisco::Daemon::Worker::Poller::Macsuck'; + with 'App::Netdisco::Daemon::Worker::Poller::Nbtstat'; + with 'App::Netdisco::Daemon::Worker::Poller::Expiry'; + with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions'; + with 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; + + eval { Module::Load::load 'App::Netdisco::Util::Graph' }; + sub graph { + App::Netdisco::Util::Graph::graph(); + return ('done', 'Generated graph data.'); + } + + use App::Netdisco::Util::NodeMonitor (); + sub monitor { + App::Netdisco::Util::NodeMonitor::monitor(); + return ('done', 'Generated monitor data.'); + } + + sub show { + my ($self, $job) = @_; + my ($device, $port, $extra) = map {$job->$_} qw/device port extra/; + return ('error', 'Missing device (-d).') if !defined $device; + + $extra ||= 'interfaces'; my $class = undef; + ($class, $extra) = split(/::([^:]+)$/, $extra); + if ($class and $extra) { + $class = 'SNMP::Info::'.$class; + } + else { + $extra = $class; + undef $class; + } + my $i = App::Netdisco::Util::SNMP::snmp_connect($device, $class); + Data::Printer::p($i->$extra); + return ('done', sprintf "Showed %s response from %s.", $extra, $device->ip); + } + + sub delete { + my ($self, $job) = @_; + my ($device, $port, $extra) = map {$job->$_} qw/device port extra/; + return ('error', 'Missing device (-d).') if !defined $device; + + $port = ($port ? 1 : 0); + delete_device($device, $port, $extra); + return ('done', sprintf "Deleted device %s.", $device->ip); + } + + sub renumber { + my ($self, $job) = @_; + my ($device, $port, $extra) = map {$job->$_} qw/device port extra/; + return ('error', 'Missing device (-d).') if !defined $device; + my $old_ip = $device->ip; + + my $new_ip = NetAddr::IP->new($extra); + unless ($new_ip and $new_ip->addr ne '0.0.0.0') { + return ('error', "Bad host or IP: ".($extra || '0.0.0.0')); + } + + my $new_dev = get_device($new_ip->addr); + if ($new_dev and $new_dev->in_storage and ($new_dev->ip ne $device->ip)) { + return ('error', sprintf "Already know new device as: %s.", $new_dev->ip); + } + + renumber_device($device, $new_ip); + return ('done', sprintf 'Renumbered device %s to %s (%s).', + $device->ip, $new_ip, ($device->dns || '')); + } + + sub psql { + my ($self, $job) = @_; + my ($device, $port, $extra) = map {$job->$_} qw/device port extra/; + + my $name = (setting('database')->{name} || 'netdisco'); + my $host = setting('database')->{host}; + my $user = setting('database')->{user}; + my $pass = setting('database')->{pass}; + + my $portnum = undef; + if ($host and $host =~ m/([^;]+);port=(\d+)/) { + $host = $1; + $portnum = $2; + } + + $ENV{PGHOST} = $host if $host; + $ENV{PGPORT} = $portnum if defined $portnum; + $ENV{PGDATABASE} = $name; + $ENV{PGUSER} = $user; + $ENV{PGPASSWORD} = $pass; + $ENV{PGCLIENTENCODING} = 'UTF8'; + + if ($extra) { + system('psql', '-c', $extra); + } + else { + system('psql'); + } + return ('done', "psql session closed."); + } +} +my $worker = MyWorker->new(); + +# belt and braces check before we go ahead +if (not $worker->can( $action )) { + pod2usage( + -msg => (sprintf 'error: %s is not a valid action', $action), + -verbose => 2, + -exitval => 3, + ); +} + +my $net = NetAddr::IP->new($device); +if ($device and (!$net or $net->num == 0 or $net->addr eq '0.0.0.0')) { + info sprintf '%s: error - Bad host, IP or prefix: %s', $action, $device; + exit 1; +} + +my @hostlist = defined $device ? ($net->hostenum) : (undef); +my $exitstatus = 0; + +foreach my $host (@hostlist) { + my $dev = $host ? get_device($host->addr) : undef; + if ($dev and not (blessed $dev and $dev->in_storage) and $action ne 'discover') { + info sprintf "%s: error - Don't know device: %s", $action, $host->addr; + next; + } + + # what job are we asked to do? + my $job = App::Netdisco::Daemon::Job->new({ + job => 0, + action => $action, + device => $dev, + port => $port, + subaction => $extra, + }); + + # do job + my ($status, $log); + try { + info sprintf '%s: started at %s', $action, scalar localtime; + ($status, $log) = $worker->$action($job); + } + catch { + $status = 'error'; + $log = "error running job: $_"; + }; + + info sprintf '%s: finished at %s', $action, scalar localtime; + info sprintf '%s: status %s: %s', $action, $status, $log; + $exitstatus = 1 if !defined $status or $status eq 'error'; +} + +exit $exitstatus; + +=head1 NAME + +netdisco-do - Run any Netdisco job from the command-line. + +=head1 SYNOPSIS + + ~/bin/netdisco-do [-DISQ] [-d [-p ] [-e ]] + +=head1 DESCRIPTION + +This program allows you to run any Netdisco poller job from the command-line. + +The C<-d> option will accept a hostname (that can be resolved to an IP with +DNS), an IP address, or IP prefix (subnets in CIDR format). It can be any +interface on the device known to Netdisco. + +Note that some jobs (C, C, C, C) +simply add entries to the Netdisco job queue for other jobs, so won't seem +to do much when you trigger them. + +=head1 ACTIONS + +=head2 discover + +Run a discover on the device (specified with C<-d>). + + ~netdisco/bin/netdisco-do discover -d 192.0.2.1 + +=head2 discoverall + +Run a discover for all known devices. + +=head2 macsuck + +Run a macsuck on the device (specified with C<-d>). + + ~netdisco/bin/netdisco-do macsuck -d 192.0.2.1 + +=head2 macwalk + +Run a macsuck for all known devices. + +=head2 arpnip + +Run an arpnip on the device (specified with C<-d>). + + ~netdisco/bin/netdisco-do arpnip -d 192.0.2.1 + +=head2 arpwalk + +Run an arpnip for all known devices. + +=head2 delete + +Delete a device (specified with C<-d>). Pass a log message for the action in +the C<-e> parameter. Optionally request for associated nodes to be archived +(rather than deleted) by setting the C<-p> parameter to "C" (mnemonic: +B

reserve). + + ~netdisco/bin/netdisco-do delete -d 192.0.2.1 + ~netdisco/bin/netdisco-do delete -d 192.0.2.1 -e 'older than the sun' + ~netdisco/bin/netdisco-do delete -d 192.0.2.1 -e 'older than the sun' -p yes + +=head2 renumber + +Change the canonical IP address of a device (specified with C<-d>). Pass the +new IP address in the C<-e> parameter. All related records such as topology, +log and node information will also be updated to refer to the new device. + +Note that I check is made as to whether the new IP is reachable for future +polling. + + ~netdisco/bin/netdisco-do renumber -d 192.0.2.1 -e 192.0.2.254 + +=head2 nbtstat + +Run an nbtstat on the node (specified with C<-d>). + + ~netdisco/bin/netdisco-do nbtstat -d 192.0.2.2 + +=head2 nbtwalk + +Run an nbtstat for all known nodes. + +=head2 expire + +Run Device and Node expiry actions according to configuration. + +=head2 expirenodes + +Archive nodes on the specified device. If you want to delete nodes, set the +C<-e> parameter to "C" (mnemonic: Bxpire). If you want to perform the +action on a specific port, set the C<-p> parameter. + + ~netdisco/bin/netdisco-do expirenodes -d 192.0.2.1 + ~netdisco/bin/netdisco-do expirenodes -d 192.0.2.1 -p FastEthernet0/1 -e no + +=head2 graph + +Generate GraphViz graphs for the largest cluster of devices. + +You'll need to install the L and L Perl modules, +and possibly also the C utility for your operating system. Also +create a directory for the output files. + + mkdir ~netdisco/graph + ~netdisco/bin/localenv cpanm Graph::Undirected + ~netdisco/bin/localenv cpanm GraphViz + +=head2 show + +Dump the content of an SNMP MIB leaf, which is useful for diagnostics and +troubleshooting. You should provide the "C<-e>" option which is the name of +the leaf (such as C or C). + +If you wish to test with a device class other than that discovered, prefix the +leaf with the class short name, for example "C" or +"C". + + ~netdisco/bin/netdisco-do show -d 192.0.2.1 -e interfaces + ~netdisco/bin/netdisco-do show -d 192.0.2.1 -e Layer2::HP::interfaces + +=head2 psql + +Start an interactive terminal with the Netdisco PostgreSQL database. If you +pass an SQL statement in the C<-e> option then it will be executed. + + ~netdisco/bin/netdisco-do psql + ~netdisco/bin/netdisco-do psql -e 'SELECT ip, dns FROM device' + ~netdisco/bin/netdisco-do psql -e 'COPY (SELECT ip, dns FROM device) TO STDOUT WITH CSV HEADER' + +=head2 location + +Set the SNMP location field on the device (specified with C<-d>). Pass the +location string in the C<-e> extra parameter. + + ~netdisco/bin/netdisco-do location -d 192.0.2.1 -e 'wiring closet' + +=head2 contact + +Set the SNMP contact field on the device (specified with C<-d>). Pass the +contact name in the C<-e> extra parameter. + + ~netdisco/bin/netdisco-do contact -d 192.0.2.1 -e 'tel: 555-2453' + +=head2 portname + +Set the description on a device port. Requires the C<-d> parameter (device), +C<-p> parameter (port), and C<-e> parameter (description). + + ~netdisco/bin/netdisco-do portname -d 192.0.2.1 -p FastEthernet0/1 -e 'Web Server' + +=head2 portcontrol + +Set the up/down status on a device port. Requires the C<-d> parameter +(device), C<-p> parameter (port), and C<-e> parameter ("up" or "down"). + + ~netdisco/bin/netdisco-do portcontrol -d 192.0.2.1 -p FastEthernet0/1 -e up + ~netdisco/bin/netdisco-do portcontrol -d 192.0.2.1 -p FastEthernet0/1 -e down + +=head2 vlan + +Set the native VLAN on a device port. Requires the C<-d> parameter (device), +C<-p> parameter (port), and C<-e> parameter (VLAN number). + + ~netdisco/bin/netdisco-do vlan -d 192.0.2.1 -p FastEthernet0/1 -e 102 + +=head2 power + +Set the PoE on/off status on a device port. Requires the C<-d> parameter +(device), C<-p> parameter (port), and C<-e> parameter ("on" or "off"). + + ~netdisco/bin/netdisco-do power -d 192.0.2.1 -p FastEthernet0/1 -e on + ~netdisco/bin/netdisco-do power -d 192.0.2.1 -p FastEthernet0/1 -e off + +=head1 DEBUG LEVELS + +The flags "C<-DISQ>" can be specified, multiple times, and enable the +following items in order: + +=over 4 + +=item C<-D> + +Netdisco debug log level + +=item C<-I> or C<-II> + +L trace level (1 or 2). + +=item C<-S> or C<-SS> or C<-SSS> + +L (net-snmp) trace level (1, 2 or 3). + +=item C<-Q> + +L trace enabled + +=back + +=cut diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm new file mode 100644 index 00000000..b4c27ccc --- /dev/null +++ b/lib/App/Netdisco/Backend/Job.pm @@ -0,0 +1,54 @@ +package App::Netdisco::Backend::Job; + +use Moo; +use namespace::clean; + +foreach my $slot (qw/ + job + entered + started + finished + device + port + action + subaction + status + username + userip + log + debug + /) { + + has $slot => ( + is => 'rw', + ); +} + +=head1 METHODS + +=head2 summary + +An attempt to make a meaningful statement about the job. + +=cut + +sub summary { + my $job = shift; + return join ' ', + $job->action, + ($job->device || ''), + ($job->port || ''); +# ($job->subaction ? (q{'}. $job->subaction .q{'}) : ''); +} + +=head1 ADDITIONAL COLUMNS + +=head2 extra + +Alias for the C column. + +=cut + +sub extra { (shift)->subaction } + +1; diff --git a/lib/App/Netdisco/Daemon/Job.pm b/lib/App/Netdisco/Backend/Job.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Job.pm rename to lib/App/Netdisco/Backend/Job.pm-e diff --git a/lib/App/Netdisco/Backend/Util.pm b/lib/App/Netdisco/Backend/Util.pm new file mode 100644 index 00000000..a957d5e6 --- /dev/null +++ b/lib/App/Netdisco/Backend/Util.pm @@ -0,0 +1,17 @@ +package App::Netdisco::Backend::Util; + +use strict; +use warnings; + +# support utilities for Backend Actions + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ job_done job_error job_defer /; +our %EXPORT_TAGS = (all => \@EXPORT_OK); + +sub job_done { return ('done', shift) } +sub job_error { return ('error', shift) } +sub job_defer { return ('defer', shift) } + +1; diff --git a/lib/App/Netdisco/Daemon/Util.pm b/lib/App/Netdisco/Backend/Util.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Util.pm rename to lib/App/Netdisco/Backend/Util.pm-e diff --git a/lib/App/Netdisco/Daemon/Worker/Common.pm b/lib/App/Netdisco/Backend/Worker/Common.pm similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Common.pm rename to lib/App/Netdisco/Backend/Worker/Common.pm diff --git a/lib/App/Netdisco/Backend/Worker/Common.pm-e b/lib/App/Netdisco/Backend/Worker/Common.pm-e new file mode 100644 index 00000000..d9d5b299 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Common.pm-e @@ -0,0 +1,70 @@ +package App::Netdisco::Backend::Worker::Common; + +use Dancer qw/:moose :syntax :script/; + +use Try::Tiny; +use App::Netdisco::Util::Backend; + +use Role::Tiny; +use namespace::clean; + +use Time::HiRes 'sleep'; +use App::Netdisco::JobQueue qw/jq_defer jq_complete/; + +sub worker_begin { (shift)->{started} = time } + +sub worker_body { + my $self = shift; + my $wid = $self->wid; + + while (1) { + prctl sprintf 'netdisco-backend: worker #%s poller: idle', $wid; + + my $job = $self->{queue}->dequeue(1); + next unless defined $job; + my $action = $job->action; + + try { + $job->started(scalar localtime); + prctl sprintf 'netdisco-backend: worker #%s poller: working on #%s: %s', + $wid, $job->job, $job->summary; + info sprintf "pol (%s): starting %s job(%s) at %s", + $wid, $action, $job->job, $job->started; + my ($status, $log) = $self->$action($job); + $job->status($status); + $job->log($log); + } + catch { + $job->status('error'); + $job->log("error running job: $_"); + $self->sendto('stderr', $job->log ."\n"); + }; + + $self->close_job($job); + sleep( setting('workers')->{'min_runtime'} || 0 ); + $self->exit(0); # recycle worker + } +} + +sub close_job { + my ($self, $job) = @_; + my $now = scalar localtime; + + prctl sprintf 'netdisco-backend: worker #%s poller: wrapping up %s #%s: %s', + $self->wid, $job->action, $job->job, $job->status; + info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", + $self->wid, $job->action, $job->job, $job->status, $now; + + try { + if ($job->status eq 'defer') { + jq_defer($job); + } + else { + $job->finished($now); + jq_complete($job); + } + } + catch { $self->sendto('stderr', "error closing job: $_\n") }; +} + +1; diff --git a/lib/App/Netdisco/Backend/Worker/Interactive/DeviceActions.pm b/lib/App/Netdisco/Backend/Worker/Interactive/DeviceActions.pm new file mode 100644 index 00000000..62bfbe6c --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Interactive/DeviceActions.pm @@ -0,0 +1,50 @@ +package App::Netdisco::Backend::Worker::Interactive::DeviceActions; + +use App::Netdisco::Util::SNMP 'snmp_connect_rw'; +use App::Netdisco::Util::Device 'get_device'; +use App::Netdisco::Backend::Util ':all'; + +use Role::Tiny; +use namespace::clean; + +sub location { + my ($self, $job) = @_; + return _set_device_generic($job->device, 'location', $job->subaction); +} + +sub contact { + my ($self, $job) = @_; + return _set_device_generic($job->device, 'contact', $job->subaction); +} + +sub _set_device_generic { + my ($ip, $slot, $data) = @_; + $data ||= ''; + + # snmp connect using rw community + my $info = snmp_connect_rw($ip) + or return job_error("Failed to connect to device [$ip] to update $slot"); + + my $method = 'set_'. $slot; + my $rv = $info->$method($data); + + if (!defined $rv) { + return job_error(sprintf 'Failed to set %s on [%s]: %s', + $slot, $ip, ($info->error || '')); + } + + # confirm the set happened + $info->clear_cache; + my $new_data = ($info->$slot || ''); + if ($new_data ne $data) { + return job_error("Verify of $slot update failed on [$ip]: $new_data"); + } + + # update netdisco DB + my $device = get_device($ip); + $device->update({$slot => $data}); + + return job_done("Updated $slot on [$ip] to [$data]"); +} + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm b/lib/App/Netdisco/Backend/Worker/Interactive/DeviceActions.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm rename to lib/App/Netdisco/Backend/Worker/Interactive/DeviceActions.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Interactive/PortActions.pm b/lib/App/Netdisco/Backend/Worker/Interactive/PortActions.pm new file mode 100644 index 00000000..e34d49e4 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Interactive/PortActions.pm @@ -0,0 +1,159 @@ +package App::Netdisco::Backend::Worker::Interactive::PortActions; + +use App::Netdisco::Util::Port ':all'; +use App::Netdisco::Util::SNMP 'snmp_connect_rw'; +use App::Netdisco::Util::Device 'get_device'; +use App::Netdisco::Backend::Util ':all'; + +use Role::Tiny; +use namespace::clean; + +sub portname { + my ($self, $job) = @_; + return _set_port_generic($job, 'alias', 'name'); +} + +sub portcontrol { + my ($self, $job) = @_; + + my $port = get_port($job->device, $job->port) + or return job_error(sprintf "Unknown port name [%s] on device [%s]", + $job->port, $job->device); + + my $reconfig_check = port_reconfig_check($port); + return job_error("Cannot alter port: $reconfig_check") + if $reconfig_check; + + # need to remove "-other" which appears for power/portcontrol + (my $sa = $job->subaction) =~ s/-\w+//; + $job->subaction($sa); + + if ($sa eq 'bounce') { + $job->subaction('down'); + my @stat = _set_port_generic($job, 'up_admin'); + return @stat if $stat[0] ne 'done'; + $job->subaction('up'); + return _set_port_generic($job, 'up_admin'); + } + else { + return _set_port_generic($job, 'up_admin'); + } +} + +sub vlan { + my ($self, $job) = @_; + + my $port = get_port($job->device, $job->port) + or return job_error(sprintf "Unknown port name [%s] on device [%s]", + $job->port, $job->device); + + my $port_reconfig_check = port_reconfig_check($port); + return job_error("Cannot alter port: $port_reconfig_check") + if $port_reconfig_check; + + my $vlan_reconfig_check = vlan_reconfig_check($port); + return job_error("Cannot alter vlan: $vlan_reconfig_check") + if $vlan_reconfig_check; + + my @stat = _set_port_generic($job, 'pvid'); # for Cisco trunk + return @stat if $stat[0] eq 'done'; + return _set_port_generic($job, 'vlan'); +} + +sub _set_port_generic { + my ($job, $slot, $column) = @_; + $column ||= $slot; + + my $device = get_device($job->device); + my $ip = $device->ip; + my $pn = $job->port; + my $data = $job->subaction; + + my $port = get_port($ip, $pn) + or return job_error("Unknown port name [$pn] on device [$ip]"); + + if ($device->vendor ne 'netdisco') { + # snmp connect using rw community + my $info = snmp_connect_rw($ip) + or return job_error("Failed to connect to device [$ip] to control port"); + + my $iid = get_iid($info, $port) + or return job_error("Failed to get port ID for [$pn] from [$ip]"); + + my $method = 'set_i_'. $slot; + my $rv = $info->$method($data, $iid); + + if (!defined $rv) { + return job_error(sprintf 'Failed to set [%s] %s to [%s] on [%s]: %s', + $pn, $slot, $data, $ip, ($info->error || '')); + } + + # confirm the set happened + $info->clear_cache; + my $check_method = 'i_'. $slot; + my $state = ($info->$check_method($iid) || ''); + if (ref {} ne ref $state or $state->{$iid} ne $data) { + return job_error("Verify of [$pn] $slot failed on [$ip]"); + } + } + + # update netdisco DB + $port->update({$column => $data}); + + return job_done("Updated [$pn] $slot status on [$ip] to [$data]"); +} + +sub power { + my ($self, $job) = @_; + + my $port = get_port($job->device, $job->port) + or return job_error(sprintf "Unknown port name [%s] on device [%s]", + $job->port, $job->device); + + return job_error("No PoE service on port [%s] on device [%s]") + unless $port->power; + + my $reconfig_check = port_reconfig_check($port); + return job_error("Cannot alter port: $reconfig_check") + if $reconfig_check; + + my $device = get_device($job->device); + my $ip = $device->ip; + my $pn = $job->port; + + # munge data + (my $data = $job->subaction) =~ s/-\w+//; # remove -other + $data = 'true' if $data =~ m/^(on|yes|up)$/; + $data = 'false' if $data =~ m/^(off|no|down)$/; + + # snmp connect using rw community + my $info = snmp_connect_rw($ip) + or return job_error("Failed to connect to device [$ip] to control port"); + + my $powerid = get_powerid($info, $port) + or return job_error("Failed to get power ID for [$pn] from [$ip]"); + + my $rv = $info->set_peth_port_admin($data, $powerid); + + if (!defined $rv) { + return job_error(sprintf 'Failed to set [%s] power to [%s] on [%s]: %s', + $pn, $data, $ip, ($info->error || '')); + } + + # confirm the set happened + $info->clear_cache; + my $state = ($info->peth_port_admin($powerid) || ''); + if (ref {} ne ref $state or $state->{$powerid} ne $data) { + return job_error("Verify of [$pn] power failed on [$ip]"); + } + + # update netdisco DB + $port->power->update({ + admin => $data, + status => ($data eq 'false' ? 'disabled' : 'searching'), + }); + + return job_done("Updated [$pn] power status on [$ip] to [$data]"); +} + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm b/lib/App/Netdisco/Backend/Worker/Interactive/PortActions.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm rename to lib/App/Netdisco/Backend/Worker/Interactive/PortActions.pm-e diff --git a/lib/App/Netdisco/Daemon/Worker/Manager.pm b/lib/App/Netdisco/Backend/Worker/Manager.pm similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Manager.pm rename to lib/App/Netdisco/Backend/Worker/Manager.pm diff --git a/lib/App/Netdisco/Backend/Worker/Manager.pm-e b/lib/App/Netdisco/Backend/Worker/Manager.pm-e new file mode 100644 index 00000000..58b5bb76 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Manager.pm-e @@ -0,0 +1,86 @@ +package App::Netdisco::Backend::Worker::Manager; + +use Dancer qw/:moose :syntax :script/; + +use List::Util 'sum'; +use App::Netdisco::Util::Backend; + +use Role::Tiny; +use namespace::clean; + +use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock/; + +sub worker_begin { + my $self = shift; + my $wid = $self->wid; + + return debug "mgr ($wid): no need for manager... skip begin" + if setting('workers')->{'no_manager'}; + + debug "entering Manager ($wid) worker_begin()"; + + # requeue jobs locally + debug "mgr ($wid): searching for jobs booked to this processing node"; + my @jobs = jq_locked; + + if (scalar @jobs) { + info sprintf "mgr (%s): found %s jobs booked to this processing node", + $wid, scalar @jobs; + $self->{queue}->enqueuep(100, @jobs); + } +} + +sub worker_body { + my $self = shift; + my $wid = $self->wid; + + if (setting('workers')->{'no_manager'}) { + prctl sprintf 'netdisco-backend: worker #%s manager: inactive', $wid; + return debug "mgr ($wid): no need for manager... quitting" + } + + while (1) { + prctl sprintf 'netdisco-backend: worker #%s manager: gathering', $wid; + my $num_slots = 0; + + $num_slots = parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); + debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; + + # get some high priority jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsomep($num_slots) ) { + + # mark job as running + next unless jq_lock($job); + info sprintf "mgr (%s): job %s booked out for this processing node", + $wid, $job->job; + + # copy job to local queue + $self->{queue}->enqueuep(100, $job); + } + + $num_slots = parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); + debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; + + # get some normal priority jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsome($num_slots) ) { + + # mark job as running + next unless jq_lock($job); + info sprintf "mgr (%s): job %s booked out for this processing node", + $wid, $job->job; + + # copy job to local queue + $self->{queue}->enqueue($job); + } + + debug "mgr ($wid): sleeping now..."; + prctl sprintf 'netdisco-backend: worker #%s manager: idle', $wid; + sleep( setting('workers')->{sleep_time} || 1 ); + } +} + +1; diff --git a/lib/App/Netdisco/Backend/Worker/Poller.pm b/lib/App/Netdisco/Backend/Worker/Poller.pm new file mode 100644 index 00000000..1e62d61b --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller.pm @@ -0,0 +1,18 @@ +package App::Netdisco::Backend::Worker::Poller; + +use Role::Tiny; +use namespace::clean; + +# main worker body +with 'App::Netdisco::Backend::Worker::Common'; + +# add dispatch methods for poller tasks +with 'App::Netdisco::Backend::Worker::Poller::Device', + 'App::Netdisco::Backend::Worker::Poller::Arpnip', + 'App::Netdisco::Backend::Worker::Poller::Macsuck', + 'App::Netdisco::Backend::Worker::Poller::Nbtstat', + 'App::Netdisco::Backend::Worker::Poller::Expiry', + 'App::Netdisco::Backend::Worker::Interactive::DeviceActions', + 'App::Netdisco::Backend::Worker::Interactive::PortActions'; + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller.pm b/lib/App/Netdisco/Backend/Worker/Poller.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller.pm rename to lib/App/Netdisco/Backend/Worker/Poller.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Poller/Arpnip.pm b/lib/App/Netdisco/Backend/Worker/Poller/Arpnip.pm new file mode 100644 index 00000000..e132eb33 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller/Arpnip.pm @@ -0,0 +1,18 @@ +package App::Netdisco::Backend::Worker::Poller::Arpnip; + +use App::Netdisco::Core::Arpnip 'do_arpnip'; +use App::Netdisco::Util::Device 'is_arpnipable'; + +use Role::Tiny; +use namespace::clean; + +with 'App::Netdisco::Backend::Worker::Poller::Common'; + +sub arpnip_action { \&do_arpnip } +sub arpnip_filter { \&is_arpnipable } +sub arpnip_layer { 3 } + +sub arpwalk { (shift)->_walk_body('arpnip', @_) } +sub arpnip { (shift)->_single_body('arpnip', @_) } + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller/Arpnip.pm b/lib/App/Netdisco/Backend/Worker/Poller/Arpnip.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller/Arpnip.pm rename to lib/App/Netdisco/Backend/Worker/Poller/Arpnip.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Poller/Common.pm b/lib/App/Netdisco/Backend/Worker/Poller/Common.pm new file mode 100644 index 00000000..8c08dea2 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller/Common.pm @@ -0,0 +1,98 @@ +package App::Netdisco::Backend::Worker::Poller::Common; + +use Dancer qw/:moose :syntax :script/; + +use App::Netdisco::Util::SNMP 'snmp_connect'; +use App::Netdisco::Util::Device 'get_device'; +use App::Netdisco::Backend::Util ':all'; +use App::Netdisco::JobQueue qw/jq_queued jq_insert/; + +use Dancer::Plugin::DBIC 'schema'; +use NetAddr::IP::Lite ':lower'; + +use Role::Tiny; +use namespace::clean; + +# queue a job for all devices known to Netdisco +sub _walk_body { + my ($self, $job_type, $job) = @_; + + my $layer_method = $job_type .'_layer'; + my $job_layer = $self->$layer_method; + + my %queued = map {$_ => 1} jq_queued($job_type); + my @devices = schema('netdisco')->resultset('Device') + ->has_layer($job_layer)->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; + + jq_insert([ + map {{ + device => $_, + action => $job_type, + username => $job->username, + userip => $job->userip, + }} (@filtered_devices) + ]); + + return job_done("Queued $job_type job for all devices"); +} + +sub _single_body { + my ($self, $job_type, $job) = @_; + + my $action_method = $job_type .'_action'; + my $job_action = $self->$action_method; + + my $layer_method = $job_type .'_layer'; + my $job_layer = $self->$layer_method; + + my $device = get_device($job->device) + or job_error("$job_type failed: unable to interpret device parameter"); + my $host = $device->ip; + + if ($device->in_storage + and $device->vendor and $device->vendor eq 'netdisco') { + return job_done("$job_type skipped: $host is pseudo-device"); + } + + my $filter_method = $job_type .'_filter'; + my $job_filter = $self->$filter_method; + + unless ($job_filter->($device->ip)) { + return job_defer("$job_type deferred: $host is not ${job_type}able"); + } + + my $snmp = snmp_connect($device); + if (!defined $snmp) { + return job_error("$job_type failed: could not SNMP connect to $host"); + } + + unless ($snmp->has_layer( $job_layer )) { + return job_done("Skipped $job_type for device $host without OSI layer $job_layer capability"); + } + + $job_action->($device, $snmp); + + return job_done("Ended $job_type for $host"); +} + +sub _single_node_body { + my ($self, $job_type, $node, $now) = @_; + + my $action_method = $job_type .'_action'; + my $job_action = $self->$action_method; + + my $filter_method = $job_type .'_filter'; + my $job_filter = $self->$filter_method; + + unless ($job_filter->($node)) { + return job_defer("$job_type deferred: $node is not ${job_type}able"); + } + + $job_action->($node, $now); + + # would be ignored if wrapped in a loop + return job_done("Ended $job_type for $node"); +} + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm b/lib/App/Netdisco/Backend/Worker/Poller/Common.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller/Common.pm rename to lib/App/Netdisco/Backend/Worker/Poller/Common.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Poller/Device.pm b/lib/App/Netdisco/Backend/Worker/Poller/Device.pm new file mode 100644 index 00000000..52f3d7c3 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller/Device.pm @@ -0,0 +1,99 @@ +package App::Netdisco::Backend::Worker::Poller::Device; + +use Dancer qw/:moose :syntax :script/; + +use App::Netdisco::Util::SNMP 'snmp_connect'; +use App::Netdisco::Util::Device qw/get_device is_discoverable/; +use App::Netdisco::Core::Discover ':all'; +use App::Netdisco::Backend::Util ':all'; +use App::Netdisco::JobQueue qw/jq_queued jq_insert/; + +use Dancer::Plugin::DBIC 'schema'; +use NetAddr::IP::Lite ':lower'; + +use Role::Tiny; +use namespace::clean; + +# queue a discover job for all devices known to Netdisco +sub discoverall { + my ($self, $job) = @_; + + my %queued = map {$_ => 1} jq_queued('discover'); + my @devices = schema('netdisco')->resultset('Device') + ->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; + + jq_insert([ + map {{ + device => $_, + action => 'discover', + username => $job->username, + userip => $job->userip, + }} (@filtered_devices) + ]); + + return job_done("Queued discover job for all devices"); +} + +# run a discover job for one device, and its *new* neighbors +sub discover { + my ($self, $job) = @_; + + my $device = get_device($job->device) + or return job_error( + "discover failed: unable to interpret device parameter: " + . ($job->device || "''")); + my $host = $device->ip; + + if ($device->ip eq '0.0.0.0') { + return job_error("discover failed: no device param (need -d ?)"); + } + + if ($device->in_storage + and $device->vendor and $device->vendor eq 'netdisco') { + return job_done("discover skipped: $host is pseudo-device"); + } + + unless (is_discoverable($device->ip)) { + return job_defer("discover deferred: $host is not discoverable"); + } + + my $snmp = snmp_connect($device); + if (!defined $snmp) { + return job_error("discover failed: could not SNMP connect to $host"); + } + + store_device($device, $snmp); + set_canonical_ip($device, $snmp); # must come after store_device + store_interfaces($device, $snmp); + store_wireless($device, $snmp); + store_vlans($device, $snmp); + store_power($device, $snmp); + store_modules($device, $snmp) if setting('store_modules'); + discover_new_neighbors($device, $snmp); + + # if requested, and the device has not yet been arpniped/macsucked, queue now + if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') { + if (!defined $device->last_macsuck) { + jq_insert({ + device => $device->ip, + action => 'macsuck', + username => $job->username, + userip => $job->userip, + }); + } + + if (!defined $device->last_arpnip) { + jq_insert({ + device => $device->ip, + action => 'arpnip', + username => $job->username, + userip => $job->userip, + }); + } + } + + return job_done("Ended discover for $host"); +} + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm b/lib/App/Netdisco/Backend/Worker/Poller/Device.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller/Device.pm rename to lib/App/Netdisco/Backend/Worker/Poller/Device.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Poller/Expiry.pm b/lib/App/Netdisco/Backend/Worker/Poller/Expiry.pm new file mode 100644 index 00000000..09818046 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller/Expiry.pm @@ -0,0 +1,73 @@ +package App::Netdisco::Backend::Worker::Poller::Expiry; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use App::Netdisco::Backend::Util ':all'; + +use Role::Tiny; +use namespace::clean; + +# expire devices and nodes according to config +sub expire { + my ($self, $job) = @_; + + if (setting('expire_devices') and setting('expire_devices') > 0) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Device')->search({ + last_discover => \[q/< (now() - ?::interval)/, + (setting('expire_devices') * 86400)], + })->delete(); + }); + } + + if (setting('expire_nodes') and setting('expire_nodes') > 0) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Node')->search({ + time_last => \[q/< (now() - ?::interval)/, + (setting('expire_nodes') * 86400)], + })->delete(); + }); + } + + if (setting('expire_nodes_archive') and setting('expire_nodes_archive') > 0) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Node')->search({ + -not_bool => 'active', + time_last => \[q/< (now() - ?::interval)/, + (setting('expire_nodes_archive') * 86400)], + })->delete(); + }); + } + + if (setting('expire_jobs') and setting('expire_jobs') > 0) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->search({ + entered => \[q/< (now() - ?::interval)/, + (setting('expire_jobs') * 86400)], + })->delete(); + }); + } + + return job_done("Checked expiry for all Devices and Nodes"); +} + +# expire nodes for a specific device +sub expirenodes { + my ($self, $job) = @_; + + return job_error('Missing device') unless $job->device; + + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Node')->search({ + switch => $job->device->ip, + ($job->port ? (port => $job->port) : ()), + })->delete( + ($job->extra ? () : ({ archive_nodes => 1 })) + ); + }); + + return job_done("Expired nodes for ". $job->device->ip); +} + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller/Expiry.pm b/lib/App/Netdisco/Backend/Worker/Poller/Expiry.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller/Expiry.pm rename to lib/App/Netdisco/Backend/Worker/Poller/Expiry.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Poller/Macsuck.pm b/lib/App/Netdisco/Backend/Worker/Poller/Macsuck.pm new file mode 100644 index 00000000..373ceaa3 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller/Macsuck.pm @@ -0,0 +1,18 @@ +package App::Netdisco::Backend::Worker::Poller::Macsuck; + +use App::Netdisco::Core::Macsuck 'do_macsuck'; +use App::Netdisco::Util::Device 'is_macsuckable'; + +use Role::Tiny; +use namespace::clean; + +with 'App::Netdisco::Backend::Worker::Poller::Common'; + +sub macsuck_action { \&do_macsuck } +sub macsuck_filter { \&is_macsuckable } +sub macsuck_layer { 2 } + +sub macwalk { (shift)->_walk_body('macsuck', @_) } +sub macsuck { (shift)->_single_body('macsuck', @_) } + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller/Macsuck.pm b/lib/App/Netdisco/Backend/Worker/Poller/Macsuck.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller/Macsuck.pm rename to lib/App/Netdisco/Backend/Worker/Poller/Macsuck.pm-e diff --git a/lib/App/Netdisco/Backend/Worker/Poller/Nbtstat.pm b/lib/App/Netdisco/Backend/Worker/Poller/Nbtstat.pm new file mode 100644 index 00000000..e89a05c4 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Poller/Nbtstat.pm @@ -0,0 +1,73 @@ +package App::Netdisco::Backend::Worker::Poller::Nbtstat; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use App::Netdisco::Core::Nbtstat qw/nbtstat_resolve_async store_nbt/; +use App::Netdisco::Util::Node 'is_nbtstatable'; +use App::Netdisco::Util::Device qw/get_device is_discoverable/; +use App::Netdisco::Backend::Util ':all'; + +use NetAddr::IP::Lite ':lower'; +use Time::HiRes 'gettimeofday'; + +use Role::Tiny; +use namespace::clean; + +with 'App::Netdisco::Backend::Worker::Poller::Common'; + +sub nbtstat_action { \&do_nbtstat } +sub nbtstat_filter { \&is_nbtstatable } +sub nbtstat_layer { 2 } + +sub nbtwalk { (shift)->_walk_body('nbtstat', @_) } + +sub nbtstat { + my ($self, $job) = @_; + + my $device = get_device($job->device) + or job_error("nbtstat failed: unable to interpret device parameter"); + my $host = $device->ip; + + unless (is_discoverable($device->ip)) { + return job_defer("nbtstat deferred: $host is not discoverable"); + } + + # get list of nodes on device + my $interval = (setting('nbtstat_max_age') || 7) . ' day'; + my $rs = schema('netdisco')->resultset('NodeIp')->search({ + -bool => 'me.active', + -bool => 'nodes.active', + 'nodes.switch' => $device->ip, + 'me.time_last' => \[ '>= now() - ?::interval', $interval ], + },{ + join => 'nodes', + columns => 'ip', + distinct => 1, + })->ip_version(4); + + my @nodes = $rs->get_column('ip')->all; + + # Unless we have IP's don't bother + if (scalar @nodes) { + # filter exclusions from config + @nodes = grep { is_nbtstatable( $_ ) } @nodes; + + # setup the hash nbtstat_resolve_async expects + my @ips = map {+{'ip' => $_}} @nodes; + my $now = 'to_timestamp('. (join '.', gettimeofday) .')'; + + my $resolved_nodes = nbtstat_resolve_async(\@ips); + + # update node_nbt with status entries + foreach my $result (@$resolved_nodes) { + if (defined $result->{'nbname'}) { + store_nbt($result, $now); + } + } + } + + return job_done("Ended nbtstat for $host"); +} + +1; diff --git a/lib/App/Netdisco/Daemon/Worker/Poller/Nbtstat.pm b/lib/App/Netdisco/Backend/Worker/Poller/Nbtstat.pm-e similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Poller/Nbtstat.pm rename to lib/App/Netdisco/Backend/Worker/Poller/Nbtstat.pm-e diff --git a/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/lib/App/Netdisco/Backend/Worker/Scheduler.pm similarity index 100% rename from lib/App/Netdisco/Daemon/Worker/Scheduler.pm rename to lib/App/Netdisco/Backend/Worker/Scheduler.pm diff --git a/lib/App/Netdisco/Backend/Worker/Scheduler.pm-e b/lib/App/Netdisco/Backend/Worker/Scheduler.pm-e new file mode 100644 index 00000000..12c610d6 --- /dev/null +++ b/lib/App/Netdisco/Backend/Worker/Scheduler.pm-e @@ -0,0 +1,80 @@ +package App::Netdisco::Backend::Worker::Scheduler; + +use Dancer qw/:moose :syntax :script/; + +use Algorithm::Cron; +use App::Netdisco::Util::Backend; + +use Role::Tiny; +use namespace::clean; + +use App::Netdisco::JobQueue qw/jq_insert/; + +sub worker_begin { + my $self = shift; + my $wid = $self->wid; + + return debug "sch ($wid): no need for scheduler... skip begin" + unless setting('schedule'); + + debug "entering Scheduler ($wid) worker_begin()"; + + foreach my $action (keys %{ setting('schedule') }) { + my $config = setting('schedule')->{$action}; + + # accept either single crontab format, or individual time fields + $config->{when} = Algorithm::Cron->new( + base => 'local', + %{ + (ref {} eq ref $config->{when}) + ? $config->{when} + : {crontab => $config->{when}} + } + ); + } +} + +sub worker_body { + my $self = shift; + my $wid = $self->wid; + + unless (setting('schedule')) { + prctl sprintf 'netdisco-backend: worker #%s scheduler: inactive', $wid; + return debug "sch ($wid): no need for scheduler... quitting" + } + + while (1) { + # sleep until some point in the next minute + my $naptime = 60 - (time % 60) + int(rand(45)); + + prctl sprintf 'netdisco-backend: worker #%s scheduler: idle', $wid; + debug "sched ($wid): sleeping for $naptime seconds"; + + sleep $naptime; + prctl sprintf 'netdisco-backend: worker #%s scheduler: queueing', $wid; + + # NB next_time() returns the next *after* win_start + my $win_start = time - (time % 60) - 1; + my $win_end = $win_start + 60; + + # if any job is due, add it to the queue + foreach my $action (keys %{ setting('schedule') }) { + my $sched = setting('schedule')->{$action}; + + # next occurence of job must be in this minute's window + debug sprintf "sched ($wid): $action: win_start: %s, win_end: %s, next: %s", + $win_start, $win_end, $sched->{when}->next_time($win_start); + next unless $sched->{when}->next_time($win_start) <= $win_end; + + # queue it! + info "sched ($wid): queueing $action job"; + jq_insert({ + action => $action, + device => $sched->{device}, + extra => $sched->{extra}, + }); + } + } +} + +1; diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 78e57578..efcc15a1 100644 --- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -3,7 +3,7 @@ package App::Netdisco::JobQueue::PostgreSQL; use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; -use App::Netdisco::Daemon::Job; +use App::Netdisco::Backend::Job; use Net::Domain 'hostfqdn'; use Module::Load (); use Try::Tiny; @@ -38,7 +38,7 @@ sub _getsome { my @returned = (); while (my $job = $rs->next) { - push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); + push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); } return @returned; } @@ -68,7 +68,7 @@ sub jq_locked { ->search({status => "queued-$fqdn"}); while (my $job = $rs->next) { - push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); + push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); } return @returned; } diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm-e b/lib/App/Netdisco/JobQueue/PostgreSQL.pm-e new file mode 100644 index 00000000..78e57578 --- /dev/null +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm-e @@ -0,0 +1,224 @@ +package App::Netdisco::JobQueue::PostgreSQL; + +use Dancer qw/:moose :syntax :script/; +use Dancer::Plugin::DBIC 'schema'; + +use App::Netdisco::Daemon::Job; +use Net::Domain 'hostfqdn'; +use Module::Load (); +use Try::Tiny; + +use base 'Exporter'; +our @EXPORT = (); +our @EXPORT_OK = qw/ + jq_getsome + jq_getsomep + jq_locked + jq_queued + jq_log + jq_userlog + jq_lock + jq_defer + jq_complete + jq_insert + jq_delete +/; +our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + +sub _getsome { + my ($num_slots, $where) = @_; + return () if ((!defined $num_slots) or ($num_slots < 1)); + return () if ((!defined $where) or (ref {} ne ref $where)); + + my $rs = schema('netdisco')->resultset('Admin') + ->search( + { status => 'queued', %$where }, + { order_by => 'random()', rows => $num_slots }, + ); + + my @returned = (); + while (my $job = $rs->next) { + push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); + } + return @returned; +} + +sub jq_getsome { + return _getsome(shift, + { action => { -in => setting('job_prio')->{'normal'} } } + ); +} + +sub jq_getsomep { + return _getsome(shift, { + -or => [{ + username => { '!=' => undef }, + action => { -in => setting('job_prio')->{'normal'} }, + },{ + action => { -in => setting('job_prio')->{'high'} }, + }], + }); +} + +sub jq_locked { + my $fqdn = hostfqdn || 'localhost'; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin') + ->search({status => "queued-$fqdn"}); + + while (my $job = $rs->next) { + push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); + } + return @returned; +} + +sub jq_queued { + my $job_type = shift; + + return schema('netdisco')->resultset('Admin')->search({ + device => { '!=' => undef}, + action => $job_type, + status => { -like => 'queued%' }, + })->get_column('device')->all; +} + +sub jq_log { + return schema('netdisco')->resultset('Admin')->search({}, { + order_by => { -desc => [qw/entered device action/] }, + rows => 50, + })->with_times->hri->all; +} + +sub jq_userlog { + my $user = shift; + return schema('netdisco')->resultset('Admin')->search({ + username => $user, + finished => { '>' => \"(now() - interval '5 seconds')" }, + })->with_times->all; +} + +sub jq_lock { + my $job = shift; + my $fqdn = hostfqdn || 'localhost'; + my $happy = false; + + # lock db row and update to show job has been picked + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->search({job => $job->job}, {for => 'update'}) + ->update({ status => "queued-$fqdn" }); + + return unless + schema('netdisco')->resultset('Admin') + ->count({job => $job->job, status => "queued-$fqdn"}); + + # remove any duplicate jobs, needed because we have race conditions + # when queueing jobs of a type for all devices + schema('netdisco')->resultset('Admin')->search({ + status => 'queued', + device => $job->device, + port => $job->port, + action => $job->action, + subaction => $job->subaction, + }, {for => 'update'})->delete(); + + $happy = true; + }); + } + catch { + error $_; + }; + + return $happy; +} + +sub jq_defer { + my $job = shift; + my $happy = false; + + try { + # lock db row and update to show job is available + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->job, {for => 'update'}) + ->update({ status => 'queued', started => undef }); + }); + $happy = true; + } + catch { + error $_; + }; + + return $happy; +} + +sub jq_complete { + my $job = shift; + my $happy = false; + + # lock db row and update to show job is done/error + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->job, {for => 'update'})->update({ + status => $job->status, + log => $job->log, + started => $job->started, + finished => $job->finished, + }); + }); + $happy = true; + } + catch { + error $_; + }; + + return $happy; +} + +sub jq_insert { + my $jobs = shift; + $jobs = [$jobs] if ref [] ne ref $jobs; + my $happy = false; + + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->populate([ + map {{ + device => $_->{device}, + port => $_->{port}, + action => $_->{action}, + subaction => ($_->{extra} || $_->{subaction}), + username => $_->{username}, + userip => $_->{userip}, + status => 'queued', + }} @$jobs + ]); + }); + $happy = true; + } + catch { + error $_; + }; + + return $happy; +} + +sub jq_delete { + my $id = shift; + + if ($id) { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->find($id)->delete(); + }); + } + else { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->delete(); + }); + } +} + +true; diff --git a/lib/App/Netdisco/Util/Daemon.pm b/lib/App/Netdisco/Util/Backend.pm similarity index 94% rename from lib/App/Netdisco/Util/Daemon.pm rename to lib/App/Netdisco/Util/Backend.pm index 9234c5b0..f441a24a 100644 --- a/lib/App/Netdisco/Util/Daemon.pm +++ b/lib/App/Netdisco/Util/Backend.pm @@ -1,4 +1,4 @@ -package App::Netdisco::Util::Daemon; +package App::Netdisco::Util::Backend; use strict; use warnings;