Merge pluggable job queue branch.

Squashed commit of the following:

commit e2ca15c0f8
Merge: 0a90308 ffcf6ed
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Wed May 21 21:18:58 2014 +0100

    Merge branch 'master' into og-pluggable-daemon

commit 0a90308ecf
Merge: e80c575 ee398fc
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 22:20:40 2014 +0100

    Merge branch 'master' into og-pluggable-daemon

    Conflicts:
    	Netdisco/lib/App/Netdisco.pm

commit e80c575c57
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 22:14:44 2014 +0100

    move worker sleep into jobqueue

commit c83b999597
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 22:01:43 2014 +0100

    support disable manager from jobqueue dynamic code

commit 4792b0dc49
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 21:34:28 2014 +0100

    fix pod name

commit 187fc84937
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 21:22:06 2014 +0100

    better naming

commit 1c43aaa0f4
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 21:18:49 2014 +0100

    make worker use only JobQueue not LocalQueue directly

commit 5316058ba8
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 20:42:19 2014 +0100

    remove unecessary scrub subroutine

commit 8077e3de9d
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 20:31:18 2014 +0100

    remove any duplicate jobs when locking

commit d4b5e4e6cd
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 20:20:32 2014 +0100

    rename DefaultSettings to Configuration

commit aacb149d09
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 19:57:45 2014 +0100

    no need to check - mgr is not started if 0 workers

commit 46ebe4cd6a
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 19:50:37 2014 +0100

    remove unecessary job scrub

commit 60522fe555
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 19:27:53 2014 +0100

    fixes for DefaultSettings

commit 2c6f0dd0f7
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 19:11:50 2014 +0100

    rename housekeeping to schedule

commit c12034d2b0
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 19:06:22 2014 +0100

    new DefaultSettings package, and mv queue to be key of workers

commit 49e9079f9a
Merge: ec8ad3b 213f44e
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 17 08:00:02 2014 +0100

    Merge branch 'master' into og-pluggable-daemon

commit ec8ad3b2d8
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun May 11 01:18:21 2014 +0100

    fix entered_stamp

commit 471724dd89
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 23:44:14 2014 +0100

    fix auto hack

commit 4620deff33
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 23:27:11 2014 +0100

    final migration

commit 5413e34e83
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 23:18:12 2014 +0100

    more JobQueue migration

commit 9569bda4d8
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 22:44:20 2014 +0100

    migrate to JobQueue :)

commit 41ee8f91f2
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 22:38:20 2014 +0100

    simplify again

commit 58cba4da24
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 22:06:41 2014 +0100

    add POD for JobQueue

commit c9afbab26b
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 21:36:01 2014 +0100

    use Module::Load tricks to avoid some other mess

commit 50c72c1d64
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 10 21:12:52 2014 +0100

    use Module::Load for dynamic loading

commit 54510a1560
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu May 8 22:05:10 2014 +0100

    hack to make functional and OO interface

commit b8c706a2e7
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu May 8 21:29:31 2014 +0100

    simplify role apply for jobqueue

commit 8a816b9764
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue May 6 22:20:50 2014 +0100

    remove debug print

commit f3131adfc8
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Tue May 6 21:47:30 2014 +0100

    big patch to remove knowledge of DB from most worker code

commit 39a0efb3c3
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 28 23:46:10 2014 +0100

    port Worker Common to pluggable jobqueue

commit 8c0614357a
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 28 23:04:13 2014 +0100

    port Scheduler to pluggable jobqueue

commit 3882c157ec
Merge: 44e6c49 2480646
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 28 22:36:57 2014 +0100

    Merge branch 'master' into og-pluggable-daemon

commit 44e6c49419
Merge: fdeeffc 5fc6209
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 28 22:35:53 2014 +0100

    Merge branch 'master' into og-pluggable-daemon

commit 5fc62090e2
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Mon Apr 28 22:15:07 2014 +0100

    edge topology
      17   * Use

