merge interactive and poller worker types

This commit is contained in:
Oliver Gorwits
2014-08-10 13:55:31 +01:00
parent e9043b90e8
commit 517b1ae4c1
7 changed files with 22 additions and 114 deletions

View File

@@ -30,7 +30,7 @@ use Role::Tiny::With;
# preload all worker modules into shared memory
use Module::Find ();
Module::Find::useall 'App::Netdisco::Daemon::Worker';
Module::Find::useall 'App::Netdisco::Daemon';
use MCE::Signal '-setpgrp';
use MCE::Flow Sereal => 1;

View File

@@ -1,69 +0,0 @@
package App::Netdisco::Daemon::LocalQueue;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs release_jobs /;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
schema('daemon')->deploy;
my $queue = schema('daemon')->resultset('Admin');
sub add_jobs {
my (@jobs) = @_;
info sprintf "adding %s jobs to local queue", scalar @jobs;
schema('daemon')->dclone($_)->insert for @jobs;
}
sub capacity_for {
my ($type) = @_;
debug "checking local capacity for worker type $type";
my $setting = setting('workers')->{ setting('job_type_keys')->{$type} };
my $current = $queue->search({type => $type})->count;
return ($current < $setting);
}
sub take_jobs {
my ($wid, $type, $max) = @_;
return () unless $wid > 1;
$max ||= 1;
debug "deleting completed jobs by worker $wid";
$queue->search({wid => $wid})->delete;
debug "searching for $max new jobs for worker $wid (type $type)";
my $rs = $queue->search(
{type => $type, wid => 0},
{rows => $max},
);
my @rows = $rs->all;
return [] if scalar @rows == 0;
debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid;
$queue->search({job => { -in => [map {$_->job} @rows] }})
->update({wid => $wid});
return \@rows;
}
# not used by workers, only the daemon when reinitializing a worker
sub reset_jobs {
my ($wid) = @_;
debug "resetting jobs owned by worker $wid to be available";
return unless $wid > 1;
$queue->search({wid => $wid})
->update({wid => 0});
}
# not used by workers, only the daemon when reinitializing a worker
sub release_jobs {
my ($jid) = @_;
debug "releasing local job ID $jid";
$queue->search({job => $jid})->delete;
}
1;

View File

@@ -15,25 +15,23 @@ sub worker_body {
my $self = shift;
my $wid = $self->wid;
my $tag = $self->worker_tag;
my $type = $self->worker_type;
while (1) {
prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type);
prctl sprintf 'netdisco-daemon: worker #%s poller: idle', $wid;
my $job = $self->{queue}->dequeue(1);
next unless defined $job;
$job = schema('daemon')->dclone( $job ); # TODO stop using DBIC
my $target = $self->munge_action($job->action);
# TODO stop using DBIC
$job = schema('daemon')->dclone( $job );
my $action = $job->action;
try {
$job->started(scalar localtime);
prctl sprintf 'netdisco-daemon: worker #%s %s: working on #%s: %s',
$wid, lc($type), $job->id, $job->summary;
info sprintf "$tag (%s): starting %s job(%s) at %s",
$wid, $target, $job->id, $job->started;
my ($status, $log) = $self->$target($job);
prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s',
$wid, $job->id, $job->summary;
info sprintf "pol (%s): starting %s job(%s) at %s",
$wid, $action, $job->id, $job->started;
my ($status, $log) = $self->$action($job);
$job->status($status);
$job->log($log);
}
@@ -49,13 +47,11 @@ sub worker_body {
sub close_job {
my ($self, $job) = @_;
my $tag = $self->worker_tag;
my $type = $self->worker_type;
my $now = scalar localtime;
prctl sprintf 'netdisco-daemon: worker #%s %s: wrapping up %s #%s: %s',
$self->wid, lc($type), $job->action, $job->id, $job->status;
info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s",
prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s',
$self->wid, $job->action, $job->id, $job->status;
info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->id, $job->status, $now;
try {

View File

@@ -1,17 +0,0 @@
package App::Netdisco::Daemon::Worker::Interactive;
use Role::Tiny;
use namespace::clean;
# main worker body
with 'App::Netdisco::Daemon::Worker::Common';
# add dispatch methods for interactive actions
with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
'App::Netdisco::Daemon::Worker::Interactive::PortActions';
sub worker_tag { 'int' }
sub worker_type { 'Interactive' }
sub munge_action { 'set_' . $_[1] }
1;

View File

@@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all';
use Role::Tiny;
use namespace::clean;
sub set_location {
sub location {
my ($self, $job) = @_;
return _set_device_generic($job->device, 'location', $job->subaction);
}
sub set_contact {
sub contact {
my ($self, $job) = @_;
return _set_device_generic($job->device, 'contact', $job->subaction);
}

View File

@@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all';
use Role::Tiny;
use namespace::clean;
sub set_portname {
sub portname {
my ($self, $job) = @_;
return _set_port_generic($job, 'alias', 'name');
}
sub set_portcontrol {
sub portcontrol {
my ($self, $job) = @_;
my $port = get_port($job->device, $job->port)
@@ -39,7 +39,7 @@ sub set_portcontrol {
}
}
sub set_vlan {
sub vlan {
my ($self, $job) = @_;
my $port = get_port($job->device, $job->port)
@@ -97,7 +97,7 @@ sub _set_port_generic {
return job_done("Updated [$pn] $slot status on [$ip] to [$data]");
}
sub set_power {
sub power {
my ($self, $job) = @_;
my $port = get_port($job->device, $job->port)

View File

@@ -11,10 +11,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device',
'App::Netdisco::Daemon::Worker::Poller::Arpnip',
'App::Netdisco::Daemon::Worker::Poller::Macsuck',
'App::Netdisco::Daemon::Worker::Poller::Nbtstat',
'App::Netdisco::Daemon::Worker::Poller::Expiry';
sub worker_tag { 'pol' }
sub worker_type { 'Poller' }
sub munge_action { $_[1] }
'App::Netdisco::Daemon::Worker::Poller::Expiry',
'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
'App::Netdisco::Daemon::Worker::Interactive::PortActions';
1;