From f3131adfc884eb2eee658c4b5426c84a42beb6f5 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Tue, 6 May 2014 21:47:30 +0100 Subject: [PATCH] big patch to remove knowledge of DB from most worker code --- .../App/Netdisco/Daemon/DB/Result/Admin.pm | 5 +- .../Netdisco/Daemon/JobQueue/PostgreSQL.pm | 145 ++++++++++++++++-- .../lib/App/Netdisco/Daemon/LocalQueue.pm | 9 +- .../lib/App/Netdisco/Daemon/Worker/Common.pm | 50 ++---- .../lib/App/Netdisco/Daemon/Worker/Manager.pm | 67 ++------ .../Netdisco/Daemon/Worker/Poller/Common.pm | 33 ++-- .../Netdisco/Daemon/Worker/Poller/Device.pm | 38 ++--- .../App/Netdisco/Daemon/Worker/Scheduler.pm | 21 +-- 8 files changed, 199 insertions(+), 169 deletions(-) diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm index d7bec538..64f3899c 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm @@ -8,10 +8,7 @@ use base 'DBIx::Class::Core'; __PACKAGE__->table("admin"); __PACKAGE__->add_columns( "job", - { - data_type => "integer", - is_nullable => 0, - }, + { data_type => "integer", is_nullable => 0 }, "type", # Poller, Interactive, etc { data_type => "text", is_nullable => 0 }, diff --git a/Netdisco/lib/App/Netdisco/Daemon/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/Daemon/JobQueue/PostgreSQL.pm index ac09df48..a9c90e30 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/JobQueue/PostgreSQL.pm @@ -3,27 +3,144 @@ package App::Netdisco::Daemon::JobQueue::PostgreSQL; use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; +use Net::Domain 'hostfqdn'; +use Try::Tiny; + use Role::Tiny; use namespace::clean; -sub jobqueue_insert { - my ($self, $settings) = @_; +#jq_get +#jq_getlocal +#jq_queued +#jq_lock +#jq_defer +#jq_complete +#jq_insert - schema('netdisco')->resultset('Admin')->create({ - action => $settings->{action}, - device => $settings->{device}, - port => $settings->{port}, - subaction => $settings->{extra}, - status => 'queued', - }); +sub jq_get { + my ($self, $num_slots) = @_; + my @returned = (); + + my $rs = schema('netdisco')->resultset('Admin') + ->search( + {status => 'queued'}, + {order_by => 'random()', rows => ($num_slots || 1)}, + ); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + + return @returned; } -sub jobqueue_update { - my ($self, $settings) = @_; +sub jq_getlocal { + my $self = shift; + my $fqdn = hostfqdn || 'localhost'; + my @returned = (); - schema('netdisco')->resultset('Admin') - ->find(delete $settings->{id}, {for => 'update'}) - ->update($settings); + my $rs = schema('netdisco')->resultset('Admin') + ->search({status => "queued-$fqdn"}); + + while (my $job = $rs->next) { + my $job_type = setting('job_types')->{$job->action} or next; + push @returned, schema('daemon')->resultset('Admin') + ->new_result({ $job->get_columns, type => $job_type }); + } + + return @returned; +} + +sub jq_queued { + my ($self, $job_type) = @_; + + return schema('netdisco')->resultset('Admin')->search({ + device => { '!=' => undef}, + action => $job_type, + status => { -like => 'queued%' }, + })->get_column('device')->all; +} + +sub jq_lock { + my ($self, $job) = @_; + my $fqdn = hostfqdn || 'localhost'; + my $happy = 0; + + # lock db row and update to show job has been picked + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'}) + ->update({ status => "queued-$fqdn" }); + }); + $happy = 1; + }; + + return $happy; +} + +sub jq_defer { + my ($self, $job) = @_; + my $happy = 0; + + # lock db row and update to show job is available + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'}) + ->update({ status => 'queued' }); + }); + $happy = 1; + }; + + return $happy; +} + +sub jq_complete { + my ($self, $job) = @_; + my $happy = 0; + + # lock db row and update to show job is done/error + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin') + ->find($job->id, {for => 'update'})->update({ + status => $job->status, + log => $job->log, + finished => $job->finished, + }); + }); + $happy = 1; + }; + + return $happy; +} + +sub jq_insert { + my ($self, $jobs) = @_; + $jobs = [$jobs] if ref [] ne ref $jobs; + my $happy = 0; + + try { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->populate([ + map {{ + device => $_->{device}, + port => $_->{port}, + action => $_->{action}, + subaction => ($_->{extra} || $_->{subaction}), + username => $_->{username}, + userip => $_->{userip}, + status => 'queued', + }} @$jobs + ]); + }); + $happy = 1; + }; + + return $happy; } true; diff --git a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm index f3ef561f..58a8f366 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm @@ -12,9 +12,10 @@ schema('daemon')->deploy; my $queue = schema('daemon')->resultset('Admin'); sub add_jobs { - my ($jobs) = @_; - info sprintf "adding %s jobs to local queue", scalar @$jobs; - $queue->populate($jobs); + my (@jobs) = @_; + info sprintf "adding %s jobs to local queue", scalar @jobs; + use Data::Printer; + do { schema('daemon')->dclone($_)->insert } for @jobs; } sub capacity_for { @@ -46,7 +47,7 @@ sub take_jobs { $queue->search({job => { -in => [map {$_->job} @rows] }}) ->update({wid => $wid}); - return [ map {{$_->get_columns}} @rows ]; + return \@rows; } sub reset_jobs { diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index ea9b2dde..7ef553f8 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -1,14 +1,13 @@ package App::Netdisco::Daemon::Worker::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use Try::Tiny; use Role::Tiny; use namespace::clean; with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue'); -requires qw/worker_type worker_name munge_action jobqueue_update/; +requires qw/worker_type worker_name munge_action jq_defer jq_complete/; sub worker_body { my $self = shift; @@ -21,33 +20,27 @@ sub worker_body { debug "$type ($wid): asking for a job"; my $jobs = $self->do('take_jobs', $self->wid, $name); - foreach my $candidate (@$jobs) { - # create a row object so we can use column accessors - # use the local db schema in case it is accidentally 'stored' - # (will throw an exception) - my $job = schema('daemon')->resultset('Admin') - ->new_result($candidate); - my $jid = $job->job; - + foreach my $job (@$jobs) { my $target = $self->munge_action($job->action); next unless $self->can($target); - debug "$type ($wid): can ${target}() for job $jid"; + debug sprintf "$type ($wid): can ${target}() for job %s", $job->id; # do job - my ($status, $log); try { $job->started(scalar localtime); info sprintf "$type (%s): starting %s job(%s) at %s", - $wid, $target, $jid, $job->started; - ($status, $log) = $self->$target($job); + $wid, $target, $job->id, $job->started; + my ($status, $log) = $self->$target($job); + $job->status($status); + $job->log($log); } catch { - $status = 'error'; - $log = "error running job: $_"; - $self->sendto('stderr', $log ."\n"); + $job->status('error'); + $job->log("error running job: $_"); + $self->sendto('stderr', $job->log ."\n"); }; - $self->close_job($job, $status, $log); + $self->close_job($job); } debug "$type ($wid): sleeping now..."; @@ -56,29 +49,20 @@ sub worker_body { } sub close_job { - my ($self, $job, $status, $log) = @_; + my ($self, $job) = @_; my $type = $self->worker_type; my $now = scalar localtime; info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s", - $self->wid, $job->action, $job->job, $status, $now; + $self->wid, $job->action, $job->id, $job->status, $now; - # lock db row and either defer or complete the job try { - if ($status eq 'defer') { - $self->jobqueue_update({ - id => $job->job, - status => 'queued', - }); + if ($job->status eq 'defer') { + $self->jq_defer($job); } else { - $self->jobqueue_update({ - id => $job->job, - status => $status, - log => $log, - started => $job->started, - finished => $now, - }); + $job->finished($now); + $self->jq_complete($job); } # remove job from local queue diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 9fe7a3aa..d6b8abc3 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -1,15 +1,12 @@ package App::Netdisco::Daemon::Worker::Manager; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; - -use Net::Domain 'hostfqdn'; -use Try::Tiny; use Role::Tiny; use namespace::clean; -my $fqdn = hostfqdn || 'localhost'; +with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue'); +requires qw/jq_get jq_getlocal jq_lock/; sub worker_begin { my $self = shift; @@ -18,16 +15,11 @@ sub worker_begin { # requeue jobs locally debug "mgr ($wid): searching for jobs booked to this processing node"; - my $rs = schema('netdisco')->resultset('Admin') - ->search({status => "queued-$fqdn"}); - - my @jobs = map {{$_->get_columns}} $rs->all; + my @jobs = $self->jq_getlocal; if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - map { $_->{type} = setting('job_types')->{$_->{action}} } @jobs; - - $self->do('add_jobs', \@jobs); + $self->do('add_jobs', @jobs); } } @@ -37,62 +29,31 @@ sub worker_body { my $num_slots = $self->do('num_workers') or return debug "mgr ($wid): this node has no workers... quitting manager"; - # get some pending jobs - my $rs = schema('netdisco')->resultset('Admin') - ->search( - {status => 'queued'}, - {order_by => 'random()', rows => $num_slots}, - ); - while (1) { debug "mgr ($wid): getting potential jobs for $num_slots workers"; - while (my $job = $rs->next) { - my $jid = $job->job; - my $job_type = setting('job_types')->{$job->action} or next; + + # get some pending jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( $self->jq_get($num_slots) ) { # check for available local capacity - next unless $self->do('capacity_for', $job_type); + my $job_type = setting('job_types')->{$job->action}; + next unless $job_type and $self->do('capacity_for', $job_type); debug sprintf "mgr (%s): processing node has capacity for job %s (%s)", - $wid, $jid, $job->action; + $wid, $job->id, $job->action; # mark job as running - next unless $self->lock_job($job); + next unless $self->jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $jid; - - my $local_job = { $job->get_columns }; - $local_job->{type} = $job_type; + $wid, $job->id; # copy job to local queue - $self->do('add_jobs', [$local_job]); + $self->do('add_jobs', $job); } - # reset iterator so ->next() triggers another DB query - $rs->reset; - - # TODO also check for stale jobs in Netdisco DB - debug "mgr ($wid): sleeping now..."; sleep( setting('workers')->{sleep_time} || 2 ); } } -sub lock_job { - my ($self, $job) = @_; - my $happy = 0; - - # lock db row and update to show job has been picked - try { - schema('netdisco')->txn_do(sub { - schema('netdisco')->resultset('Admin')->find( - {job => $job->job, status => 'queued'}, - {for => 'update'} - )->update({ status => "queued-$fqdn" }); - }); - $happy = 1; - }; - - return $happy; -} - 1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm index 9f3f5cd2..448c2cbb 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm @@ -1,51 +1,40 @@ package App::Netdisco::Daemon::Worker::Poller::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::SNMP 'snmp_connect'; use App::Netdisco::Util::Device 'get_device'; use App::Netdisco::Daemon::Util ':all'; +use Dancer::Plugin::DBIC 'schema'; use NetAddr::IP::Lite ':lower'; use Role::Tiny; use namespace::clean; +with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue'); +requires qw/jq_queued jq_insert/; + # queue a job for all devices known to Netdisco sub _walk_body { my ($self, $job_type, $job) = @_; - my $action_method = $job_type .'_action'; - my $job_action = $self->$action_method; - my $layer_method = $job_type .'_layer'; my $job_layer = $self->$layer_method; - my $jobqueue = schema('netdisco')->resultset('Admin'); + my %queued = map {$_ => 1} $self->jq_queued($job_type); my @devices = schema('netdisco')->resultset('Device') - ->search({ip => { -not_in => - $jobqueue->search({ - device => { '!=' => undef}, - action => $job_type, - status => { -like => 'queued%' }, - })->get_column('device')->as_query - }})->has_layer($job_layer)->get_column('ip')->all; + ->has_layer($job_layer)->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; - my $filter_method = $job_type .'_filter'; - my $job_filter = $self->$filter_method; - - my @filtered_devices = grep {$job_filter->($_)} @devices; - - schema('netdisco')->resultset('Admin')->txn_do_locked(sub { - $jobqueue->populate([ + $self->jq_insert([ map {{ device => $_, action => $job_type, - status => 'queued', + username => $job->username, + userip => $job->userip, }} (@filtered_devices) - ]); - }); + ]); return job_done("Queued $job_type job for all devices"); } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm index dcb42ec1..fc840ddf 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm @@ -1,43 +1,38 @@ package App::Netdisco::Daemon::Worker::Poller::Device; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::SNMP 'snmp_connect'; use App::Netdisco::Util::Device qw/get_device is_discoverable/; use App::Netdisco::Core::Discover ':all'; use App::Netdisco::Daemon::Util ':all'; +use Dancer::Plugin::DBIC 'schema'; use NetAddr::IP::Lite ':lower'; use Role::Tiny; use namespace::clean; +with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue'); +requires qw/jq_queued jq_insert/; + # queue a discover job for all devices known to Netdisco sub discoverall { my ($self, $job) = @_; - my $jobqueue = schema('netdisco')->resultset('Admin'); - my $devices = schema('netdisco')->resultset('Device') - ->search({ip => { -not_in => - $jobqueue->search({ - device => { '!=' => undef}, - action => 'discover', - status => { -like => 'queued%' }, - })->get_column('device')->as_query - }})->get_column('ip'); + my %queued = map {$_ => 1} $self->jq_queued('discover'); + my @devices = schema('netdisco')->resultset('Device') + ->get_column('ip')->all; + my @filtered_devices = grep {!exists $queued{$_}} @devices; - schema('netdisco')->resultset('Admin')->txn_do_locked(sub { - $jobqueue->populate([ + $self->jq_insert([ map {{ device => $_, action => 'discover', - status => 'queued', username => $job->username, userip => $job->userip, - }} ($devices->all) - ]); - }); + }} (@filtered_devices) + ]); return job_done("Queued discover job for all devices"); } @@ -48,7 +43,6 @@ sub discover { my $host = NetAddr::IP::Lite->new($job->device); my $device = get_device($host->addr); - my $jobqueue = schema('netdisco')->resultset('Admin'); if ($device->ip eq '0.0.0.0') { return job_error("discover failed: no device param (need -d ?)"); @@ -80,26 +74,20 @@ sub discover { # if requested, and the device has not yet been arpniped/macsucked, queue now if ($device->in_storage and $job->subaction and $job->subaction eq 'with-nodes') { if (!defined $device->last_macsuck) { - schema('netdisco')->txn_do(sub { - $jobqueue->create({ + $self->jq_insert({ device => $device->ip, action => 'macsuck', - status => 'queued', username => $job->username, userip => $job->userip, - }); }); } if (!defined $device->last_arpnip) { - schema('netdisco')->txn_do(sub { - $jobqueue->create({ + $self->jq_insert({ device => $device->ip, action => 'arpnip', - status => 'queued', username => $job->username, userip => $job->userip, - }); }); } } diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm index 141f4fa3..0b1e476e 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm @@ -1,15 +1,13 @@ package App::Netdisco::Daemon::Worker::Scheduler; use Dancer qw/:moose :syntax :script/; - use Algorithm::Cron; -use Try::Tiny; use Role::Tiny; use namespace::clean; with 'App::Netdisco::Daemon::JobQueue::'. setting('job_queue'); -requires 'jobqueue_insert'; +requires 'jq_insert'; sub worker_begin { my $self = shift; @@ -55,17 +53,12 @@ sub worker_body { next unless $sched->{when}->next_time($win_start) <= $win_end; # queue it! - try { - info "sched ($wid): queueing $action job"; - $self->jobqueue_insert({ - action => $action, - device => $sched->{device}, - extra => $sched->{extra}, - }); - } - catch { - debug "sched ($wid): action $action was not queued (dupe?)"; - }; + info "sched ($wid): queueing $action job"; + $self->jq_insert({ + action => $action, + device => $sched->{device}, + extra => $sched->{extra}, + }); } } }