Squashed commit of the following:
commit c8f399b2d70248acde6bbabfa58ed4312bf1f8c2 Author: Oliver Gorwits <oliver@cpan.org> Date: Tue Jun 11 22:53:14 2013 +0100 version bump commit 39d16adf38c142feeb1378c217a59783c5a12b44 Author: Oliver Gorwits <oliver@cpan.org> Date: Tue Jun 11 22:43:37 2013 +0100 initial discover triggers *walk commit 29c334db714ac4846d024d815ffe1822889e879c Author: Oliver Gorwits <oliver@cpan.org> Date: Tue Jun 11 22:07:03 2013 +0100 factor out body of poller and interactive worker role commit db21f88e5a24ad99d808e70213cab1d83cc1ffee Author: Oliver Gorwits <oliver@cpan.org> Date: Tue Jun 11 20:58:07 2013 +0100 fix logging when daemonized commit 1f4a0539de7368273ddc7c24a8ffb0a663817e72 Author: Oliver Gorwits <oliver@cpan.org> Date: Tue Jun 11 17:49:44 2013 +0100 limit discovery and remove duff constraint on device_ip
This commit is contained in:
@@ -4,10 +4,11 @@ package App::Netdisco::Daemon::Util;
|
||||
|
||||
use base 'Exporter';
|
||||
our @EXPORT = ();
|
||||
our @EXPORT_OK = qw/ job_done job_error /;
|
||||
our @EXPORT_OK = qw/ job_done job_error job_defer /;
|
||||
our %EXPORT_TAGS = (all => \@EXPORT_OK);
|
||||
|
||||
sub job_done { return ('done', shift) }
|
||||
sub job_error { return ('error', shift) }
|
||||
sub job_defer { return ('defer', shift) }
|
||||
|
||||
1;
|
||||
|
||||
86
Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm
Normal file
86
Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm
Normal file
@@ -0,0 +1,86 @@
|
||||
package App::Netdisco::Daemon::Worker::Common;
|
||||
|
||||
use Dancer qw/:moose :syntax :script/;
|
||||
use Dancer::Plugin::DBIC 'schema';
|
||||
use Try::Tiny;
|
||||
|
||||
use Role::Tiny;
|
||||
use namespace::clean;
|
||||
|
||||
requires qw/worker_type worker_name munge_action/;
|
||||
|
||||
sub worker_body {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
my $type = $self->worker_type;
|
||||
my $name = $self->worker_name;
|
||||
|
||||
while (1) {
|
||||
debug "$type ($wid): asking for a job";
|
||||
my $jobs = $self->do('take_jobs', $self->wid, $name);
|
||||
|
||||
foreach my $candidate (@$jobs) {
|
||||
# create a row object so we can use column accessors
|
||||
# use the local db schema in case it is accidentally 'stored'
|
||||
# (will throw an exception)
|
||||
my $job = schema('daemon')->resultset('Admin')
|
||||
->new_result($candidate);
|
||||
my $jid = $job->job;
|
||||
|
||||
my $target = $self->munge_action($job->action);
|
||||
next unless $self->can($target);
|
||||
debug "$type ($wid): can ${target}() for job $jid";
|
||||
|
||||
# do job
|
||||
my ($status, $log);
|
||||
try {
|
||||
$job->started(scalar localtime);
|
||||
info sprintf "$type (%s): starting %s job(%s) at %s",
|
||||
$wid, $target, $jid, $job->started;
|
||||
($status, $log) = $self->$target($job);
|
||||
}
|
||||
catch {
|
||||
$status = 'error';
|
||||
$log = "error running job: $_";
|
||||
$self->sendto('stderr', $log ."\n");
|
||||
};
|
||||
|
||||
$self->close_job($job, $status, $log);
|
||||
}
|
||||
|
||||
debug "$type ($wid): sleeping now...";
|
||||
sleep( setting('workers')->{sleep_time} || 5 );
|
||||
}
|
||||
}
|
||||
|
||||
sub close_job {
|
||||
my ($self, $job, $status, $log) = @_;
|
||||
my $type = $self->worker_type;
|
||||
my $now = scalar localtime;
|
||||
|
||||
info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s",
|
||||
$self->wid, $job->action, $job->job, $status, $now;
|
||||
|
||||
# lock db row and either defer or complete the job
|
||||
try {
|
||||
if ($status eq 'defer') {
|
||||
schema('netdisco')->resultset('Admin')
|
||||
->find($job->job, {for => 'update'})
|
||||
->update({ status => 'queued' });
|
||||
}
|
||||
else {
|
||||
schema('netdisco')->resultset('Admin')
|
||||
->find($job->job, {for => 'update'})
|
||||
->update({
|
||||
status => $status,
|
||||
log => $log,
|
||||
started => $job->started,
|
||||
finished => $now,
|
||||
});
|
||||
}
|
||||
}
|
||||
catch { $self->sendto('stderr', "error closing job: $_\n") };
|
||||
}
|
||||
|
||||
1;
|
||||
@@ -1,75 +1,17 @@
|
||||
package App::Netdisco::Daemon::Worker::Interactive;
|
||||
|
||||
use Dancer qw/:moose :syntax :script/;
|
||||
use Dancer::Plugin::DBIC 'schema';
|
||||
use Try::Tiny;
|
||||
|
||||
use Role::Tiny;
|
||||
use namespace::clean;
|
||||
|
||||
# main worker body
|
||||
with 'App::Netdisco::Daemon::Worker::Common';
|
||||
|
||||
# add dispatch methods for interactive actions
|
||||
with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
|
||||
'App::Netdisco::Daemon::Worker::Interactive::PortActions';
|
||||
|
||||
sub worker_body {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
while (1) {
|
||||
debug "int ($wid): asking for a job";
|
||||
my $jobs = $self->do('take_jobs', $self->wid, 'Interactive');
|
||||
|
||||
foreach my $candidate (@$jobs) {
|
||||
# create a row object so we can use column accessors
|
||||
# use the local db schema in case it is accidentally 'stored'
|
||||
# (will throw an exception)
|
||||
my $job = schema('daemon')->resultset('Admin')
|
||||
->new_result($candidate);
|
||||
my $jid = $job->job;
|
||||
|
||||
my $target = 'set_'. $job->action;
|
||||
next unless $self->can($target);
|
||||
debug "int ($wid): can ${target}() for job $jid";
|
||||
|
||||
# do job
|
||||
my ($status, $log);
|
||||
try {
|
||||
$job->started(scalar localtime);
|
||||
info sprintf "int (%s): starting %s job(%s) at %s",
|
||||
$wid, $target, $jid, $job->started;
|
||||
($status, $log) = $self->$target($job);
|
||||
}
|
||||
catch {
|
||||
$status = 'error';
|
||||
$log = "error running job: $_";
|
||||
$self->sendto('stderr', $log ."\n");
|
||||
};
|
||||
|
||||
$self->close_job($job, $status, $log);
|
||||
}
|
||||
|
||||
debug "int ($wid): sleeping now...";
|
||||
sleep( setting('workers')->{sleep_time} || 5 );
|
||||
}
|
||||
}
|
||||
|
||||
sub close_job {
|
||||
my ($self, $job, $status, $log) = @_;
|
||||
my $now = scalar localtime;
|
||||
info sprintf "int (%s): wrapping up set_%s job(%s) - status %s at %s",
|
||||
$self->wid, $job->action, $job->job, $status, $now;
|
||||
|
||||
try {
|
||||
schema('netdisco')->resultset('Admin')
|
||||
->find($job->job, {for => 'update'})
|
||||
->update({
|
||||
status => $status,
|
||||
log => $log,
|
||||
started => $job->started,
|
||||
finished => $now,
|
||||
});
|
||||
}
|
||||
catch { $self->sendto('stderr', "error closing job: $_\n") };
|
||||
}
|
||||
sub worker_type { 'int' }
|
||||
sub worker_name { 'Interactive' }
|
||||
sub munge_action { 'set_' . $_[1] }
|
||||
|
||||
1;
|
||||
|
||||
@@ -95,11 +95,10 @@ sub lock_job {
|
||||
# lock db row and update to show job has been picked
|
||||
try {
|
||||
schema('netdisco')->txn_do(sub {
|
||||
my $row = schema('netdisco')->resultset('Admin')->find(
|
||||
{job => $job->job, status => 'queued'}, {for => 'update'}
|
||||
);
|
||||
|
||||
$row->update({status => "queued-$fqdn"});
|
||||
schema('netdisco')->resultset('Admin')->find(
|
||||
{job => $job->job, status => 'queued'},
|
||||
{for => 'update'}
|
||||
)->update({ status => "queued-$fqdn" });
|
||||
});
|
||||
$happy = 1;
|
||||
};
|
||||
|
||||
@@ -1,77 +1,18 @@
|
||||
package App::Netdisco::Daemon::Worker::Poller;
|
||||
|
||||
use Dancer qw/:moose :syntax :script/;
|
||||
use Dancer::Plugin::DBIC 'schema';
|
||||
|
||||
use Try::Tiny;
|
||||
|
||||
use Role::Tiny;
|
||||
use namespace::clean;
|
||||
|
||||
# main worker body
|
||||
with 'App::Netdisco::Daemon::Worker::Common';
|
||||
|
||||
# add dispatch methods for poller tasks
|
||||
with 'App::Netdisco::Daemon::Worker::Poller::Device';
|
||||
with 'App::Netdisco::Daemon::Worker::Poller::Arpnip';
|
||||
with 'App::Netdisco::Daemon::Worker::Poller::Macsuck';
|
||||
with 'App::Netdisco::Daemon::Worker::Poller::Device',
|
||||
'App::Netdisco::Daemon::Worker::Poller::Arpnip',
|
||||
'App::Netdisco::Daemon::Worker::Poller::Macsuck';
|
||||
|
||||
sub worker_body {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
while (1) {
|
||||
debug "poll ($wid): asking for a job";
|
||||
my $jobs = $self->do('take_jobs', $self->wid, 'Poller');
|
||||
|
||||
foreach my $candidate (@$jobs) {
|
||||
# create a row object so we can use column accessors
|
||||
# use the local db schema in case it is accidentally 'stored'
|
||||
# (will throw an exception)
|
||||
my $job = schema('daemon')->resultset('Admin')
|
||||
->new_result($candidate);
|
||||
my $jid = $job->job;
|
||||
my $target = $job->action;
|
||||
|
||||
next unless $self->can($target);
|
||||
debug "poll ($wid): can ${target}() for job $jid";
|
||||
|
||||
# do job
|
||||
my ($status, $log);
|
||||
try {
|
||||
$job->started(scalar localtime);
|
||||
info sprintf "poll (%s): starting %s job(%s) at %s",
|
||||
$wid, $target, $jid, $job->started;
|
||||
($status, $log) = $self->$target($job);
|
||||
}
|
||||
catch {
|
||||
$status = 'error';
|
||||
$log = "error running job: $_";
|
||||
$self->sendto('stderr', $log ."\n");
|
||||
};
|
||||
|
||||
$self->close_job($job, $status, $log);
|
||||
}
|
||||
|
||||
debug "poll ($wid): sleeping now...";
|
||||
sleep( setting('workers')->{sleep_time} || 5 );
|
||||
}
|
||||
}
|
||||
|
||||
sub close_job {
|
||||
my ($self, $job, $status, $log) = @_;
|
||||
my $now = scalar localtime;
|
||||
info sprintf "poll (%s): wrapping up %s job(%s) - status %s at %s",
|
||||
$self->wid, $job->action, $job->job, $status, $now;
|
||||
|
||||
try {
|
||||
schema('netdisco')->resultset('Admin')
|
||||
->find($job->job, {for => 'update'})
|
||||
->update({
|
||||
status => $status,
|
||||
log => $log,
|
||||
started => $job->started,
|
||||
finished => $now,
|
||||
});
|
||||
}
|
||||
catch { $self->sendto('stderr', "error closing job: $_\n") };
|
||||
}
|
||||
sub worker_type { 'pol' }
|
||||
sub worker_name { 'Poller' }
|
||||
sub munge_action { $_[1] }
|
||||
|
||||
1;
|
||||
|
||||
@@ -20,6 +20,16 @@ sub arpwalk {
|
||||
my $devices = schema('netdisco')->resultset('Device')->get_column('ip');
|
||||
my $jobqueue = schema('netdisco')->resultset('Admin');
|
||||
|
||||
if ($job->subaction and $job->subaction eq 'after-discoverall') {
|
||||
# make sure there are no incomplete discover jobs queued
|
||||
my $discover = $jobqueue->search(
|
||||
{ action => 'discover', status => { -like => 'queued%' } }
|
||||
)->count;
|
||||
|
||||
return job_defer("Deferred arpwalk due to pending discover jobs")
|
||||
if $discover;
|
||||
}
|
||||
|
||||
schema('netdisco')->txn_do(sub {
|
||||
# clean up user submitted jobs older than 1min,
|
||||
# assuming skew between schedulers' clocks is not greater than 1min
|
||||
|
||||
@@ -50,6 +50,10 @@ sub discover {
|
||||
my $host = NetAddr::IP::Lite->new($job->device);
|
||||
my $device = get_device($host->addr);
|
||||
|
||||
if ($device->ip eq '0.0.0.0') {
|
||||
return job_error("discover failed: no device param (need -d ?)");
|
||||
}
|
||||
|
||||
if ($device->in_storage
|
||||
and $device->vendor and $device->vendor eq 'netdisco') {
|
||||
return job_done("Skipped discover for pseudo-device $host");
|
||||
|
||||
Reference in New Issue
Block a user