big patch to remove knowledge of DB from most worker code
This commit is contained in:
@@ -8,10 +8,7 @@ use base 'DBIx::Class::Core';
|
|||||||
__PACKAGE__->table("admin");
|
__PACKAGE__->table("admin");
|
||||||
__PACKAGE__->add_columns(
|
__PACKAGE__->add_columns(
|
||||||
"job",
|
"job",
|
||||||
{
|
{ data_type => "integer", is_nullable => 0 },
|
||||||
data_type => "integer",
|
|
||||||
is_nullable => 0,
|
|
||||||
},
|
|
||||||
|
|
||||||
"type", # Poller, Interactive, etc
|
"type", # Poller, Interactive, etc
|
||||||
{ data_type => "text", is_nullable => 0 },
|
{ data_type => "text", is_nullable => 0 },
|
||||||
|
|||||||
@@ -3,27 +3,144 @@ package App::Netdisco::Daemon::JobQueue::PostgreSQL;
|
|||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
use Dancer::Plugin::DBIC 'schema';
|
use Dancer::Plugin::DBIC 'schema';
|
||||||
|
|
||||||
|
use Net::Domain 'hostfqdn';
|
||||||
|
use Try::Tiny;
|
||||||
|
|
||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
sub jobqueue_insert {
|
#jq_get
|
||||||
my ($self, $settings) = @_;
|
#jq_getlocal
|
||||||
|
#jq_queued
|
||||||
|
#jq_lock
|
||||||
|
#jq_defer
|
||||||
|
#jq_complete
|
||||||
|
#jq_insert
|
||||||
|
|
||||||
schema('netdisco')->resultset('Admin')->create({
|
sub jq_get {
|
||||||
action => $settings->{action},
|
my ($self, $num_slots) = @_;
|
||||||
device => $settings->{device},
|
my @returned = ();
|
||||||
port => $settings->{port},
|
|
||||||
subaction => $settings->{extra},
|
my $rs = schema('netdisco')->resultset('Admin')
|
||||||
status => 'queued',
|
->search(
|
||||||
});
|
{status => 'queued'},
|
||||||
|
{order_by => 'random()', rows => ($num_slots || 1)},
|
||||||
|
);
|
||||||
|
|
||||||
|
while (my $job = $rs->next) {
|
||||||
|
my $job_type = setting('job_types')->{$job->action} or next;
|
||||||
|
push @returned, schema('daemon')->resultset('Admin')
|
||||||
|
->new_result({ $job->get_columns, type => $job_type });
|
||||||
}
|
}
|
||||||
|
|
||||||
sub jobqueue_update {
|
return @returned;
|
||||||
my ($self, $settings) = @_;
|
}
|
||||||
|
|
||||||
|
sub jq_getlocal {
|
||||||
|
my $self = shift;
|
||||||
|
my $fqdn = hostfqdn || 'localhost';
|
||||||
|
my @returned = ();
|
||||||
|
|
||||||
|
my $rs = schema('netdisco')->resultset('Admin')
|
||||||
|
->search({status => "queued-$fqdn"});
|
||||||
|
|
||||||
|
while (my $job = $rs->next) {
|
||||||
|
my $job_type = setting('job_types')->{$job->action} or next;
|
||||||
|
push @returned, schema('daemon')->resultset('Admin')
|
||||||
|
->new_result({ $job->get_columns, type => $job_type });
|
||||||
|
}
|
||||||
|
|
||||||
|
return @returned;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub jq_queued {
|
||||||
|
my ($self, $job_type) = @_;
|
||||||
|
|
||||||
|
return schema('netdisco')->resultset('Admin')->search({
|
||||||
|
device => { '!=' => undef},
|
||||||
|
action => $job_type,
|
||||||
|
status => { -like => 'queued%' },
|
||||||
|
})->get_column('device')->all;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub jq_lock {
|
||||||
|
my ($self, $job) = @_;
|
||||||
|
my $fqdn = hostfqdn || 'localhost';
|
||||||
|
my $happy = 0;
|
||||||
|
|
||||||
|
# lock db row and update to show job has been picked
|
||||||
|
try {
|
||||||
|
schema('netdisco')->txn_do(sub {
|
||||||
schema('netdisco')->resultset('Admin')
|
schema('netdisco')->resultset('Admin')
|
||||||
->find(delete $settings->{id}, {for => 'update'})
|
->find($job->id, {for => 'update'})
|
||||||
->update($settings);
|
->update({ status => "queued-$fqdn" });
|
||||||
|
});
|
||||||
|
$happy = 1;
|
||||||
|
};
|
||||||
|
|
||||||
|
return $happy;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub jq_defer {
|
||||||
|
my ($self, $job) = @_;
|
||||||
|
my $happy = 0;
|
||||||
|
|
||||||
|
# lock db row and update to show job is available
|
||||||
|
try {
|
||||||
|
schema('netdisco')->txn_do(sub {
|
||||||
|
schema('netdisco')->resultset('Admin')
|
||||||
|
->find($job->id, {for => 'update'})
|
||||||
|
->update({ status => 'queued' });
|
||||||
|
});
|
||||||
|
$happy = 1;
|
||||||
|
};
|
||||||
|
|
||||||
|
return $happy;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub jq_complete {
|
||||||
|
my ($self, $job) = @_;
|
||||||
|
my $happy = 0;
|
||||||
|
|
||||||
|
# lock db row and update to show job is done/error
|
||||||
|
try {
|
||||||
|
schema('netdisco')->txn_do(sub {
|
||||||
|
schema('netdisco')->resultset('Admin')
|
||||||
|
->find($job->id, {for => 'update'})->update({
|
||||||
|
status => $job->status,
|
||||||
|
log => $job->log,
|
||||||
|
finished => $job->finished,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
$happy = 1;
|
||||||
|
};
|
||||||
|
|
||||||
|
return $happy;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub jq_insert {
|
||||||
|
my ($self, $jobs) = @_;
|
||||||
|
$jobs = [$jobs] if ref [] ne ref $jobs;
|
||||||
|
my $happy = 0;
|
||||||
|
|
||||||
|
try {
|
||||||
|
schema('netdisco')->txn_do(sub {
|
||||||
|
schema('netdisco')->resultset('Admin')->populate([
|
||||||
|
map {{
|
||||||
|
device => $_->{device},
|
||||||
|
port => $_->{port},
|
||||||
|
action => $_->{action},
|
||||||
|
subaction => ($_->{extra} || $_->{subaction}),
|
||||||
|
username => $_->{username},
|
||||||
|
userip => $_->{userip},
|
||||||
|
status => 'queued',
|
||||||
|
}} @$jobs
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
$happy = 1;
|
||||||
|
};
|
||||||
|
|
||||||
|
return $happy;
|
||||||
}
|
}
|
||||||
|
|
||||||
true;
|
true;
|
||||||
|
|||||||
@@ -12,9 +12,10 @@ schema('daemon')->deploy;
|
|||||||
my $queue = schema('daemon')->resultset('Admin');
|
my $queue = schema('daemon')->resultset('Admin');
|
||||||
|
|
||||||
sub add_jobs {
|
sub add_jobs {
|
||||||
my ($jobs) = @_;
|
my (@jobs) = @_;
|
||||||
info sprintf "adding %s jobs to local queue", scalar @$jobs;
|
info sprintf "adding %s jobs to local queue", scalar @jobs;
|
||||||
$queue->populate($jobs);
|
use Data::Printer;
|
||||||
|
do { schema('daemon')->dclone($_)->insert } for @jobs;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub capacity_for {
|
sub capacity_for {
|
||||||
@@ -46,7 +47,7 @@ sub take_jobs {
|
|||||||
$queue->search({job => { -in => [map {$_->job} @rows] }})
|
$queue->search({job => { -in => [map {$_->job} @rows] }})
|
||||||
->update({wid => $wid});
|
->update({wid => $wid});
|
||||||
|
|
||||||
return [ map {{$_->get_columns}} @rows ];
|
return \@rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub reset_jobs {
|
sub reset_jobs {
|
||||||
|
|||||||
@@ -1,14 +1,13 @@
|
|||||||
package App::Netdisco::Daemon::Worker::Common;
|
package App::Netdisco::Daemon::Worker::Common;
|
||||||
|
|
||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
use Dancer::Plugin::DBIC 'schema';
|
|
||||||
use Try::Tiny;
|
use Try::Tiny;
|
||||||
|
|
||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
||||||
requires qw/worker_type worker_name munge_action jobqueue_update/;
|
requires qw/worker_type worker_name munge_action jq_defer jq_complete/;
|
||||||
|
|
||||||
sub worker_body {
|
sub worker_body {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -21,33 +20,27 @@ sub worker_body {
|
|||||||
debug "$type ($wid): asking for a job";
|
debug "$type ($wid): asking for a job";
|
||||||
my $jobs = $self->do('take_jobs', $self->wid, $name);
|
my $jobs = $self->do('take_jobs', $self->wid, $name);
|
||||||
|
|
||||||
foreach my $candidate (@$jobs) {
|
foreach my $job (@$jobs) {
|
||||||
# create a row object so we can use column accessors
|
|
||||||
# use the local db schema in case it is accidentally 'stored'
|
|
||||||
# (will throw an exception)
|
|
||||||
my $job = schema('daemon')->resultset('Admin')
|
|
||||||
->new_result($candidate);
|
|
||||||
my $jid = $job->job;
|
|
||||||
|
|
||||||
my $target = $self->munge_action($job->action);
|
my $target = $self->munge_action($job->action);
|
||||||
next unless $self->can($target);
|
next unless $self->can($target);
|
||||||
debug "$type ($wid): can ${target}() for job $jid";
|
debug sprintf "$type ($wid): can ${target}() for job %s", $job->id;
|
||||||
|
|
||||||
# do job
|
# do job
|
||||||
my ($status, $log);
|
|
||||||
try {
|
try {
|
||||||
$job->started(scalar localtime);
|
$job->started(scalar localtime);
|
||||||
info sprintf "$type (%s): starting %s job(%s) at %s",
|
info sprintf "$type (%s): starting %s job(%s) at %s",
|
||||||
$wid, $target, $jid, $job->started;
|
$wid, $target, $job->id, $job->started;
|
||||||
($status, $log) = $self->$target($job);
|
my ($status, $log) = $self->$target($job);
|
||||||
|
$job->status($status);
|
||||||
|
$job->log($log);
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
$status = 'error';
|
$job->status('error');
|
||||||
$log = "error running job: $_";
|
$job->log("error running job: $_");
|
||||||
$self->sendto('stderr', $log ."\n");
|
$self->sendto('stderr', $job->log ."\n");
|
||||||
};
|
};
|
||||||
|
|
||||||
$self->close_job($job, $status, $log);
|
$self->close_job($job);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug "$type ($wid): sleeping now...";
|
debug "$type ($wid): sleeping now...";
|
||||||
@@ -56,29 +49,20 @@ sub worker_body {
|
|||||||
}
|
}
|
||||||
|
|
||||||
sub close_job {
|
sub close_job {
|
||||||
my ($self, $job, $status, $log) = @_;
|
my ($self, $job) = @_;
|
||||||
my $type = $self->worker_type;
|
my $type = $self->worker_type;
|
||||||
my $now = scalar localtime;
|
my $now = scalar localtime;
|
||||||
|
|
||||||
info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s",
|
info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s",
|
||||||
$self->wid, $job->action, $job->job, $status, $now;
|
$self->wid, $job->action, $job->id, $job->status, $now;
|
||||||
|
|
||||||
# lock db row and either defer or complete the job
|
|
||||||
try {
|
try {
|
||||||
if ($status eq 'defer') {
|
if ($job->status eq 'defer') {
|
||||||
$self->jobqueue_update({
|
$self->jq_defer($job);
|
||||||
id => $job->job,
|
|
||||||
status => 'queued',
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
$self->jobqueue_update({
|
$job->finished($now);
|
||||||
id => $job->job,
|
$self->jq_complete($job);
|
||||||
status => $status,
|
|
||||||
log => $log,
|
|
||||||
started => $job->started,
|
|
||||||
finished => $now,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# remove job from local queue
|
# remove job from local queue
|
||||||
|
|||||||
@@ -1,15 +1,12 @@
|
|||||||
package App::Netdisco::Daemon::Worker::Manager;
|
package App::Netdisco::Daemon::Worker::Manager;
|
||||||
|
|
||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
use Dancer::Plugin::DBIC 'schema';
|
|
||||||
|
|
||||||
use Net::Domain 'hostfqdn';
|
|
||||||
use Try::Tiny;
|
|
||||||
|
|
||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
my $fqdn = hostfqdn || 'localhost';
|
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
||||||
|
requires qw/jq_get jq_getlocal jq_lock/;
|
||||||
|
|
||||||
sub worker_begin {
|
sub worker_begin {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -18,16 +15,11 @@ sub worker_begin {
|
|||||||
|
|
||||||
# requeue jobs locally
|
# requeue jobs locally
|
||||||
debug "mgr ($wid): searching for jobs booked to this processing node";
|
debug "mgr ($wid): searching for jobs booked to this processing node";
|
||||||
my $rs = schema('netdisco')->resultset('Admin')
|
my @jobs = $self->jq_getlocal;
|
||||||
->search({status => "queued-$fqdn"});
|
|
||||||
|
|
||||||
my @jobs = map {{$_->get_columns}} $rs->all;
|
|
||||||
|
|
||||||
if (scalar @jobs) {
|
if (scalar @jobs) {
|
||||||
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
|
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
|
||||||
map { $_->{type} = setting('job_types')->{$_->{action}} } @jobs;
|
$self->do('add_jobs', @jobs);
|
||||||
|
|
||||||
$self->do('add_jobs', \@jobs);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -37,62 +29,31 @@ sub worker_body {
|
|||||||
my $num_slots = $self->do('num_workers')
|
my $num_slots = $self->do('num_workers')
|
||||||
or return debug "mgr ($wid): this node has no workers... quitting manager";
|
or return debug "mgr ($wid): this node has no workers... quitting manager";
|
||||||
|
|
||||||
# get some pending jobs
|
|
||||||
my $rs = schema('netdisco')->resultset('Admin')
|
|
||||||
->search(
|
|
||||||
{status => 'queued'},
|
|
||||||
{order_by => 'random()', rows => $num_slots},
|
|
||||||
);
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
debug "mgr ($wid): getting potential jobs for $num_slots workers";
|
debug "mgr ($wid): getting potential jobs for $num_slots workers";
|
||||||
while (my $job = $rs->next) {
|
|
||||||
my $jid = $job->job;
|
# get some pending jobs
|
||||||
my $job_type = setting('job_types')->{$job->action} or next;
|
# TODO also check for stale jobs in Netdisco DB
|
||||||
|
foreach my $job ( $self->jq_get($num_slots) ) {
|
||||||
|
|
||||||
# check for available local capacity
|
# check for available local capacity
|
||||||
next unless $self->do('capacity_for', $job_type);
|
my $job_type = setting('job_types')->{$job->action};
|
||||||
|
next unless $job_type and $self->do('capacity_for', $job_type);
|
||||||
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
|
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
|
||||||
$wid, $jid, $job->action;
|
$wid, $job->id, $job->action;
|
||||||
|
|
||||||
# mark job as running
|
# mark job as running
|
||||||
next unless $self->lock_job($job);
|
next unless $self->jq_lock($job);
|
||||||
info sprintf "mgr (%s): job %s booked out for this processing node",
|
info sprintf "mgr (%s): job %s booked out for this processing node",
|
||||||
$wid, $jid;
|
$wid, $job->id;
|
||||||
|
|
||||||
my $local_job = { $job->get_columns };
|
|
||||||
$local_job->{type} = $job_type;
|
|
||||||
|
|
||||||
# copy job to local queue
|
# copy job to local queue
|
||||||
$self->do('add_jobs', [$local_job]);
|
$self->do('add_jobs', $job);
|
||||||
}
|
}
|
||||||
|
|
||||||
# reset iterator so ->next() triggers another DB query
|
|
||||||
$rs->reset;
|
|
||||||
|
|
||||||
# TODO also check for stale jobs in Netdisco DB
|
|
||||||
|
|
||||||
debug "mgr ($wid): sleeping now...";
|
debug "mgr ($wid): sleeping now...";
|
||||||
sleep( setting('workers')->{sleep_time} || 2 );
|
sleep( setting('workers')->{sleep_time} || 2 );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub lock_job {
|
|
||||||
my ($self, $job) = @_;
|
|
||||||
my $happy = 0;
|
|
||||||
|
|
||||||
# lock db row and update to show job has been picked
|
|
||||||
try {
|
|
||||||
schema('netdisco')->txn_do(sub {
|
|
||||||
schema('netdisco')->resultset('Admin')->find(
|
|
||||||
{job => $job->job, status => 'queued'},
|
|
||||||
{for => 'update'}
|
|
||||||
)->update({ status => "queued-$fqdn" });
|
|
||||||
});
|
|
||||||
$happy = 1;
|
|
||||||
};
|
|
||||||
|
|
||||||
return $happy;
|
|
||||||
}
|
|
||||||
|
|
||||||
1;
|
1;
|
||||||
|
|||||||
@@ -1,51 +1,40 @@
|
|||||||
package App::Netdisco::Daemon::Worker::Poller::Common;
|
package App::Netdisco::Daemon::Worker::Poller::Common;
|
||||||
|
|
||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
use Dancer::Plugin::DBIC 'schema';
|
|
||||||
|
|
||||||
use App::Netdisco::Util::SNMP 'snmp_connect';
|
use App::Netdisco::Util::SNMP 'snmp_connect';
|
||||||
use App::Netdisco::Util::Device 'get_device';
|
use App::Netdisco::Util::Device 'get_device';
|
||||||
use App::Netdisco::Daemon::Util ':all';
|
use App::Netdisco::Daemon::Util ':all';
|
||||||
|
use Dancer::Plugin::DBIC 'schema';
|
||||||
|
|
||||||
use NetAddr::IP::Lite ':lower';
|
use NetAddr::IP::Lite ':lower';
|
||||||
|
|
||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
|
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
||||||
|
requires qw/jq_queued jq_insert/;
|
||||||
|
|
||||||
# queue a job for all devices known to Netdisco
|
# queue a job for all devices known to Netdisco
|
||||||
sub _walk_body {
|
sub _walk_body {
|
||||||
my ($self, $job_type, $job) = @_;
|
my ($self, $job_type, $job) = @_;
|
||||||
|
|
||||||
my $action_method = $job_type .'_action';
|
|
||||||
my $job_action = $self->$action_method;
|
|
||||||
|
|
||||||
my $layer_method = $job_type .'_layer';
|
my $layer_method = $job_type .'_layer';
|
||||||
my $job_layer = $self->$layer_method;
|
my $job_layer = $self->$layer_method;
|
||||||
|
|
||||||
my $jobqueue = schema('netdisco')->resultset('Admin');
|
my %queued = map {$_ => 1} $self->jq_queued($job_type);
|
||||||
my @devices = schema('netdisco')->resultset('Device')
|
my @devices = schema('netdisco')->resultset('Device')
|
||||||
->search({ip => { -not_in =>
|
->has_layer($job_layer)->get_column('ip')->all;
|
||||||
$jobqueue->search({
|
my @filtered_devices = grep {!exists $queued{$_}} @devices;
|
||||||
device => { '!=' => undef},
|
|
||||||
action => $job_type,
|
|
||||||
status => { -like => 'queued%' },
|
|
||||||
})->get_column('device')->as_query
|
|
||||||
}})->has_layer($job_layer)->get_column('ip')->all;
|
|
||||||
|
|
||||||
my $filter_method = $job_type .'_filter';
|
$self->jq_insert([
|
||||||
my $job_filter = $self->$filter_method;
|
|
||||||
|
|
||||||
my @filtered_devices = grep {$job_filter->($_)} @devices;
|
|
||||||
|
|
||||||
schema('netdisco')->resultset('Admin')->txn_do_locked(sub {
|
|
||||||
$jobqueue->populate([
|
|
||||||
map {{
|
map {{
|
||||||
device => $_,
|
device => $_,
|
||||||
action => $job_type,
|
action => $job_type,
|
||||||
status => 'queued',
|
username => $job->username,
|
||||||
|
userip => $job->userip,
|
||||||
}} (@filtered_devices)
|
}} (@filtered_devices)
|
||||||
]);
|
]);
|
||||||
});
|
|
||||||
|
|
||||||
return job_done("Queued $job_type job for all devices");
|
return job_done("Queued $job_type job for all devices");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,43 +1,38 @@
|
|||||||
package App::Netdisco::Daemon::Worker::Poller::Device;
|
package App::Netdisco::Daemon::Worker::Poller::Device;
|
||||||
|
|
||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
use Dancer::Plugin::DBIC 'schema';
|
|
||||||
|
|
||||||
use App::Netdisco::Util::SNMP 'snmp_connect';
|
use App::Netdisco::Util::SNMP 'snmp_connect';
|
||||||
use App::Netdisco::Util::Device qw/get_device is_discoverable/;
|
use App::Netdisco::Util::Device qw/get_device is_discoverable/;
|
||||||
use App::Netdisco::Core::Discover ':all';
|
use App::Netdisco::Core::Discover ':all';
|
||||||
use App::Netdisco::Daemon::Util ':all';
|
use App::Netdisco::Daemon::Util ':all';
|
||||||
|
use Dancer::Plugin::DBIC 'schema';
|
||||||
|
|
||||||
use NetAddr::IP::Lite ':lower';
|
use NetAddr::IP::Lite ':lower';
|
||||||
|
|
||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
|
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
||||||
|
requires qw/jq_queued jq_insert/;
|
||||||
|
|
||||||
# queue a discover job for all devices known to Netdisco
|
# queue a discover job for all devices known to Netdisco
|
||||||
sub discoverall {
|
sub discoverall {
|
||||||
my ($self, $job) = @_;
|
my ($self, $job) = @_;
|
||||||
|
|
||||||
my $jobqueue = schema('netdisco')->resultset('Admin');
|
my %queued = map {$_ => 1} $self->jq_queued('discover');
|
||||||
my $devices = schema('netdisco')->resultset('Device')
|
my @devices = schema('netdisco')->resultset('Device')
|
||||||
->search({ip => { -not_in =>
|
->get_column('ip')->all;
|
||||||
$jobqueue->search({
|
my @filtered_devices = grep {!exists $queued{$_}} @devices;
|
||||||
device => { '!=' => undef},
|
|
||||||
action => 'discover',
|
|
||||||
status => { -like => 'queued%' },
|
|
||||||
})->get_column('device')->as_query
|
|
||||||
}})->get_column('ip');
|
|
||||||
|
|
||||||
schema('netdisco')->resultset('Admin')->txn_do_locked(sub {
|
$self->jq_insert([
|
||||||
$jobqueue->populate([
|
|
||||||
map {{
|
map {{
|
||||||
device => $_,
|
device => $_,
|
||||||
action => 'discover',
|
action => 'discover',
|
||||||
status => 'queued',
|
|
||||||
username => $job->username,
|
username => $job->username,
|
||||||
userip => $job->userip,
|
userip => $job->userip,
|
||||||
}} ($devices->all)
|
}} (@filtered_devices)
|
||||||
]);
|
]);
|
||||||
});
|
|
||||||
|
|
||||||
return job_done("Queued discover job for all devices");
|
return job_done("Queued discover job for all devices");
|
||||||
}
|
}
|
||||||
@@ -48,7 +43,6 @@ sub discover {
|
|||||||
|
|
||||||
my $host = NetAddr::IP::Lite->new($job->device);
|
my $host = NetAddr::IP::Lite->new($job->device);
|
||||||
my $device = get_device($host->addr);
|
my $device = get_device($host->addr);
|
||||||
my $jobqueue = schema('netdisco')->resultset('Admin');
|
|
||||||
|
|
||||||
if ($device->ip eq '0.0.0.0') {
|
if ($device->ip eq '0.0.0.0') {
|
||||||
return job_error("discover failed: no device param (need -d ?)");
|
return job_error("discover failed: no device param (need -d ?)");
|
||||||
@@ -80,27 +74,21 @@ sub discover {
|
|||||||
# if requested, and the device has not yet been arpniped/macsucked, queue now
|
# if requested, and the device has not yet been arpniped/macsucked, queue now
|
||||||
if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') {
|
if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') {
|
||||||
if (!defined $device->last_macsuck) {
|
if (!defined $device->last_macsuck) {
|
||||||
schema('netdisco')->txn_do(sub {
|
$self->jq_insert({
|
||||||
$jobqueue->create({
|
|
||||||
device => $device->ip,
|
device => $device->ip,
|
||||||
action => 'macsuck',
|
action => 'macsuck',
|
||||||
status => 'queued',
|
|
||||||
username => $job->username,
|
username => $job->username,
|
||||||
userip => $job->userip,
|
userip => $job->userip,
|
||||||
});
|
});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!defined $device->last_arpnip) {
|
if (!defined $device->last_arpnip) {
|
||||||
schema('netdisco')->txn_do(sub {
|
$self->jq_insert({
|
||||||
$jobqueue->create({
|
|
||||||
device => $device->ip,
|
device => $device->ip,
|
||||||
action => 'arpnip',
|
action => 'arpnip',
|
||||||
status => 'queued',
|
|
||||||
username => $job->username,
|
username => $job->username,
|
||||||
userip => $job->userip,
|
userip => $job->userip,
|
||||||
});
|
});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,13 @@
|
|||||||
package App::Netdisco::Daemon::Worker::Scheduler;
|
package App::Netdisco::Daemon::Worker::Scheduler;
|
||||||
|
|
||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
|
|
||||||
use Algorithm::Cron;
|
use Algorithm::Cron;
|
||||||
use Try::Tiny;
|
|
||||||
|
|
||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue');
|
||||||
requires 'jobqueue_insert';
|
requires 'jq_insert';
|
||||||
|
|
||||||
sub worker_begin {
|
sub worker_begin {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -55,18 +53,13 @@ sub worker_body {
|
|||||||
next unless $sched->{when}->next_time($win_start) <= $win_end;
|
next unless $sched->{when}->next_time($win_start) <= $win_end;
|
||||||
|
|
||||||
# queue it!
|
# queue it!
|
||||||
try {
|
|
||||||
info "sched ($wid): queueing $action job";
|
info "sched ($wid): queueing $action job";
|
||||||
$self->jobqueue_insert({
|
$self->jq_insert({
|
||||||
action => $action,
|
action => $action,
|
||||||
device => $sched->{device},
|
device => $sched->{device},
|
||||||
extra => $sched->{extra},
|
extra => $sched->{extra},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
catch {
|
|
||||||
debug "sched ($wid): action $action was not queued (dupe?)";
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user