add discover/refresh, scheduler jobs, netdisco-do

New Poller daemon worker can discover and refresh devices.
New Scheduler daemon worker replaces the cron jobs with configuration.
New netdisco-do script allows running a one-off job from the CLI.

Squashed commit of the following:

commit fa25f36e14
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Apr 10 22:43:47 2013 +0100

    fix HTTP port at 5000

commit 202ea4a84c
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Apr 10 22:33:03 2013 +0100

    bug fixes in discover

commit 925d9e4d6b
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Apr 10 21:51:44 2013 +0100

    add mini app for one-time jobs

commit d3a6c08a9d
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Apr 10 21:46:55 2013 +0100

    better name for subaction

commit 4adf473b20
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Apr 10 20:15:18 2013 +0100

    add logging of db add/del

commit 8aacafedaa
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Apr 10 19:49:00 2013 +0100

    copy all remaining messages from netdisco 1

commit 3e1156df1f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 8 22:09:40 2013 +0100

    alter some log levels and messages

commit e7ea92920f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 8 20:32:33 2013 +0100

    store wireless ssid and port info to DB

commit d1d16938a1
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Fri Apr 5 08:52:59 2013 +0100

    update packaging for new files and deps

commit 965990786f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 22:22:50 2013 +0100

    implementation of find_neighbors

commit 03c4d8ef09
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 20:42:11 2013 +0100

    add discoverall and discover_neighbors poller jobs

commit df68ff0890
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 19:52:37 2013 +0100

    implementation of store_modules

commit c2ac19e647
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 19:32:14 2013 +0100

    implementation of store_power

commit b7fb8c64a0
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 18:48:09 2013 +0100

    implementation of store_vlans

commit b8ddbd1eca
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 17:08:06 2013 +0100

    implementation of store_wireless (without storing, yet)

commit 2a14057481
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 16:47:21 2013 +0100

    implementation of store_interfaces (without wireless)

commit d5b2b71d34
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 15:12:19 2013 +0100

    only start Manager if there are pollers or interactives

commit f4a3dac760
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 15:03:10 2013 +0100

    change sub names so as not to collide with Dancer

commit a8f0894986
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 14:47:28 2013 +0100

    implementation of refresh, discover, and store_device

commit 4c2e3cf82d
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 11:38:15 2013 +0100

    make get_device return a new result object

commit e6ac131658
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 10:38:22 2013 +0100

    better POD section name

commit 6c5b6bbbee
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 4 10:37:46 2013 +0100

    implement separate snmp_connect and snmp_connect_rw methods

commit 62c8e19063
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 1 20:06:29 2013 +0100

    fix for unique constraint on job queue for locally queued jobs

commit ebb65996e6
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 1 20:00:36 2013 +0100

    add refresh poller job

commit 05928e8cf6
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 1 19:31:39 2013 +0100

    Refactor Util namespace

    Squashed commit of the following:

    commit 789c528fcf
    Author: Oliver Gorwits <oliver@cpan.org>
    Date:   Mon Apr 1 19:31:07 2013 +0100

        update manifest and fix typo

    commit b95d0951f2
    Author: Oliver Gorwits <oliver@cpan.org>
    Date:   Mon Apr 1 19:22:41 2013 +0100

        refactor ::Util namespace

    commit a8dde50343
    Author: Oliver Gorwits <oliver@cpan.org>
    Date:   Sun Mar 31 13:45:27 2013 +0100

        no need to search for device - IP should already be exact

commit b42daee4c1
Merge: 6e52762 95bb8fc
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed Mar 27 21:00:09 2013 +0000

    Merge branch 'master' into og-poller

commit 6e527629a2
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue Mar 26 23:39:23 2013 +0000

    fixes and log messages

commit cfcb7a956f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue Mar 26 22:57:06 2013 +0000

    bug fixes

commit 48f779a8d0
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue Mar 26 22:42:16 2013 +0000

    add config for scheduled tasks

commit 2f6efcb312
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue Mar 26 22:15:04 2013 +0000

    create poller worker and add poller type stubs

commit 52b28b0ab8
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue Mar 26 22:04:00 2013 +0000

    code tidy

commit 96db66739f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Mar 25 22:35:11 2013 +0000

    more insane but more working version of the job queue constraint

commit cb25216f40
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Mar 24 20:22:11 2013 +0000

    make scheduler start automatic based on housekeeping setting existing

commit 0acbe8abd3
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Mar 24 19:45:24 2013 +0000

    add scheduler based on Algorithm::Cron

commit 49d136b57a
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Mar 24 18:48:10 2013 +0000

    add unique constraint on admin/job queue
Oliver Gorwits
2013-04-10 22:45:33 +01:00
parent 95bb8fc06c
commit d21f9e8e19
35 changed files with 2191 additions and 354 deletions

@@ -33,11 +33,13 @@ sub capacity_for {
debug "checking local capacity for action $action";
my $action_map = {
Interactive => [qw/location contact portcontrol portname vlan power/]
Poller => [qw/refresh discover discoverall discover_neighbors/],
Interactive => [qw/location contact portcontrol portname vlan power/],
};
my $role_map = {
map {$_ => 'Interactive'} @{ $action_map->{Interactive} }
(map {$_ => 'Poller'} @{ $action_map->{Poller} }),
(map {$_ => 'Interactive'} @{ $action_map->{Interactive} })
};
my $setting_map = {

@@ -35,7 +35,8 @@ sub worker_body {
my ($status, $log);
try {
$job->started(scalar localtime);
info sprintf "int (%s): starting job %s at %s", $wid, $jid, $job->started;
info sprintf "int (%s): starting %s job(%s) at %s",
$wid, $target, $jid, $job->started;
($status, $log) = $self->$target($job);
}
catch {
@@ -55,8 +56,8 @@ sub worker_body {
sub close_job {
my ($self, $job, $status, $log) = @_;
my $now = scalar localtime;
info sprintf "int (%s): wrapping up job %s - status %s at %s",
$self->wid, $job->job, $status, $now;
info sprintf "int (%s): wrapping up set_%s job(%s) - status %s at %s",
$self->wid, $job->action, $job->job, $status, $now;
try {
schema('netdisco')->resultset('Admin')

@@ -1,6 +1,7 @@
package App::Netdisco::Daemon::Worker::Interactive::DeviceActions;
use App::Netdisco::Util::Connect qw/snmp_connect get_device/;
use App::Netdisco::Util::SNMP ':all';
use App::Netdisco::Util::Device 'get_device';
use App::Netdisco::Daemon::Worker::Interactive::Util ':all';
use Role::Tiny;
@@ -21,14 +22,14 @@ sub _set_device_generic {
$data ||= '';
# snmp connect using rw community
my $info = snmp_connect($ip)
or return error("Failed to connect to device [$ip] to update $slot");
my $info = snmp_connect_rw($ip)
or return job_error("Failed to connect to device [$ip] to update $slot");
my $method = 'set_'. $slot;
my $rv = $info->$method($data);
if (!defined $rv) {
return error(sprintf 'Failed to set %s on [%s]: %s',
return job_error(sprintf 'Failed to set %s on [%s]: %s',
$slot, $ip, ($info->error || ''));
}
@@ -36,14 +37,14 @@ sub _set_device_generic {
$info->clear_cache;
my $new_data = ($info->$slot || '');
if ($new_data ne $data) {
return error("Verify of $slot update failed on [$ip]: $new_data");
return job_error("Verify of $slot update failed on [$ip]: $new_data");
}
# update netdisco DB
my $device = get_device($ip);
$device->update({$slot => $data});
return done("Updated $slot on [$ip] to [$data]");
return job_done("Updated $slot on [$ip] to [$data]");
}
1;

@@ -1,7 +1,7 @@
package App::Netdisco::Daemon::Worker::Interactive::PortActions;
use App::Netdisco::Util::Connect ':all';
use App::Netdisco::Util::Permissions ':all';
use App::Netdisco::Util::SNMP ':all';
use App::Netdisco::Util::Port ':all';
use App::Netdisco::Daemon::Worker::Interactive::Util ':all';
use Role::Tiny;
@@ -16,11 +16,11 @@ sub set_portcontrol {
my ($self, $job) = @_;
my $port = get_port($job->device, $job->port)
or return error(sprintf "Unknown port name [%s] on device [%s]",
or return job_error(sprintf "Unknown port name [%s] on device [%s]",
$job->port, $job->device);
my $reconfig_check = port_reconfig_check($port);
return error("Cannot alter port: $reconfig_check")
return job_error("Cannot alter port: $reconfig_check")
if length $reconfig_check;
return _set_port_generic($job, 'up_admin');
@@ -30,15 +30,15 @@ sub set_vlan {
my ($self, $job) = @_;
my $port = get_port($job->device, $job->port)
or return error(sprintf "Unknown port name [%s] on device [%s]",
or return job_error(sprintf "Unknown port name [%s] on device [%s]",
$job->port, $job->device);
my $port_reconfig_check = port_reconfig_check($port);
return error("Cannot alter port: $port_reconfig_check")
return job_error("Cannot alter port: $port_reconfig_check")
if length $port_reconfig_check;
my $vlan_reconfig_check = vlan_reconfig_check($port);
return error("Cannot alter vlan: $vlan_reconfig_check")
return job_error("Cannot alter vlan: $vlan_reconfig_check")
if length $vlan_reconfig_check;
return _set_port_generic($job, 'vlan');
@@ -53,20 +53,20 @@ sub _set_port_generic {
(my $data = $job->subaction) =~ s/-\w+//;
my $port = get_port($ip, $pn)
or return error("Unknown port name [$pn] on device [$ip]");
or return job_error("Unknown port name [$pn] on device [$ip]");
# snmp connect using rw community
my $info = snmp_connect($ip)
or return error("Failed to connect to device [$ip] to control port");
my $info = snmp_connect_rw($ip)
or return job_error("Failed to connect to device [$ip] to control port");
my $iid = get_iid($info, $port)
or return error("Failed to get port ID for [$pn] from [$ip]");
or return job_error("Failed to get port ID for [$pn] from [$ip]");
my $method = 'set_i_'. $slot;
my $rv = $info->$method($data, $iid);
if (!defined $rv) {
return error(sprintf 'Failed to set [%s] %s to [%s] on [%s]: %s',
return job_error(sprintf 'Failed to set [%s] %s to [%s] on [%s]: %s',
$pn, $slot, $data, $ip, ($info->error || ''));
}
@@ -75,27 +75,27 @@ sub _set_port_generic {
my $check_method = 'i_'. $slot;
my $state = ($info->$check_method($iid) || '');
if (ref {} ne ref $state or $state->{$iid} ne $data) {
return error("Verify of [$pn] $slot failed on [$ip]");
return job_error("Verify of [$pn] $slot failed on [$ip]");
}
# update netdisco DB
$port->update({$column => $data});
return done("Updated [$pn] $slot status on [$ip] to [$data]");
return job_done("Updated [$pn] $slot status on [$ip] to [$data]");
}
sub set_power {
my ($self, $job) = @_;
my $port = get_port($job->device, $job->port)
or return error(sprintf "Unknown port name [%s] on device [%s]",
or return job_error(sprintf "Unknown port name [%s] on device [%s]",
$job->port, $job->device);
return error("No PoE service on port [%s] on device [%s]")
return job_error(sprintf "No PoE service on port [%s] on device [%s]",
$job->port, $job->device)
unless $port->power;
my $reconfig_check = port_reconfig_check($port);
return error("Cannot alter port: $reconfig_check")
return job_error("Cannot alter port: $reconfig_check")
if length $reconfig_check;
@@ -104,16 +104,16 @@ sub set_power {
(my $data = $job->subaction) =~ s/-\w+//;
# snmp connect using rw community
my $info = snmp_connect($ip)
or return error("Failed to connect to device [$ip] to control port");
my $info = snmp_connect_rw($ip)
or return job_error("Failed to connect to device [$ip] to control port");
my $powerid = get_powerid($info, $port)
or return error("Failed to get power ID for [$pn] from [$ip]");
or return job_error("Failed to get power ID for [$pn] from [$ip]");
my $rv = $info->set_peth_port_admin($data, $powerid);
if (!defined $rv) {
return error(sprintf 'Failed to set [%s] power to [%s] on [%s]: %s',
return job_error(sprintf 'Failed to set [%s] power to [%s] on [%s]: %s',
$pn, $data, $ip, ($info->error || ''));
}
@@ -121,7 +121,7 @@ sub set_power {
$info->clear_cache;
my $state = ($info->peth_port_admin($powerid) || '');
if (ref {} ne ref $state or $state->{$powerid} ne $data) {
return error("Verify of [$pn] power failed on [$ip]");
return job_error("Verify of [$pn] power failed on [$ip]");
}
# update netdisco DB
@@ -130,7 +130,7 @@ sub set_power {
status => ($data eq 'false' ? 'disabled' : 'searching'),
});
return done("Updated [$pn] power status on [$ip] to [$data]");
return job_done("Updated [$pn] power status on [$ip] to [$data]");
}
1;

@@ -4,12 +4,10 @@ package App::Netdisco::Daemon::Worker::Interactive::Util;
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/ done error /;
our %EXPORT_TAGS = (
all => [qw/ done error /],
);
our @EXPORT_OK = qw/ job_done job_error /;
our %EXPORT_TAGS = (all => \@EXPORT_OK);
sub done { return ('done', shift) }
sub error { return ('error', shift) }
sub job_done { return ('done', shift) }
sub job_error { return ('error', shift) }
1;
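
The renamed helpers return a plain (status, message) pair, which the worker code unpacks with `($status, $log) = $self->$target($job)`. Below is a minimal sketch of that contract. The helper bodies are copied from the diff above; `hypothetical_action` is an illustrative stand-in and not part of this commit.

```perl
use strict;
use warnings;

# Same bodies as the renamed helpers in Interactive::Util.
sub job_done  { return ('done',  shift) }
sub job_error { return ('error', shift) }

# Hypothetical action: fails early with job_error, otherwise reports job_done.
sub hypothetical_action {
    my ($reachable) = @_;
    return job_error("could not reach device") unless $reachable;
    return job_done("updated device");
}

# The caller unpacks the pair exactly as worker_body() does.
my ($status, $log) = hypothetical_action(1);
print "$status: $log\n";
```

Per the "change sub names so as not to collide with Dancer" commit listed above, the `job_` prefix keeps these helpers distinct from Dancer's own `error` logging keyword.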

@@ -3,7 +3,7 @@ package App::Netdisco::Daemon::Worker::Manager;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Util::DeviceProperties 'is_discoverable';
use App::Netdisco::Util::Device 'is_discoverable';
use Net::Domain 'hostfqdn';
use Try::Tiny;
@@ -13,8 +13,10 @@ use namespace::clean;
my $fqdn = hostfqdn || 'localhost';
my $role_map = {
map {$_ => 'Interactive'}
qw/location contact portcontrol portname vlan power/
(map {$_ => 'Poller'}
qw/refresh discover discoverall discover_neighbors/),
(map {$_ => 'Interactive'}
qw/location contact portcontrol portname vlan power/)
};
sub worker_begin {
@@ -40,7 +42,8 @@ sub worker_begin {
sub worker_body {
my $self = shift;
my $wid = $self->wid;
my $num_slots = $self->do('num_workers');
my $num_slots = $self->do('num_workers')
or return debug "mgr ($wid): this node has no workers... quitting manager";
# get some pending jobs
my $rs = schema('netdisco')->resultset('Admin')
@@ -56,11 +59,11 @@ sub worker_body {
# filter for discover_*
next unless is_discoverable($job->device);
info sprintf "mgr (%s): job %s is discoverable", $wid, $jid;
debug sprintf "mgr (%s): job %s is discoverable", $wid, $jid;
# check for available local capacity
next unless $self->do('capacity_for', $job->action);
info sprintf "mgr (%s): processing node has capacity for job %s (%s)",
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
$wid, $jid, $job->action;
# mark job as running
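
The role map in this hunk wraps each `map` in parentheses. That matters because `map` is a list operator: without the parentheses the first `map` would also consume the output of the second. Here is a minimal standalone sketch of the same construction, using the action names from the diff; the lookup at the end is illustrative only.

```perl
use strict;
use warnings;

my $action_map = {
    Poller      => [qw/refresh discover discoverall discover_neighbors/],
    Interactive => [qw/location contact portcontrol portname vlan power/],
};

# Each map is parenthesised so that it only sees its own list.
my $role_map = {
    (map { $_ => 'Poller' }      @{ $action_map->{Poller} }),
    (map { $_ => 'Interactive' } @{ $action_map->{Interactive} }),
};

# Look up which worker role handles a given action.
for my $action (qw/discover vlan/) {
    printf "%s is handled by %s\n", $action, $role_map->{$action};
}
```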

@@ -0,0 +1,75 @@
package App::Netdisco::Daemon::Worker::Poller;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
# add dispatch methods for poller tasks
with 'App::Netdisco::Daemon::Worker::Poller::Discover';
sub worker_body {
my $self = shift;
my $wid = $self->wid;
while (1) {
debug "poll ($wid): asking for a job";
my $jobs = $self->do('take_jobs', $self->wid, 'Poller');
foreach my $candidate (@$jobs) {
# create a row object so we can use column accessors
# use the local db schema in case it is accidentally 'stored'
# (will throw an exception)
my $job = schema('daemon')->resultset('Admin')
->new_result($candidate);
my $jid = $job->job;
my $target = $job->action;
next unless $self->can($target);
debug "poll ($wid): can ${target}() for job $jid";
# do job
my ($status, $log);
try {
$job->started(scalar localtime);
info sprintf "poll (%s): starting %s job(%s) at %s",
$wid, $target, $jid, $job->started;
($status, $log) = $self->$target($job);
}
catch {
$status = 'error';
$log = "error running job: $_";
$self->sendto('stderr', $log ."\n");
};
$self->close_job($job, $status, $log);
}
debug "poll ($wid): sleeping now...";
sleep( setting('daemon_sleep_time') || 5 );
}
}
sub close_job {
my ($self, $job, $status, $log) = @_;
my $now = scalar localtime;
info sprintf "poll (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->job, $status, $now;
try {
schema('netdisco')->resultset('Admin')
->find($job->job)
->update({
status => $status,
log => $log,
started => $job->started,
finished => $now,
});
}
catch { $self->sendto('stderr', "error closing job: $_\n") };
}
1;
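
The dispatch step in `worker_body` can be illustrated in isolation: look up a method named after the job's action, call it, and turn any exception into an `error` status. This sketch makes several assumptions: it uses core `eval` in place of Try::Tiny so that it runs without CPAN modules, it uses plain hashes in place of DBIx::Class rows, and the `Hypothetical::Worker` class and its `skipped` status are invented for illustration (the real worker simply moves on to the next job).

```perl
use strict;
use warnings;

package Hypothetical::Worker;

sub new { return bless {}, shift }

# Two stand-in actions: one succeeds, one throws.
sub discover { my ($self, $job) = @_; return ('done', "Ended discover for $job->{device}") }
sub refresh  { die "simulated failure\n" }

# Dispatch on the action name, trapping exceptions as an 'error' status.
sub run_job {
    my ($self, $job) = @_;
    my $target = $job->{action};
    return ('skipped', "no handler for $target") unless $self->can($target);

    my ($status, $log) = eval { $self->$target($job) };
    if ($@) {
        ($status, $log) = ('error', "error running job: $@");
    }
    return ($status, $log);
}

package main;

my $worker = Hypothetical::Worker->new;
my ($status, $log) = $worker->run_job({ action => 'discover', device => '192.0.2.1' });
print "$status: $log\n";
```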

@@ -0,0 +1,88 @@
package App::Netdisco::Daemon::Worker::Poller::Discover;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Util::SNMP 'snmp_connect';
use App::Netdisco::Util::Device 'get_device';
use App::Netdisco::Util::DiscoverAndStore ':all';
use App::Netdisco::Daemon::Worker::Interactive::Util ':all';
use NetAddr::IP::Lite ':lower';
use Role::Tiny;
use namespace::clean;
# queue a discover job for all devices known to Netdisco
sub refresh {
my ($self, $job) = @_;
my $devices = schema('netdisco')->resultset('Device')->get_column('ip');
schema('netdisco')->resultset('Admin')->populate([
map {{
device => $_,
action => 'discover',
status => 'queued',
}} ($devices->all)
]);
return job_done("Queued discover job for all devices");
}
sub discover {
my ($self, $job) = @_;
my $host = NetAddr::IP::Lite->new($job->device);
my $device = get_device($host->addr);
my $snmp = snmp_connect($device);
if (!defined $snmp) {
return job_error("discover failed: could not SNMP connect to $host");
}
store_device($device, $snmp);
store_interfaces($device, $snmp);
store_wireless($device, $snmp);
store_vlans($device, $snmp);
store_power($device, $snmp);
store_modules($device, $snmp);
return job_done("Ended discover for $host");
}
# run find_neighbors on all known devices, and run discover on any
# newly found devices.
sub discoverall {
my ($self, $job) = @_;
my $devices = schema('netdisco')->resultset('Device')->get_column('ip');
schema('netdisco')->resultset('Admin')->populate([
map {{
device => $_,
action => 'discover_neighbors',
status => 'queued',
}} ($devices->all)
]);
return job_done("Queued discover_neighbors job for all devices");
}
sub discover_neighbors {
my ($self, $job) = @_;
my $host = NetAddr::IP::Lite->new($job->device);
my $device = get_device($host->addr);
my $snmp = snmp_connect($device);
if (!defined $snmp) {
return job_error("discover_neighbors failed: could not SNMP connect to $host");
}
find_neighbors($device, $snmp);
return job_done("Ended find_neighbors for $host");
}
1;
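
Both `refresh` and `discoverall` fan out by turning a list of device IPs into one queued job row per device before handing the list to `populate`. The row-building step alone might look like the sketch below; `queue_rows_for` and the sample addresses are hypothetical, and no database is involved.

```perl
use strict;
use warnings;

# Build one queued-job row per device IP for the given action.
# The unary plus makes the inner braces an anonymous hash explicitly.
sub queue_rows_for {
    my ($action, @ips) = @_;
    return [
        map { +{ device => $_, action => $action, status => 'queued' } } @ips
    ];
}

my $rows = queue_rows_for('discover', qw/192.0.2.1 192.0.2.2/);
printf "%s %s for %s\n", $_->{status}, $_->{action}, $_->{device} for @$rows;
```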

@@ -0,0 +1,92 @@
package App::Netdisco::Daemon::Worker::Scheduler;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Algorithm::Cron;
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
my $jobactions = {
map {$_ => undef} qw/
/
# saveconfigs
# discoverall
# refresh
# macwalk
# arpwalk
# nbtwalk
# backup
};
sub worker_begin {
my $self = shift;
my $wid = $self->wid;
debug "entering Scheduler ($wid) worker_begin()";
foreach my $a (keys %$jobactions) {
next unless setting('housekeeping')
and exists setting('housekeeping')->{$a};
my $config = setting('housekeeping')->{$a};
# accept either single crontab format, or individual time fields
my $cron = Algorithm::Cron->new(
base => 'local',
%{
(ref {} eq ref $config->{when})
? $config->{when}
: {crontab => $config->{when}}
}
);
$jobactions->{$a} = $config;
$jobactions->{$a}->{when} = $cron;
}
}
sub worker_body {
my $self = shift;
my $wid = $self->wid;
while (1) {
# sleep until some point in the next minute
my $naptime = 60 - (time % 60) + int(rand(45));
debug "sched ($wid): sleeping for $naptime seconds";
sleep $naptime;
# NB next_time() returns the next *after* win_start
my $win_start = time - (time % 60) - 1;
my $win_end = $win_start + 60;
# if any job is due, add it to the queue
foreach my $a (keys %$jobactions) {
next unless defined $jobactions->{$a};
my $sched = $jobactions->{$a};
# next occurrence of job must be in this minute's window
debug sprintf "sched ($wid): $a: win_start: %s, win_end: %s, next: %s",
$win_start, $win_end, $sched->{when}->next_time($win_start);
next unless $sched->{when}->next_time($win_start) <= $win_end;
# queue it!
# due to a table constraint, this will (intentionally) fail if a
# similar job is already queued.
try {
debug "sched ($wid): queueing $a job";
schema('netdisco')->resultset('Admin')->create({
action => $a,
device => ($sched->{device} || undef),
subaction => ($sched->{extra} || undef),
status => 'queued',
});
}
catch {
debug "sched ($wid): action $a was not queued (dupe?)";
};
}
}
}
1;
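
The window test in `worker_body` relies on `next_time()` returning the first match strictly after its argument, so the window starts one second before the minute boundary. The following is a minimal sketch of that check, assuming Algorithm::Cron is installed. `cron_for` and `due_in_window` are hypothetical helper names, the schedule is an example, and `base => 'utc'` is used here only to make the arithmetic reproducible (the worker itself uses `local`).

```perl
use strict;
use warnings;
use Algorithm::Cron;

# Build a schedule from either a crontab string or a hash of time fields,
# mirroring the ref check in worker_begin().
sub cron_for {
    my ($when) = @_;
    return Algorithm::Cron->new(
        base => 'utc',    # assumption: UTC for reproducibility
        (ref($when) eq 'HASH') ? %$when : (crontab => $when),
    );
}

# True when the schedule's next run falls inside the minute containing $now.
sub due_in_window {
    my ($cron, $now) = @_;
    my $win_start = $now - ($now % 60) - 1;    # one second before the minute begins
    my $win_end   = $win_start + 60;
    return $cron->next_time($win_start) <= $win_end;
}

# Example schedule: every day at 02:30.
my $cron = cron_for('30 2 * * *');
print due_in_window($cron, time) ? "due now\n" : "not due\n";
```

Because the worker wakes at a random point within each minute, testing against the whole minute rather than the exact wake-up time means a job is still caught wherever in the minute the check happens to run.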