diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index ef46edda..73b3cabb 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -19,7 +19,7 @@ use Dancer qw/:moose :script/; warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD'); # callbacks and local job queue management -use App::Netdisco::Daemon::Queue ':all'; +use App::Netdisco::Daemon::LocalQueue ':all'; # needed to quench AF_INET6 symbol errors use NetAddr::IP::Lite ':lower'; diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index 2806b8c4..d7bec538 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -13,7 +13,7 @@ __PACKAGE__->add_columns( is_nullable => 0, }, - "role", # Poller, Interactive, etc + "type", # Poller, Interactive, etc { data_type => "text", is_nullable => 0 }, "wid", # worker ID, only != 0 once taken diff --git a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm index b64d2915..e8b454b5 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Queue.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Queue.pm @@ -1,4 +1,4 @@ -package App::Netdisco::Daemon::Queue; +package App::Netdisco::Daemon::LocalQueue; use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; @@ -18,49 +18,25 @@ sub add_jobs { } sub capacity_for { - my ($action) = @_; + my ($type) = @_; debug "checking local capacity for action $action"; - my $action_map = { - Poller => [ - qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/ - ], - Interactive => [qw/location contact portcontrol portname vlan power/], - }; - - my $role_map = { - (map {$_ => 'Poller'} @{ $action_map->{Poller} }), - (map {$_ => 'Interactive'} @{ $action_map->{Interactive} }) - }; - - my $setting_map = { - Poller => 'pollers', - Interactive => 'interactives', - }; - - my $role = $role_map->{$action}; - my $setting = $setting_map->{$role}; - - my $current = $queue->search({role => $role})->count; - - return ($current < setting('workers')->{$setting}); + my $setting = setting('workers')->{ $job_type_keys->{$type} }; + my $current = $queue->search({type => $type})->count; + return ($current < $setting); } sub take_jobs { - my ($wid, $role, $max) = @_; - $max ||= 1; + my ($wid, $type, $max) = @_; # asking for more jobs means the current ones are done - debug "removing complete jobs for worker $wid from local queue"; - $queue->search({wid => $wid})->delete; + scrub_jobs($wid); - debug "searching for $max new jobs for worker $wid (role $role)"; - my $rs = $queue->search( - {role => $role, wid => 0}, - {rows => $max}, - ); - - my @rows = $rs->all; + debug "searching for $max new jobs for worker $wid (type $type)"; + my @rows = $queue->search( + {type => $type, wid => 0}, + {rows => ($max || 1)}, + )->all; return [] if scalar @rows == 0; debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid; @@ -79,7 +55,7 @@ sub reset_jobs { sub scrub_jobs { my ($wid) = @_; - debug "deleting jobs owned by worker $wid"; + debug "deleting dangling jobs owned by worker $wid"; return unless $wid > 1; $queue->search({wid => $wid})->delete; } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 134f94f1..9fe7a3aa 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -11,13 +11,6 @@ use namespace::clean; my $fqdn = hostfqdn || 'localhost'; -my $role_map = { - (map {$_ => 'Poller'} - qw/discoverall discover arpwalk arpnip macwalk macsuck nbtstat nbtwalk expire/), - (map {$_ => 'Interactive'} - qw/location contact portcontrol portname vlan power/) -}; - sub worker_begin { my $self = shift; my $wid = $self->wid; @@ -32,7 +25,7 @@ sub worker_begin { if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - map { $_->{role} = $role_map->{$_->{action}} } @jobs; + map { $_->{type} = setting('job_types')->{$_->{action}} } @jobs; $self->do('add_jobs', \@jobs); } @@ -55,9 +48,10 @@ sub worker_body { debug "mgr ($wid): getting potential jobs for $num_slots workers"; while (my $job = $rs->next) { my $jid = $job->job; + my $job_type = setting('job_types')->{$job->action} or next; # check for available local capacity - next unless $self->do('capacity_for', $job->action); + next unless $self->do('capacity_for', $job_type); debug sprintf "mgr (%s): processing node has capacity for job %s (%s)", $wid, $jid, $job->action; @@ -67,7 +61,7 @@ sub worker_body { $wid, $jid; my $local_job = { $job->get_columns }; - $local_job->{role} = $role_map->{$job->action}; + $local_job->{type} = $job_type; # copy job to local queue $self->do('add_jobs', [$local_job]); diff --git a/Netdisco/share/config.yml b/Netdisco/share/config.yml index 5fffcfee..83e5d64e 100644 --- a/Netdisco/share/config.yml +++ b/Netdisco/share/config.yml @@ -187,6 +187,27 @@ dns: # expire: # when: '20 23 * * *' +job_types: + discoverall: Poller + discover: Poller + arpwalk: Poller + arpnip: Poller + macwalk: Poller + macsuck: Poller + nbtwalk: Poller + nbtstat: Poller + expire: Poller + location: Interactive + contact: Interactive + portcontrol: Interactive + portname: Interactive + vlan: Interactive + power: Interactive + +job_type_keys: + Poller: pollers + Interactive: interactives + # --------------- # DANCER INTERNAL # ---------------