commit fdeeffcbe4
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 23:13:20 2014 +0100

    book specifically same jobs which were seen

commit 0d97c2b819
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 22:57:37 2014 +0100

    fix typos

commit 47265a5292
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 21:56:52 2014 +0100

    rename file to follow name change

commit fd169149c4
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 21:52:57 2014 +0100

    remove job types from web code

commit 319489ae00
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 21:46:30 2014 +0100

    remove job types from scheduler

commit ccdeca600c
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 21:33:01 2014 +0100

    remove job types from netdisco-daemon-fg

commit 349bddf609
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 21:05:42 2014 +0100

    move default env settings to Netdisco.pm

commit b4b5cce00a
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Thu Apr 24 21:01:26 2014 +0100

    remove job type knowledge from code into config
This commit is contained in:
Oliver Gorwits
2014-05-21 21:20:27 +01:00
parent ffcf6ed099
commit 9685eb182a
29 changed files with 694 additions and 466 deletions

View File

@@ -8,12 +8,9 @@ use base 'DBIx::Class::Core';
__PACKAGE__->table("admin");
__PACKAGE__->add_columns(
"job",
{
data_type => "integer",
is_nullable => 0,
},
{ data_type => "integer", is_nullable => 0 },
"role", # Poller, Interactive, etc
"type", # Poller, Interactive, etc
{ data_type => "text", is_nullable => 0 },
"wid", # worker ID, only != 0 once taken
@@ -47,4 +44,11 @@ __PACKAGE__->add_columns(
__PACKAGE__->set_primary_key("job");
sub extra { (shift)->subaction }
sub entered_stamp {
(my $stamp = (shift)->entered) =~ s/\.\d+$//;
return $stamp;
}
1;

View File

@@ -0,0 +1,19 @@
package App::Netdisco::Daemon::JobQueue;
use Role::Tiny;
use namespace::clean;
use Module::Load ();
Module::Load::load_remote 'JobQueue' => 'App::Netdisco::JobQueue' => ':all';
# central queue
sub jq_getsome { shift and JobQueue::jq_getsome(@_) }
sub jq_locked { shift and JobQueue::jq_locked(@_) }
sub jq_queued { shift and JobQueue::jq_queued(@_) }
sub jq_take { goto \&JobQueue::jq_take }
sub jq_lock { shift and JobQueue::jq_lock(@_) }
sub jq_defer { shift and JobQueue::jq_defer(@_) }
sub jq_complete { shift and JobQueue::jq_complete(@_) }
sub jq_insert { shift and JobQueue::jq_insert(@_) }
1;

View File

@@ -0,0 +1,62 @@
package App::Netdisco::Daemon::LocalQueue;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs/;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
schema('daemon')->deploy;
my $queue = schema('daemon')->resultset('Admin');
sub add_jobs {
my (@jobs) = @_;
info sprintf "adding %s jobs to local queue", scalar @jobs;
schema('daemon')->dclone($_)->insert for @jobs;
}
sub capacity_for {
my ($type) = @_;
debug "checking local capacity for worker type $type";
my $setting = setting('workers')->{ setting('job_type_keys')->{$type} };
my $current = $queue->search({type => $type})->count;
return ($current < $setting);
}
sub take_jobs {
my ($wid, $type, $max) = @_;
return () unless $wid > 1;
$max ||= 1;
debug "deleting completed jobs by worker $wid";
$queue->search({wid => $wid})->delete;
debug "searching for $max new jobs for worker $wid (type $type)";
my $rs = $queue->search(
{type => $type, wid => 0},
{rows => $max},
);
my @rows = $rs->all;
return [] if scalar @rows == 0;
debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid;
$queue->search({job => { -in => [map {$_->job} @rows] }})
->update({wid => $wid});
return \@rows;
}
# not used by workers, only the daemon when reinitializing a worker
sub reset_jobs {
my ($wid) = @_;
debug "resetting jobs owned by worker $wid to be available";
return unless $wid > 1;
$queue->search({wid => $wid})
->update({wid => 0});
}
1;

View File

@@ -1,87 +0,0 @@
package App::Netdisco::Daemon::Queue;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs scrub_jobs /;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
schema('daemon')->deploy;
my $queue = schema('daemon')->resultset('Admin');
sub add_jobs {
my ($jobs) = @_;
info sprintf "adding %s jobs to local queue", scalar @$jobs;
$queue->populate($jobs);
}
sub capacity_for {
my ($action) = @_;
debug "checking local capacity for action $action";
my $action_map = {
Poller => [
qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/
],
Interactive => [qw/location contact portcontrol portname vlan power/],
};
my $role_map = {
(map {$_ => 'Poller'} @{ $action_map->{Poller} }),
(map {$_ => 'Interactive'} @{ $action_map->{Interactive} })
};
my $setting_map = {
Poller => 'pollers',
Interactive => 'interactives',
};
my $role = $role_map->{$action};
my $setting = $setting_map->{$role};
my $current = $queue->search({role => $role})->count;
return ($current < setting('workers')->{$setting});
}
sub take_jobs {
my ($wid, $role, $max) = @_;
$max ||= 1;
# asking for more jobs means the current ones are done
debug "removing complete jobs for worker $wid from local queue";
$queue->search({wid => $wid})->delete;
debug "searching for $max new jobs for worker $wid (role $role)";
my $rs = $queue->search(
{role => $role, wid => 0},
{rows => $max},
);
my @rows = $rs->all;
return [] if scalar @rows == 0;
debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid;
$rs->update({wid => $wid});
return [ map {{$_->get_columns}} @rows ];
}
sub reset_jobs {
my ($wid) = @_;
debug "resetting jobs owned by worker $wid to be available";
return unless $wid > 1;
$queue->search({wid => $wid})
->update({wid => 0});
}
sub scrub_jobs {
my ($wid) = @_;
debug "deleting jobs owned by worker $wid";
return unless $wid > 1;
$queue->search({wid => $wid})->delete;
}
1;

View File

@@ -1,87 +1,61 @@
package App::Netdisco::Daemon::Worker::Common;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
requires qw/worker_type worker_name munge_action/;
with 'App::Netdisco::Daemon::JobQueue';
sub worker_body {
my $self = shift;
my $wid = $self->wid;
my $tag = $self->worker_tag;
my $type = $self->worker_type;
my $name = $self->worker_name;
while (1) {
debug "$type ($wid): asking for a job";
my $jobs = $self->do('take_jobs', $self->wid, $name);
foreach my $candidate (@$jobs) {
# create a row object so we can use column accessors
# use the local db schema in case it is accidentally 'stored'
# (will throw an exception)
my $job = schema('daemon')->resultset('Admin')
->new_result($candidate);
my $jid = $job->job;
my $jobs = $self->jq_take($self->wid, $type);
foreach my $job (@$jobs) {
my $target = $self->munge_action($job->action);
next unless $self->can($target);
debug "$type ($wid): can ${target}() for job $jid";
# do job
my ($status, $log);
try {
$job->started(scalar localtime);
info sprintf "$type (%s): starting %s job(%s) at %s",
$wid, $target, $jid, $job->started;
($status, $log) = $self->$target($job);
info sprintf "$tag (%s): starting %s job(%s) at %s",
$wid, $target, $job->id, $job->started;
my ($status, $log) = $self->$target($job);
$job->status($status);
$job->log($log);
}
catch {
$status = 'error';
$log = "error running job: $_";
$self->sendto('stderr', $log ."\n");
$job->status('error');
$job->log("error running job: $_");
$self->sendto('stderr', $job->log ."\n");
};
$self->close_job($job, $status, $log);
$self->close_job($job);
}
debug "$type ($wid): sleeping now...";
sleep(1);
}
}
sub close_job {
my ($self, $job, $status, $log) = @_;
my $type = $self->worker_type;
my ($self, $job) = @_;
my $tag = $self->worker_tag;
my $now = scalar localtime;
info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->job, $status, $now;
info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->id, $job->status, $now;
# lock db row and either defer or complete the job
try {
if ($status eq 'defer') {
schema('netdisco')->resultset('Admin')
->find($job->job, {for => 'update'})
->update({ status => 'queued' });
if ($job->status eq 'defer') {
$self->jq_defer($job);
}
else {
schema('netdisco')->resultset('Admin')
->find($job->job, {for => 'update'})
->update({
status => $status,
log => $log,
started => $job->started,
finished => $now,
});
$job->finished($now);
$self->jq_complete($job);
}
# remove job from local queue
$self->do('scrub_jobs', $self->wid);
}
catch { $self->sendto('stderr', "error closing job: $_\n") };
}

View File

@@ -10,8 +10,8 @@ with 'App::Netdisco::Daemon::Worker::Common';
with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
'App::Netdisco::Daemon::Worker::Interactive::PortActions';
sub worker_type { 'int' }
sub worker_name { 'Interactive' }
sub worker_tag { 'int' }
sub worker_type { 'Interactive' }
sub munge_action { 'set_' . $_[1] }
1;

View File

@@ -1,104 +1,67 @@
package App::Netdisco::Daemon::Worker::Manager;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Net::Domain 'hostfqdn';
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
my $fqdn = hostfqdn || 'localhost';
my $role_map = {
(map {$_ => 'Poller'}
qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/),
(map {$_ => 'Interactive'}
qw/location contact portcontrol portname vlan power/)
};
use List::Util 'sum';
with 'App::Netdisco::Daemon::JobQueue';
sub worker_begin {
my $self = shift;
my $wid = $self->wid;
debug "entering Manager ($wid) worker_begin()";
if (setting('workers')->{'no_manager'}) {
return debug "mgr ($wid): no need for manager... skip begin";
}
# requeue jobs locally
debug "mgr ($wid): searching for jobs booked to this processing node";
my $rs = schema('netdisco')->resultset('Admin')
->search({status => "queued-$fqdn"});
my @jobs = map {{$_->get_columns}} $rs->all;
my @jobs = $self->jq_locked;
if (scalar @jobs) {
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
map { $_->{role} = $role_map->{$_->{action}} } @jobs;
$self->do('add_jobs', \@jobs);
$self->do('add_jobs', @jobs);
}
}
sub worker_body {
my $self = shift;
my $wid = $self->wid;
my $num_slots = $self->do('num_workers')
or return debug "mgr ($wid): this node has no workers... quitting manager";
# get some pending jobs
my $rs = schema('netdisco')->resultset('Admin')
->search(
{status => 'queued'},
{order_by => 'random()', rows => $num_slots},
);
return debug "mgr ($wid): no need for manager... quitting"
if setting('workers')->{'no_manager'};
my $num_slots = sum( 0, map { setting('workers')->{$_} }
values %{setting('job_type_keys')} );
while (1) {
debug "mgr ($wid): getting potential jobs for $num_slots workers";
while (my $job = $rs->next) {
my $jid = $job->job;
# get some pending jobs
# TODO also check for stale jobs in Netdisco DB
foreach my $job ( $self->jq_getsome($num_slots) ) {
# check for available local capacity
next unless $self->do('capacity_for', $job->action);
my $job_type = setting('job_types')->{$job->action};
next unless $job_type and $self->do('capacity_for', $job_type);
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
$wid, $jid, $job->action;
$wid, $job->id, $job->action;
# mark job as running
next unless $self->lock_job($job);
next unless $self->jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $jid;
my $local_job = { $job->get_columns };
$local_job->{role} = $role_map->{$job->action};
$wid, $job->id;
# copy job to local queue
$self->do('add_jobs', [$local_job]);
$self->do('add_jobs', $job);
}
# reset iterator so ->next() triggers another DB query
$rs->reset;
# TODO also check for stale jobs in Netdisco DB
debug "mgr ($wid): sleeping now...";
sleep( setting('workers')->{sleep_time} || 2 );
}
}
sub lock_job {
my ($self, $job) = @_;
my $happy = 0;
# lock db row and update to show job has been picked
try {
schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin')->find(
{job => $job->job, status => 'queued'},
{for => 'update'}
)->update({ status => "queued-$fqdn" });
});
$happy = 1;
};
return $happy;
}
1;

View File

@@ -13,8 +13,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device',
'App::Netdisco::Daemon::Worker::Poller::Nbtstat',
'App::Netdisco::Daemon::Worker::Poller::Expiry';
sub worker_type { 'pol' }
sub worker_name { 'Poller' }
sub worker_tag { 'pol' }
sub worker_type { 'Poller' }
sub munge_action { $_[1] }
1;

View File

@@ -1,11 +1,11 @@
package App::Netdisco::Daemon::Worker::Poller::Common;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Util::SNMP 'snmp_connect';
use App::Netdisco::Util::Device 'get_device';
use App::Netdisco::Daemon::Util ':all';
use Dancer::Plugin::DBIC 'schema';
use NetAddr::IP::Lite ':lower';
@@ -16,36 +16,22 @@ use namespace::clean;
sub _walk_body {
my ($self, $job_type, $job) = @_;
my $action_method = $job_type .'_action';
my $job_action = $self->$action_method;
my $layer_method = $job_type .'_layer';
my $job_layer = $self->$layer_method;
my $jobqueue = schema('netdisco')->resultset('Admin');
my %queued = map {$_ => 1} $self->jq_queued($job_type);
my @devices = schema('netdisco')->resultset('Device')
->search({ip => { -not_in =>
$jobqueue->search({
device => { '!=' => undef},
action => $job_type,
status => { -like => 'queued%' },
})->get_column('device')->as_query
}})->has_layer($job_layer)->get_column('ip')->all;
->has_layer($job_layer)->get_column('ip')->all;
my @filtered_devices = grep {!exists $queued{$_}} @devices;
my $filter_method = $job_type .'_filter';
my $job_filter = $self->$filter_method;
my @filtered_devices = grep {$job_filter->($_)} @devices;
schema('netdisco')->resultset('Admin')->txn_do_locked(sub {
$jobqueue->populate([
$self->jq_insert([
map {{
device => $_,
action => $job_type,
status => 'queued',
username => $job->username,
userip => $job->userip,
}} (@filtered_devices)
]);
});
]);
return job_done("Queued $job_type job for all devices");
}

View File

@@ -1,12 +1,12 @@
package App::Netdisco::Daemon::Worker::Poller::Device;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Util::SNMP 'snmp_connect';
use App::Netdisco::Util::Device qw/get_device is_discoverable/;
use App::Netdisco::Core::Discover ':all';
use App::Netdisco::Daemon::Util ':all';
use Dancer::Plugin::DBIC 'schema';
use NetAddr::IP::Lite ':lower';
@@ -17,27 +17,19 @@ use namespace::clean;
sub discoverall {
my ($self, $job) = @_;
my $jobqueue = schema('netdisco')->resultset('Admin');
my $devices = schema('netdisco')->resultset('Device')
->search({ip => { -not_in =>
$jobqueue->search({
device => { '!=' => undef},
action => 'discover',
status => { -like => 'queued%' },
})->get_column('device')->as_query
}})->get_column('ip');
my %queued = map {$_ => 1} $self->jq_queued('discover');
my @devices = schema('netdisco')->resultset('Device')
->get_column('ip')->all;
my @filtered_devices = grep {!exists $queued{$_}} @devices;
schema('netdisco')->resultset('Admin')->txn_do_locked(sub {
$jobqueue->populate([
$self->jq_insert([
map {{
device => $_,
action => 'discover',
status => 'queued',
username => $job->username,
userip => $job->userip,
}} ($devices->all)
]);
});
}} (@filtered_devices)
]);
return job_done("Queued discover job for all devices");
}
@@ -48,7 +40,6 @@ sub discover {
my $host = NetAddr::IP::Lite->new($job->device);
my $device = get_device($host->addr);
my $jobqueue = schema('netdisco')->resultset('Admin');
if ($device->ip eq '0.0.0.0') {
return job_error("discover failed: no device param (need -d ?)");
@@ -80,26 +71,20 @@ sub discover {
# if requested, and the device has not yet been arpniped/macsucked, queue now
if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') {
if (!defined $device->last_macsuck) {
schema('netdisco')->txn_do(sub {
$jobqueue->create({
$self->jq_insert({
device => $device->ip,
action => 'macsuck',
status => 'queued',
username => $job->username,
userip => $job->userip,
});
});
}
if (!defined $device->last_arpnip) {
schema('netdisco')->txn_do(sub {
$jobqueue->create({
$self->jq_insert({
device => $device->ip,
action => 'arpnip',
status => 'queued',
username => $job->username,
userip => $job->userip,
});
});
}
}

View File

@@ -1,38 +1,23 @@
package App::Netdisco::Daemon::Worker::Scheduler;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Algorithm::Cron;
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
my $jobactions = {
map {$_ => undef} qw/
discoverall
arpwalk
macwalk
nbtwalk
expire
/
# saveconfigs
# backup
};
with 'App::Netdisco::Daemon::JobQueue';
sub worker_begin {
my $self = shift;
my $wid = $self->wid;
debug "entering Scheduler ($wid) worker_begin()";
foreach my $a (keys %$jobactions) {
next unless setting('housekeeping')
and exists setting('housekeeping')->{$a};
my $config = setting('housekeeping')->{$a};
foreach my $action (keys %{ setting('schedule') }) {
my $config = setting('schedule')->{$action};
# accept either single crontab format, or individual time fields
my $cron = Algorithm::Cron->new(
$config->{when} = Algorithm::Cron->new(
base => 'local',
%{
(ref {} eq ref $config->{when})
@@ -40,9 +25,6 @@ sub worker_begin {
: {crontab => $config->{when}}
}
);
$jobactions->{$a} = $config;
$jobactions->{$a}->{when} = $cron;
}
}
@@ -61,30 +43,21 @@ sub worker_body {
my $win_end = $win_start + 60;
# if any job is due, add it to the queue
foreach my $a (keys %$jobactions) {
next unless defined $jobactions->{$a};
my $sched = $jobactions->{$a};
foreach my $action (keys %{ setting('schedule') }) {
my $sched = setting('schedule')->{$action};
# next occurence of job must be in this minute's window
debug sprintf "sched ($wid): $a: win_start: %s, win_end: %s, next: %s",
debug sprintf "sched ($wid): $action: win_start: %s, win_end: %s, next: %s",
$win_start, $win_end, $sched->{when}->next_time($win_start);
next unless $sched->{when}->next_time($win_start) <= $win_end;
# queue it!
# due to a table constraint, this will (intentionally) fail if a
# similar job is already queued.
try {
info "sched ($wid): queueing $a job";
schema('netdisco')->resultset('Admin')->create({
action => $a,
device => ($sched->{device} || undef),
subaction => ($sched->{extra} || undef),
status => 'queued',
});
}
catch {
debug "sched ($wid): action $a was not queued (dupe?)";
};
info "sched ($wid): queueing $action job";
$self->jq_insert({
action => $action,
device => $sched->{device},
extra => $sched->{extra},
});
}
}
}