remove job type knowledge from code into config
This commit is contained in:
@@ -19,7 +19,7 @@ use Dancer qw/:moose :script/;
|
|||||||
warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD');
|
warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD');
|
||||||
|
|
||||||
# callbacks and local job queue management
|
# callbacks and local job queue management
|
||||||
use App::Netdisco::Daemon::Queue ':all';
|
use App::Netdisco::Daemon::LocalQueue ':all';
|
||||||
|
|
||||||
# needed to quench AF_INET6 symbol errors
|
# needed to quench AF_INET6 symbol errors
|
||||||
use NetAddr::IP::Lite ':lower';
|
use NetAddr::IP::Lite ':lower';
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ __PACKAGE__->add_columns(
|
|||||||
is_nullable => 0,
|
is_nullable => 0,
|
||||||
},
|
},
|
||||||
|
|
||||||
"role", # Poller, Interactive, etc
|
"type", # Poller, Interactive, etc
|
||||||
{ data_type => "text", is_nullable => 0 },
|
{ data_type => "text", is_nullable => 0 },
|
||||||
|
|
||||||
"wid", # worker ID, only != 0 once taken
|
"wid", # worker ID, only != 0 once taken
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package App::Netdisco::Daemon::Queue;
|
package App::Netdisco::Daemon::LocalQueue;
|
||||||
|
|
||||||
use Dancer qw/:moose :syntax :script/;
|
use Dancer qw/:moose :syntax :script/;
|
||||||
use Dancer::Plugin::DBIC 'schema';
|
use Dancer::Plugin::DBIC 'schema';
|
||||||
@@ -18,49 +18,25 @@ sub add_jobs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
sub capacity_for {
|
sub capacity_for {
|
||||||
my ($action) = @_;
|
my ($type) = @_;
|
||||||
debug "checking local capacity for action $action";
|
debug "checking local capacity for action $action";
|
||||||
|
|
||||||
my $action_map = {
|
my $setting = setting('workers')->{ $job_type_keys->{$type} };
|
||||||
Poller => [
|
my $current = $queue->search({type => $type})->count;
|
||||||
qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/
|
return ($current < $setting);
|
||||||
],
|
|
||||||
Interactive => [qw/location contact portcontrol portname vlan power/],
|
|
||||||
};
|
|
||||||
|
|
||||||
my $role_map = {
|
|
||||||
(map {$_ => 'Poller'} @{ $action_map->{Poller} }),
|
|
||||||
(map {$_ => 'Interactive'} @{ $action_map->{Interactive} })
|
|
||||||
};
|
|
||||||
|
|
||||||
my $setting_map = {
|
|
||||||
Poller => 'pollers',
|
|
||||||
Interactive => 'interactives',
|
|
||||||
};
|
|
||||||
|
|
||||||
my $role = $role_map->{$action};
|
|
||||||
my $setting = $setting_map->{$role};
|
|
||||||
|
|
||||||
my $current = $queue->search({role => $role})->count;
|
|
||||||
|
|
||||||
return ($current < setting('workers')->{$setting});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sub take_jobs {
|
sub take_jobs {
|
||||||
my ($wid, $role, $max) = @_;
|
my ($wid, $type, $max) = @_;
|
||||||
$max ||= 1;
|
|
||||||
|
|
||||||
# asking for more jobs means the current ones are done
|
# asking for more jobs means the current ones are done
|
||||||
debug "removing complete jobs for worker $wid from local queue";
|
scrub_jobs($wid);
|
||||||
$queue->search({wid => $wid})->delete;
|
|
||||||
|
|
||||||
debug "searching for $max new jobs for worker $wid (role $role)";
|
debug "searching for $max new jobs for worker $wid (type $type)";
|
||||||
my $rs = $queue->search(
|
my @rows = $queue->search(
|
||||||
{role => $role, wid => 0},
|
{type => $type, wid => 0},
|
||||||
{rows => $max},
|
{rows => ($max || 1)},
|
||||||
);
|
)->all;
|
||||||
|
|
||||||
my @rows = $rs->all;
|
|
||||||
return [] if scalar @rows == 0;
|
return [] if scalar @rows == 0;
|
||||||
|
|
||||||
debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid;
|
debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid;
|
||||||
@@ -79,7 +55,7 @@ sub reset_jobs {
|
|||||||
|
|
||||||
sub scrub_jobs {
|
sub scrub_jobs {
|
||||||
my ($wid) = @_;
|
my ($wid) = @_;
|
||||||
debug "deleting jobs owned by worker $wid";
|
debug "deleting dangling jobs owned by worker $wid";
|
||||||
return unless $wid > 1;
|
return unless $wid > 1;
|
||||||
$queue->search({wid => $wid})->delete;
|
$queue->search({wid => $wid})->delete;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,13 +11,6 @@ use namespace::clean;
|
|||||||
|
|
||||||
my $fqdn = hostfqdn || 'localhost';
|
my $fqdn = hostfqdn || 'localhost';
|
||||||
|
|
||||||
my $role_map = {
|
|
||||||
(map {$_ => 'Poller'}
|
|
||||||
qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/),
|
|
||||||
(map {$_ => 'Interactive'}
|
|
||||||
qw/location contact portcontrol portname vlan power/)
|
|
||||||
};
|
|
||||||
|
|
||||||
sub worker_begin {
|
sub worker_begin {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
my $wid = $self->wid;
|
my $wid = $self->wid;
|
||||||
@@ -32,7 +25,7 @@ sub worker_begin {
|
|||||||
|
|
||||||
if (scalar @jobs) {
|
if (scalar @jobs) {
|
||||||
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
|
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
|
||||||
map { $_->{role} = $role_map->{$_->{action}} } @jobs;
|
map { $_->{type} = setting('job_types')->{$_->{action}} } @jobs;
|
||||||
|
|
||||||
$self->do('add_jobs', \@jobs);
|
$self->do('add_jobs', \@jobs);
|
||||||
}
|
}
|
||||||
@@ -55,9 +48,10 @@ sub worker_body {
|
|||||||
debug "mgr ($wid): getting potential jobs for $num_slots workers";
|
debug "mgr ($wid): getting potential jobs for $num_slots workers";
|
||||||
while (my $job = $rs->next) {
|
while (my $job = $rs->next) {
|
||||||
my $jid = $job->job;
|
my $jid = $job->job;
|
||||||
|
my $job_type = setting('job_types')->{$job->action} or next;
|
||||||
|
|
||||||
# check for available local capacity
|
# check for available local capacity
|
||||||
next unless $self->do('capacity_for', $job->action);
|
next unless $self->do('capacity_for', $job_type);
|
||||||
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
|
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
|
||||||
$wid, $jid, $job->action;
|
$wid, $jid, $job->action;
|
||||||
|
|
||||||
@@ -67,7 +61,7 @@ sub worker_body {
|
|||||||
$wid, $jid;
|
$wid, $jid;
|
||||||
|
|
||||||
my $local_job = { $job->get_columns };
|
my $local_job = { $job->get_columns };
|
||||||
$local_job->{role} = $role_map->{$job->action};
|
$local_job->{type} = $job_type;
|
||||||
|
|
||||||
# copy job to local queue
|
# copy job to local queue
|
||||||
$self->do('add_jobs', [$local_job]);
|
$self->do('add_jobs', [$local_job]);
|
||||||
|
|||||||
@@ -187,6 +187,27 @@ dns:
|
|||||||
# expire:
|
# expire:
|
||||||
# when: '20 23 * * *'
|
# when: '20 23 * * *'
|
||||||
|
|
||||||
|
job_types:
|
||||||
|
discoverall: Poller
|
||||||
|
discover: Poller
|
||||||
|
arpwalk: Poller
|
||||||
|
arpnip: Poller
|
||||||
|
macwalk: Poller
|
||||||
|
macsuck: Poller
|
||||||
|
nbtwalk: Poller
|
||||||
|
nbtstat: Poller
|
||||||
|
expire: Poller
|
||||||
|
location: Interactive
|
||||||
|
contact: Interactive
|
||||||
|
portcontrol: Interactive
|
||||||
|
portname: Interactive
|
||||||
|
vlan: Interactive
|
||||||
|
power: Interactive
|
||||||
|
|
||||||
|
job_type_keys:
|
||||||
|
Poller: pollers
|
||||||
|
Interactive: interactives
|
||||||
|
|
||||||
# ---------------
|
# ---------------
|
||||||
# DANCER INTERNAL
|
# DANCER INTERNAL
|
||||||
# ---------------
|
# ---------------
|
||||||
|
|||||||
Reference in New Issue
Block a user