From 517b1ae4c16d88f815917948d13001257b32a28e Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Sun, 10 Aug 2014 13:55:31 +0100 Subject: [PATCH] merge interactive and poller worker types --- Netdisco/bin/netdisco-daemon-fg | 2 +- .../lib/App/Netdisco/Daemon/LocalQueue.pm | 69 ------------------- .../lib/App/Netdisco/Daemon/Worker/Common.pm | 28 ++++---- .../App/Netdisco/Daemon/Worker/Interactive.pm | 17 ----- .../Worker/Interactive/DeviceActions.pm | 4 +- .../Daemon/Worker/Interactive/PortActions.pm | 8 +-- .../lib/App/Netdisco/Daemon/Worker/Poller.pm | 8 +-- 7 files changed, 22 insertions(+), 114 deletions(-) delete mode 100644 Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm delete mode 100644 Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index 238e39d1..8b13e5a7 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -30,7 +30,7 @@ use Role::Tiny::With; # preload all worker modules into shared memory use Module::Find (); -Module::Find::useall 'App::Netdisco::Daemon::Worker'; +Module::Find::useall 'App::Netdisco::Daemon'; use MCE::Signal '-setpgrp'; use MCE::Flow Sereal => 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm deleted file mode 100644 index b3dbcb9a..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm +++ /dev/null @@ -1,69 +0,0 @@ -package App::Netdisco::Daemon::LocalQueue; - -use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use base 'Exporter'; -our @EXPORT = (); -our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs release_jobs /; -our %EXPORT_TAGS = ( all => \@EXPORT_OK ); - -schema('daemon')->deploy; -my $queue = schema('daemon')->resultset('Admin'); - -sub add_jobs { - my (@jobs) = @_; - info sprintf "adding %s jobs to local queue", scalar @jobs; - schema('daemon')->dclone($_)->insert for @jobs; -} - -sub capacity_for { - my ($type) = @_; - debug "checking local capacity for worker type $type"; - - my $setting = setting('workers')->{ setting('job_type_keys')->{$type} }; - my $current = $queue->search({type => $type})->count; - return ($current < $setting); -} - -sub take_jobs { - my ($wid, $type, $max) = @_; - return () unless $wid > 1; - $max ||= 1; - - debug "deleting completed jobs by worker $wid"; - $queue->search({wid => $wid})->delete; - - debug "searching for $max new jobs for worker $wid (type $type)"; - my $rs = $queue->search( - {type => $type, wid => 0}, - {rows => $max}, - ); - - my @rows = $rs->all; - return [] if scalar @rows == 0; - - debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid; - $queue->search({job => { -in => [map {$_->job} @rows] }}) - ->update({wid => $wid}); - - return \@rows; -} - -# not used by workers, only the daemon when reinitializing a worker -sub reset_jobs { - my ($wid) = @_; - debug "resetting jobs owned by worker $wid to be available"; - return unless $wid > 1; - $queue->search({wid => $wid}) - ->update({wid => 0}); -} - -# not used by workers, only the daemon when reinitializing a worker -sub release_jobs { - my ($jid) = @_; - debug "releasing local job ID $jid"; - $queue->search({job => $jid})->delete; -} - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index 2648d68a..01928171 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -15,25 +15,23 @@ sub worker_body { my $self = shift; my $wid = $self->wid; - my $tag = $self->worker_tag; - my $type = $self->worker_type; - while (1) { - prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type); + prctl sprintf 'netdisco-daemon: worker #%s poller: idle', $wid; my $job = $self->{queue}->dequeue(1); next unless defined $job; - $job = schema('daemon')->dclone( $job ); # TODO stop using DBIC - my $target = $self->munge_action($job->action); + # TODO stop using DBIC + $job = schema('daemon')->dclone( $job ); + my $action = $job->action; try { $job->started(scalar localtime); - prctl sprintf 'netdisco-daemon: worker #%s %s: working on #%s: %s', - $wid, lc($type), $job->id, $job->summary; - info sprintf "$tag (%s): starting %s job(%s) at %s", - $wid, $target, $job->id, $job->started; - my ($status, $log) = $self->$target($job); + prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s', + $wid, $job->id, $job->summary; + info sprintf "pol (%s): starting %s job(%s) at %s", + $wid, $action, $job->id, $job->started; + my ($status, $log) = $self->$action($job); $job->status($status); $job->log($log); } @@ -49,13 +47,11 @@ sub worker_body { sub close_job { my ($self, $job) = @_; - my $tag = $self->worker_tag; - my $type = $self->worker_type; my $now = scalar localtime; - prctl sprintf 'netdisco-daemon: worker #%s %s: wrapping up %s #%s: %s', - $self->wid, lc($type), $job->action, $job->id, $job->status; - info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s", + prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s', + $self->wid, $job->action, $job->id, $job->status; + info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", $self->wid, $job->action, $job->id, $job->status, $now; try { diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm deleted file mode 100644 index 1ae2aa2c..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive.pm +++ /dev/null @@ -1,17 +0,0 @@ -package App::Netdisco::Daemon::Worker::Interactive; - -use Role::Tiny; -use namespace::clean; - -# main worker body -with 'App::Netdisco::Daemon::Worker::Common'; - -# add dispatch methods for interactive actions -with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', - 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; - -sub worker_tag { 'int' } -sub worker_type { 'Interactive' } -sub munge_action { 'set_' . $_[1] } - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm index af3e02a0..5aa2a9d4 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/DeviceActions.pm @@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all'; use Role::Tiny; use namespace::clean; -sub set_location { +sub location { my ($self, $job) = @_; return _set_device_generic($job->device, 'location', $job->subaction); } -sub set_contact { +sub contact { my ($self, $job) = @_; return _set_device_generic($job->device, 'contact', $job->subaction); } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm index 7adf941c..ad2472ac 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Interactive/PortActions.pm @@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all'; use Role::Tiny; use namespace::clean; -sub set_portname { +sub portname { my ($self, $job) = @_; return _set_port_generic($job, 'alias', 'name'); } -sub set_portcontrol { +sub portcontrol { my ($self, $job) = @_; my $port = get_port($job->device, $job->port) @@ -39,7 +39,7 @@ sub set_portcontrol { } } -sub set_vlan { +sub vlan { my ($self, $job) = @_; my $port = get_port($job->device, $job->port) @@ -97,7 +97,7 @@ sub _set_port_generic { return job_done("Updated [$pn] $slot status on [$ip] to [$data]"); } -sub set_power { +sub power { my ($self, $job) = @_; my $port = get_port($job->device, $job->port) diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm index 1350ba4e..001c327a 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller.pm @@ -11,10 +11,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device', 'App::Netdisco::Daemon::Worker::Poller::Arpnip', 'App::Netdisco::Daemon::Worker::Poller::Macsuck', 'App::Netdisco::Daemon::Worker::Poller::Nbtstat', - 'App::Netdisco::Daemon::Worker::Poller::Expiry'; - -sub worker_tag { 'pol' } -sub worker_type { 'Poller' } -sub munge_action { $_[1] } + 'App::Netdisco::Daemon::Worker::Poller::Expiry', + 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', + 'App::Netdisco::Daemon::Worker::Interactive::PortActions'; 1;