New backend daemon code, no SQLite. MCE::Flow.

Squashed commit of the following:

commit 3284b62509
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 21:17:06 2014 +0100

    config defaults tidying

commit ade7bcd880
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 20:00:01 2014 +0100

    high priority jobs are picked first and inserted to prio queue

commit d450dfd2bd
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 19:25:21 2014 +0100

    better status

commit b8a742e5de
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 16:54:03 2014 +0100

    update proctitle when worker not running

commit 0c3675a8f4
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 16:48:58 2014 +0100

    remove all trace of SQLite - new lightweight Job object

commit a13ed25f6a
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 14:45:22 2014 +0100

    rename pollers to tasks

commit 44b50f413f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 14:13:00 2014 +0100

    update docs

commit 517b1ae4c1
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 13:55:31 2014 +0100

    merge interactive and poller worker types

commit e9043b90e8
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 13:47:41 2014 +0100

    only take one job at a time per worker

commit 2366738d54
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 13:43:31 2014 +0100

    auto job priorities

commit 1fd473fd50
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 13:18:59 2014 +0100

    preload all worker modules into shared memory

commit 9ceb43c0f7
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 13:13:07 2014 +0100

    daemon clean

commit c817a35537
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sun Aug 10 12:36:24 2014 +0100

    first refactor for MCE::Flow and MCE::Queue
This commit is contained in:
Oliver Gorwits
2014-08-10 21:34:46 +01:00
parent d3992f57fd
commit 9afbe6d7e6
23 changed files with 278 additions and 477 deletions

View File

@@ -13,7 +13,6 @@ requires 'Archive::Extract' => 0;
requires 'CGI::Expand' => 2.05; requires 'CGI::Expand' => 2.05;
requires 'Data::Printer' => 0; requires 'Data::Printer' => 0;
requires 'DBD::Pg' => 0; requires 'DBD::Pg' => 0;
requires 'DBD::SQLite' => 1.37;
requires 'DBIx::Class' => 0.08250; requires 'DBIx::Class' => 0.08250;
requires 'DBIx::Class::Helpers' => 2.018004; requires 'DBIx::Class::Helpers' => 2.018004;
requires 'Daemon::Control' => 0.001000; requires 'Daemon::Control' => 0.001000;
@@ -32,7 +31,7 @@ requires 'MIME::Base64' => 3.13;
requires 'Module::Find' => 0.12; requires 'Module::Find' => 0.12;
requires 'Module::Load' => 0.32; requires 'Module::Load' => 0.32;
requires 'Moo' => 1.001000; requires 'Moo' => 1.001000;
requires 'MCE' => 1.408; requires 'MCE' => 1.515;
requires 'Net::Domain' => 1.23; requires 'Net::Domain' => 1.23;
requires 'Net::DNS' => 0.72; requires 'Net::DNS' => 0.72;
requires 'Net::LDAP' => 0; requires 'Net::LDAP' => 0;
@@ -44,6 +43,7 @@ requires 'Plack' => 1.0023;
requires 'Plack::Middleware::Expires' => 0.03; requires 'Plack::Middleware::Expires' => 0.03;
requires 'Plack::Middleware::ReverseProxy' => 0.15; requires 'Plack::Middleware::ReverseProxy' => 0.15;
requires 'Role::Tiny' => 1.002005; requires 'Role::Tiny' => 1.002005;
requires 'Sereal' => 0;
requires 'Socket6' => 0.23; requires 'Socket6' => 0.23;
requires 'Starman' => 0.4008; requires 'Starman' => 0.4008;
requires 'SNMP::Info' => 3.18; requires 'SNMP::Info' => 3.18;

View File

@@ -24,16 +24,17 @@ use App::Netdisco;
use Dancer qw/:moose :script/; use Dancer qw/:moose :script/;
warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD'); warning sprintf "App::Netdisco %s backend", ($App::Netdisco::VERSION || 'HEAD');
# local job queue management
use App::Netdisco::Daemon::LocalQueue ':all';
use App::Netdisco::Util::Daemon; use App::Netdisco::Util::Daemon;
use NetAddr::IP::Lite ':lower'; # to quench AF_INET6 symbol errors
# needed to quench AF_INET6 symbol errors
use NetAddr::IP::Lite ':lower';
use List::Util 'sum';
use Role::Tiny::With; use Role::Tiny::With;
# preload all worker modules into shared memory
use Module::Find ();
Module::Find::useall 'App::Netdisco::Daemon';
use MCE::Signal '-setpgrp'; use MCE::Signal '-setpgrp';
use MCE; use MCE::Flow Sereal => 1;
use MCE::Queue;
# set temporary MCE files' location in home directory # set temporary MCE files' location in home directory
my $home = ($ENV{NETDISCO_HOME} || $ENV{HOME}); my $home = ($ENV{NETDISCO_HOME} || $ENV{HOME});
@@ -43,79 +44,38 @@ mkdir $tmp_dir if ! -d $tmp_dir;
setpgrp(0,0); # only portable variety of setpgrp setpgrp(0,0); # only portable variety of setpgrp
prctl 'netdisco-daemon: master'; prctl 'netdisco-daemon: master';
my $mce = MCE->new( # shared local job queue
spawn_delay => 0.15, my $queue = MCE::Queue->new;
job_delay => 1.15,
tmp_dir => $tmp_dir,
user_func => sub { $_[0]->worker_body },
on_post_exit => \&restart_this_worker,
user_tasks => build_tasks_list(),
)->run();
sub build_tasks_list { # support a scheduler-only node
# NB MCE does not like max_workers => 0 setting('workers')->{'no_manager'} = 1
my $tasks = []; if setting('workers')->{tasks} eq '0';
push @$tasks, { mce_flow {
max_workers => 1, task_name => [qw/ scheduler manager poller /],
user_begin => worker_factory('Manager'), max_workers => [ 1, 1, setting('workers')->{tasks} ],
} if num_workers() > 0; tmp_dir => $tmp_dir,
on_post_exit => sub { MCE->restart_worker },
}, _mk_wkr('Scheduler'), _mk_wkr('Manager'), _mk_wkr('Poller');
push @$tasks, { sub _mk_wkr {
max_workers => 1,
user_begin => worker_factory('Scheduler'),
} if setting('schedule');
my @logmsg = ();
foreach my $key (keys %{setting('job_type_keys')}) {
my $val = setting('job_type_keys')->{$key};
setting('workers')->{$val} = 2
if !defined setting('workers')->{$val};
push @logmsg, setting('workers')->{$val} ." $key";
push @$tasks, {
max_workers => setting('workers')->{$val},
user_begin => worker_factory($key),
} if setting('workers')->{$val};
}
info sprintf "MCE will load: %s Manager, %s Scheduler, %s",
(num_workers() ? 1 : 0),
(setting('schedule') ? 1 : 0),
(join ', ', @logmsg);
return $tasks;
}
sub num_workers {
return sum( 0, map { setting('workers')->{$_} }
values %{setting('job_type_keys')} );
}
sub worker_factory {
my $role = shift; my $role = shift;
return sub { return sub {
my $self = shift; my $self = shift;
my $wid = $self->wid; $self->{queue} = $queue;
prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role);
info "applying role $role to worker $wid";
# $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); prctl sprintf 'netdisco-daemon: worker #%s %s: init', MCE->wid, lc($role);
Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); info sprintf 'applying role %s to worker %s', $role, MCE->wid;
# post-fork, become manager, scheduler, poller, etc
Role::Tiny->apply_roles_to_object(
$self => "App::Netdisco::Daemon::Worker::$role");
$self->worker_begin if $self->can('worker_begin'); $self->worker_begin if $self->can('worker_begin');
$self->worker_body;
}; };
} }
sub restart_this_worker {
my ($self, $e) = @_;
reset_jobs($e->{wid});
debug "restarting worker $e->{wid}";
$self->restart_worker($e->{wid});
}
=head1 NAME =head1 NAME
netdisco-daemon-fg - Job Control for Netdisco netdisco-daemon-fg - Job Control for Netdisco

View File

@@ -35,8 +35,8 @@ BEGIN {
# for netdisco app config # for netdisco app config
use App::Netdisco; use App::Netdisco;
use App::Netdisco::Daemon::Job;
use Dancer qw/:moose :script/; use Dancer qw/:moose :script/;
use Dancer::Plugin::DBIC 'schema';
info "App::Netdisco version $App::Netdisco::VERSION loaded."; info "App::Netdisco version $App::Netdisco::VERSION loaded.";
@@ -73,9 +73,6 @@ $ENV{DBIC_TRACE} ||= $sqltrace;
# reconfigure logging to force console output # reconfigure logging to force console output
Dancer::Logger->init('console', $CONFIG); Dancer::Logger->init('console', $CONFIG);
# for the in-memory local job queue
schema('daemon')->deploy;
# get requested action # get requested action
my $action = shift @ARGV; my $action = shift @ARGV;
@@ -143,7 +140,7 @@ if (not $worker->can( $action )) {
} }
# what job are we asked to do? # what job are we asked to do?
my $job = schema('daemon')->resultset('Admin')->new_result({ my $job = App::Netdisco::Daemon::Job->new({
job => 0, job => 0,
action => $action, action => $action,
device => $device, device => $device,

View File

@@ -35,21 +35,18 @@ if (ref {} eq ref setting('database')) {
} }
# static configuration for the in-memory local job queue
setting('plugins')->{DBIC}->{daemon} = {
dsn => 'dbi:SQLite:dbname=:memory:',
options => {
AutoCommit => 1,
RaiseError => 1,
sqlite_use_immediate_transaction => 1,
},
schema_class => 'App::Netdisco::Daemon::DB',
};
# defaults for workers # defaults for workers
setting('workers')->{queue} ||= 'PostgreSQL'; setting('workers')->{queue} ||= 'PostgreSQL';
setting('workers')->{interactives} = 1 if (exists setting('workers')->{interactives}
if setting('workers') and not exists setting('workers')->{interactives}; or exists setting('workers')->{pollers}) {
setting('workers')->{tasks} =
(setting('workers')->{pollers} || 0)
+ (setting('workers')->{interactives} || 0);
delete setting('workers')->{pollers};
delete setting('workers')->{interactives};
}
# force skipped DNS resolution, if unset # force skipped DNS resolution, if unset
setting('dns')->{hosts_file} ||= '/etc/hosts'; setting('dns')->{hosts_file} ||= '/etc/hosts';

View File

@@ -1,10 +0,0 @@
use utf8;
package App::Netdisco::Daemon::DB;
use strict;
use warnings;
use base 'DBIx::Class::Schema';
__PACKAGE__->load_namespaces;
1;

View File

@@ -1,90 +0,0 @@
use utf8;
package App::Netdisco::Daemon::DB::Result::Admin;
use strict;
use warnings;
use base 'DBIx::Class::Core';
__PACKAGE__->table("admin");
__PACKAGE__->add_columns(
"job",
{ data_type => "integer", is_nullable => 0 },
"type", # Poller, Interactive, etc
{ data_type => "text", is_nullable => 0 },
"wid", # worker ID, only != 0 once taken
{ data_type => "integer", is_nullable => 0, default_value => 0 },
"entered",
{ data_type => "timestamp", is_nullable => 1 },
"started",
{ data_type => "timestamp", is_nullable => 1 },
"finished",
{ data_type => "timestamp", is_nullable => 1 },
"device",
{ data_type => "inet", is_nullable => 1 },
"port",
{ data_type => "text", is_nullable => 1 },
"action",
{ data_type => "text", is_nullable => 1 },
"subaction",
{ data_type => "text", is_nullable => 1 },
"status",
{ data_type => "text", is_nullable => 1 },
"username",
{ data_type => "text", is_nullable => 1 },
"userip",
{ data_type => "inet", is_nullable => 1 },
"log",
{ data_type => "text", is_nullable => 1 },
"debug",
{ data_type => "boolean", is_nullable => 1 },
);
__PACKAGE__->set_primary_key("job");
=head1 METHODS
=head2 summary
An attempt to make a meaningful statement about the job.
=cut
sub summary {
my $job = shift;
return join ' ',
$job->action,
($job->device || ''),
($job->port || '');
# ($job->subaction ? (q{'}. $job->subaction .q{'}) : '');
}
=head1 ADDITIONAL COLUMNS
=head2 extra
Alias for the C<subaction> column.
=cut
sub extra { (shift)->subaction }
=head2 entererd_stamp
Formatted version of the C<entered> field, accurate to the minute.
The format is somewhat like ISO 8601 or RFC3339 but without the middle C<T>
between the date stamp and time stamp. That is:
2012-02-06 12:49
=cut
sub entered_stamp {
(my $stamp = (shift)->entered) =~ s/\.\d+$//;
return $stamp;
}
1;

View File

@@ -0,0 +1,54 @@
package App::Netdisco::Daemon::Job;
use Moo;
use namespace::clean;
foreach my $slot (qw/
job
entered
started
finished
device
port
action
subaction
status
username
userip
log
debug
/) {
has $slot => (
is => 'rw',
);
}
=head1 METHODS
=head2 summary
An attempt to make a meaningful statement about the job.
=cut
sub summary {
my $job = shift;
return join ' ',
$job->action,
($job->device || ''),
($job->port || '');
# ($job->subaction ? (q{'}. $job->subaction .q{'}) : '');
}
=head1 ADDITIONAL COLUMNS
=head2 extra
Alias for the C<subaction> column.
=cut
sub extra { (shift)->subaction }
1;

View File

@@ -1,69 +0,0 @@
package App::Netdisco::Daemon::LocalQueue;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs release_jobs /;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
schema('daemon')->deploy;
my $queue = schema('daemon')->resultset('Admin');
sub add_jobs {
my (@jobs) = @_;
info sprintf "adding %s jobs to local queue", scalar @jobs;
schema('daemon')->dclone($_)->insert for @jobs;
}
sub capacity_for {
my ($type) = @_;
debug "checking local capacity for worker type $type";
my $setting = setting('workers')->{ setting('job_type_keys')->{$type} };
my $current = $queue->search({type => $type})->count;
return ($current < $setting);
}
sub take_jobs {
my ($wid, $type, $max) = @_;
return () unless $wid > 1;
$max ||= 1;
debug "deleting completed jobs by worker $wid";
$queue->search({wid => $wid})->delete;
debug "searching for $max new jobs for worker $wid (type $type)";
my $rs = $queue->search(
{type => $type, wid => 0},
{rows => $max},
);
my @rows = $rs->all;
return [] if scalar @rows == 0;
debug sprintf "booking out %s jobs to worker %s", (scalar @rows), $wid;
$queue->search({job => { -in => [map {$_->job} @rows] }})
->update({wid => $wid});
return \@rows;
}
# not used by workers, only the daemon when reinitializing a worker
sub reset_jobs {
my ($wid) = @_;
debug "resetting jobs owned by worker $wid to be available";
return unless $wid > 1;
$queue->search({wid => $wid})
->update({wid => 0});
}
# not used by workers, only the daemon when reinitializing a worker
sub release_jobs {
my ($jid) = @_;
debug "releasing local job ID $jid";
$queue->search({job => $jid})->delete;
}
1;

View File

@@ -8,53 +8,47 @@ use App::Netdisco::Util::Daemon;
use Role::Tiny; use Role::Tiny;
use namespace::clean; use namespace::clean;
use App::Netdisco::JobQueue qw/jq_take jq_defer jq_complete/; use App::Netdisco::JobQueue qw/jq_defer jq_complete/;
sub worker_body { sub worker_body {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
my $tag = $self->worker_tag;
my $type = $self->worker_type;
while (1) { while (1) {
prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type); prctl sprintf 'netdisco-daemon: worker #%s poller: idle', $wid;
my $jobs = jq_take($self->wid, $type);
foreach my $job (@$jobs) { my $job = $self->{queue}->dequeue(1);
my $target = $self->munge_action($job->action); next unless defined $job;
my $action = $job->action;
try { try {
$job->started(scalar localtime); $job->started(scalar localtime);
prctl sprintf 'netdisco-daemon: worker #%s %s: working on #%s: %s', prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s',
$wid, lc($type), $job->id, $job->summary; $wid, $job->job, $job->summary;
info sprintf "$tag (%s): starting %s job(%s) at %s", info sprintf "pol (%s): starting %s job(%s) at %s",
$wid, $target, $job->id, $job->started; $wid, $action, $job->job, $job->started;
my ($status, $log) = $self->$target($job); my ($status, $log) = $self->$action($job);
$job->status($status); $job->status($status);
$job->log($log); $job->log($log);
}
catch {
$job->status('error');
$job->log("error running job: $_");
$self->sendto('stderr', $job->log ."\n");
};
$self->close_job($job);
} }
catch {
$job->status('error');
$job->log("error running job: $_");
$self->sendto('stderr', $job->log ."\n");
};
$self->close_job($job);
} }
} }
sub close_job { sub close_job {
my ($self, $job) = @_; my ($self, $job) = @_;
my $tag = $self->worker_tag;
my $type = $self->worker_type;
my $now = scalar localtime; my $now = scalar localtime;
prctl sprintf 'netdisco-daemon: worker #%s %s: wrapping up %s #%s: %s', prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s',
$self->wid, lc($type), $job->action, $job->id, $job->status; $self->wid, $job->action, $job->job, $job->status;
info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s", info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->id, $job->status, $now; $self->wid, $job->action, $job->job, $job->status, $now;
try { try {
if ($job->status eq 'defer') { if ($job->status eq 'defer') {

View File

@@ -1,17 +0,0 @@
package App::Netdisco::Daemon::Worker::Interactive;
use Role::Tiny;
use namespace::clean;
# main worker body
with 'App::Netdisco::Daemon::Worker::Common';
# add dispatch methods for interactive actions
with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
'App::Netdisco::Daemon::Worker::Interactive::PortActions';
sub worker_tag { 'int' }
sub worker_type { 'Interactive' }
sub munge_action { 'set_' . $_[1] }
1;

View File

@@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all';
use Role::Tiny; use Role::Tiny;
use namespace::clean; use namespace::clean;
sub set_location { sub location {
my ($self, $job) = @_; my ($self, $job) = @_;
return _set_device_generic($job->device, 'location', $job->subaction); return _set_device_generic($job->device, 'location', $job->subaction);
} }
sub set_contact { sub contact {
my ($self, $job) = @_; my ($self, $job) = @_;
return _set_device_generic($job->device, 'contact', $job->subaction); return _set_device_generic($job->device, 'contact', $job->subaction);
} }

View File

@@ -7,12 +7,12 @@ use App::Netdisco::Daemon::Util ':all';
use Role::Tiny; use Role::Tiny;
use namespace::clean; use namespace::clean;
sub set_portname { sub portname {
my ($self, $job) = @_; my ($self, $job) = @_;
return _set_port_generic($job, 'alias', 'name'); return _set_port_generic($job, 'alias', 'name');
} }
sub set_portcontrol { sub portcontrol {
my ($self, $job) = @_; my ($self, $job) = @_;
my $port = get_port($job->device, $job->port) my $port = get_port($job->device, $job->port)
@@ -39,7 +39,7 @@ sub set_portcontrol {
} }
} }
sub set_vlan { sub vlan {
my ($self, $job) = @_; my ($self, $job) = @_;
my $port = get_port($job->device, $job->port) my $port = get_port($job->device, $job->port)
@@ -97,7 +97,7 @@ sub _set_port_generic {
return job_done("Updated [$pn] $slot status on [$ip] to [$data]"); return job_done("Updated [$pn] $slot status on [$ip] to [$data]");
} }
sub set_power { sub power {
my ($self, $job) = @_; my ($self, $job) = @_;
my $port = get_port($job->device, $job->port) my $port = get_port($job->device, $job->port)

View File

@@ -8,7 +8,8 @@ use App::Netdisco::Util::Daemon;
use Role::Tiny; use Role::Tiny;
use namespace::clean; use namespace::clean;
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/; use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock/;
use MCE::Util ();
sub worker_begin { sub worker_begin {
my $self = shift; my $self = shift;
@@ -26,7 +27,7 @@ sub worker_begin {
if (scalar @jobs) { if (scalar @jobs) {
info sprintf "mgr (%s): found %s jobs booked to this processing node", info sprintf "mgr (%s): found %s jobs booked to this processing node",
$wid, scalar @jobs; $wid, scalar @jobs;
$self->do('add_jobs', @jobs); $self->{queue}->enqueuep(100, @jobs);
} }
} }
@@ -34,39 +35,54 @@ sub worker_body {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
return debug "mgr ($wid): no need for manager... quitting" if (setting('workers')->{'no_manager'}) {
if setting('workers')->{'no_manager'}; prctl sprintf 'netdisco-daemon: worker #%s manager: inactive', $wid;
return debug "mgr ($wid): no need for manager... quitting"
my $num_slots = sum( 0, map { setting('workers')->{$_} } }
values %{setting('job_type_keys')} );
while (1) { while (1) {
debug "mgr ($wid): getting potential jobs for $num_slots workers";
prctl sprintf 'netdisco-daemon: worker #%s manager: gathering', $wid; prctl sprintf 'netdisco-daemon: worker #%s manager: gathering', $wid;
my $num_slots = 0;
# get some pending jobs $num_slots =
MCE::Util::_parse_max_workers( setting('workers')->{tasks} )
- $self->{queue}->pending();
debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";
# get some high priority jobs
# TODO also check for stale jobs in Netdisco DB # TODO also check for stale jobs in Netdisco DB
foreach my $job ( jq_getsome($num_slots) ) { foreach my $job ( jq_getsomep($num_slots) ) {
# check for available local capacity
my $job_type = setting('job_types')->{$job->action};
next unless $job_type and $self->do('capacity_for', $job_type);
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
$wid, $job->id, $job->action;
# mark job as running # mark job as running
next unless jq_lock($job); next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node", info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->id; $wid, $job->job;
# copy job to local queue # copy job to local queue
$self->do('add_jobs', $job); $self->{queue}->enqueuep(100, $job);
}
$num_slots =
MCE::Util::_parse_max_workers( setting('workers')->{tasks} )
- $self->{queue}->pending();
debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
# get some normal priority jobs
# TODO also check for stale jobs in Netdisco DB
foreach my $job ( jq_getsome($num_slots) ) {
# mark job as running
next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->job;
# copy job to local queue
$self->{queue}->enqueue($job);
} }
debug "mgr ($wid): sleeping now..."; debug "mgr ($wid): sleeping now...";
prctl sprintf 'netdisco-daemon: worker #%s manager: idle', $wid; prctl sprintf 'netdisco-daemon: worker #%s manager: idle', $wid;
sleep( setting('workers')->{sleep_time} || 2 ); sleep( setting('workers')->{sleep_time} || 1 );
} }
} }

View File

@@ -11,10 +11,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device',
'App::Netdisco::Daemon::Worker::Poller::Arpnip', 'App::Netdisco::Daemon::Worker::Poller::Arpnip',
'App::Netdisco::Daemon::Worker::Poller::Macsuck', 'App::Netdisco::Daemon::Worker::Poller::Macsuck',
'App::Netdisco::Daemon::Worker::Poller::Nbtstat', 'App::Netdisco::Daemon::Worker::Poller::Nbtstat',
'App::Netdisco::Daemon::Worker::Poller::Expiry'; 'App::Netdisco::Daemon::Worker::Poller::Expiry',
'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
sub worker_tag { 'pol' } 'App::Netdisco::Daemon::Worker::Interactive::PortActions';
sub worker_type { 'Poller' }
sub munge_action { $_[1] }
1; 1;

View File

@@ -13,6 +13,10 @@ use App::Netdisco::JobQueue qw/jq_insert/;
sub worker_begin { sub worker_begin {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
return debug "sch ($wid): no need for scheduler... skip begin"
unless setting('schedule');
debug "entering Scheduler ($wid) worker_begin()"; debug "entering Scheduler ($wid) worker_begin()";
foreach my $action (keys %{ setting('schedule') }) { foreach my $action (keys %{ setting('schedule') }) {
@@ -34,6 +38,11 @@ sub worker_body {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
unless (setting('schedule')) {
prctl sprintf 'netdisco-daemon: worker #%s scheduler: inactive', $wid;
return debug "sch ($wid): no need for scheduler... quitting"
}
while (1) { while (1) {
# sleep until some point in the next minute # sleep until some point in the next minute
my $naptime = 60 - (time % 60) + int(rand(45)); my $naptime = 60 - (time % 60) + int(rand(45));

View File

@@ -10,11 +10,11 @@ use base 'Exporter';
our @EXPORT = (); our @EXPORT = ();
our @EXPORT_OK = qw/ our @EXPORT_OK = qw/
jq_getsome jq_getsome
jq_getsomep
jq_locked jq_locked
jq_queued jq_queued
jq_log jq_log
jq_userlog jq_userlog
jq_take
jq_lock jq_lock
jq_defer jq_defer
jq_complete jq_complete
@@ -42,6 +42,10 @@ Returns a list of randomly selected queued jobs. Default is to return one job,
unless C<$num> is provided. Jobs are returned as objects which implement the unless C<$num> is provided. Jobs are returned as objects which implement the
Netdisco job instance interface (see below). Netdisco job instance interface (see below).
=head2 jq_getsomep( $num? )
Same as C<jq_getsome> but for high priority jobs.
=head2 jq_locked() =head2 jq_locked()
Returns the list of jobs currently booked out to this processing node (denoted Returns the list of jobs currently booked out to this processing node (denoted
@@ -64,12 +68,6 @@ Returns a list of jobs which have been entered into the queue by the passed
C<$user>. Jobs are returned as objects which implement the Netdisco job C<$user>. Jobs are returned as objects which implement the Netdisco job
instance interface (see below). instance interface (see below).
=head2 jq_take( $wid, $type, $max? )
Searches in the queue for jobs of type C<$type> and if up to C<$max> are
available, will book them out to the worker with ID C<$wid>. The default
number of booked jobs is 1.
=head2 jq_lock( $job ) =head2 jq_lock( $job )
Marks a job in the queue as booked out to this processing node (denoted by the Marks a job in the queue as booked out to this processing node (denoted by the
@@ -108,10 +106,6 @@ jobs from the queue.
=head2 id (auto) =head2 id (auto)
=head2 type (required)
=head2 wid (required, default 0)
=head2 entered =head2 entered
=head2 started =head2 started

View File

@@ -3,6 +3,7 @@ package App::Netdisco::JobQueue::PostgreSQL;
use Dancer qw/:moose :syntax :script/; use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema'; use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Daemon::Job;
use Net::Domain 'hostfqdn'; use Net::Domain 'hostfqdn';
use Module::Load (); use Module::Load ();
use Try::Tiny; use Try::Tiny;
@@ -11,11 +12,11 @@ use base 'Exporter';
our @EXPORT = (); our @EXPORT = ();
our @EXPORT_OK = qw/ our @EXPORT_OK = qw/
jq_getsome jq_getsome
jq_getsomep
jq_locked jq_locked
jq_queued jq_queued
jq_log jq_log
jq_userlog jq_userlog
jq_take
jq_lock jq_lock
jq_defer jq_defer
jq_complete jq_complete
@@ -25,23 +26,26 @@ our @EXPORT_OK = qw/
our %EXPORT_TAGS = ( all => \@EXPORT_OK ); our %EXPORT_TAGS = ( all => \@EXPORT_OK );
sub jq_getsome { sub jq_getsome {
my $num_slots = shift; my ($num_slots, $prio) = @_;
return () if defined $num_slots and $num_slots eq '0';
$num_slots ||= 1;
$prio ||= 'normal';
my @returned = (); my @returned = ();
my $rs = schema('netdisco')->resultset('Admin') my $rs = schema('netdisco')->resultset('Admin')
->search( ->search(
{status => 'queued'}, {status => 'queued', action => { -in => setting('job_prio')->{$prio} } },
{order_by => 'random()', rows => ($num_slots || 1)}, {order_by => 'random()', rows => ($num_slots || 1)},
); );
while (my $job = $rs->next) { while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next; push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns });
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
} }
return @returned; return @returned;
} }
sub jq_getsomep { return jq_getsome(shift, 'high') }
sub jq_locked { sub jq_locked {
my $fqdn = hostfqdn || 'localhost'; my $fqdn = hostfqdn || 'localhost';
my @returned = (); my @returned = ();
@@ -50,9 +54,7 @@ sub jq_locked {
->search({status => "queued-$fqdn"}); ->search({status => "queued-$fqdn"});
while (my $job = $rs->next) { while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next; push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns });
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
} }
return @returned; return @returned;
} }
@@ -68,51 +70,18 @@ sub jq_queued {
} }
sub jq_log { sub jq_log {
my @returned = (); return schema('netdisco')->resultset('Admin')->search({}, {
my $rs = schema('netdisco')->resultset('Admin')->search({}, {
order_by => { -desc => [qw/entered device action/] }, order_by => { -desc => [qw/entered device action/] },
rows => 50, rows => 50,
}); })->with_times->hri->all;
while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next;
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
}
return @returned;
} }
sub jq_userlog { sub jq_userlog {
my $user = shift; my $user = shift;
my @returned = (); return schema('netdisco')->resultset('Admin')->search({
my $rs = schema('netdisco')->resultset('Admin')->search({
username => $user, username => $user,
finished => { '>' => \"(now() - interval '5 seconds')" }, finished => { '>' => \"(now() - interval '5 seconds')" },
}); })->with_times->hri->all;
while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next;
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
}
return @returned;
}
# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via
# the main daemon process. This is only used by daemon workers which can use
# MCE ->do() method.
sub jq_take {
my ($wid, $type) = @_;
Module::Load::load 'MCE';
# be polite to SQLite database (that is, local CPU)
debug "$type ($wid): sleeping now...";
sleep(1);
debug "$type ($wid): asking for a job";
MCE->do('take_jobs', $wid, $type);
} }
sub jq_lock { sub jq_lock {
@@ -124,7 +93,7 @@ sub jq_lock {
try { try {
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'}) ->find($job->job, {for => 'update'})
->update({ status => "queued-$fqdn" }); ->update({ status => "queued-$fqdn" });
# remove any duplicate jobs, needed because we have race conditions # remove any duplicate jobs, needed because we have race conditions
@@ -138,30 +107,29 @@ sub jq_lock {
}, {for => 'update'})->delete(); }, {for => 'update'})->delete();
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;
} }
# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via
# the main daemon process. This is only used by daemon workers which can use
# MCE ->do() method.
sub jq_defer { sub jq_defer {
my $job = shift; my $job = shift;
my $happy = false; my $happy = false;
try { try {
# other local workers are polling the central queue, so
# to prevent a race, first delete the job in our local queue
MCE->do('release_jobs', $job->id);
# lock db row and update to show job is available # lock db row and update to show job is available
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'}) ->find($job->job, {for => 'update'})
->update({ status => 'queued', started => undef }); ->update({ status => 'queued', started => undef });
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;
@@ -175,7 +143,7 @@ sub jq_complete {
try { try {
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'})->update({ ->find($job->job, {for => 'update'})->update({
status => $job->status, status => $job->status,
log => $job->log, log => $job->log,
started => $job->started, started => $job->started,
@@ -183,6 +151,9 @@ sub jq_complete {
}); });
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;
@@ -208,6 +179,9 @@ sub jq_insert {
]); ]);
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;

View File

@@ -879,15 +879,17 @@ or remove any entries.
Value: Settings Tree. Default: Value: Settings Tree. Default:
workers: workers:
interactives: 2 tasks: 'AUTO * 2'
pollers: 10 sleep_time: 1
sleep_time: 2
Control the activity of the backend daemon with this configuration setting. Control the activity of the backend daemon with this configuration setting.
C<interactives> and C<pollers> sets how many workers are started for C<tasks> sets how many worker processes are started for interactive jobs (port
interactive jobs (port control) and polling jobs (discover, macsuck, arpnip) control) and polling jobs (discover, macsuck, arpnip) on this node. Other
on this node, respectively. Other nodes can have different settings. nodes can have different settings.
"C<AUTO>" is the number of CPU cores. Set C<tasks> to "C<0>" to disable all
workers (which allows you to have a scheduler-only node).
C<sleep_time> is the number of seconds between polling the database to find C<sleep_time> is the number of seconds between polling the database to find
new jobs. This is a balance between responsiveness and database load. new jobs. This is a balance between responsiveness and database load.
@@ -919,9 +921,9 @@ If set, then this node's backend daemon will schedule polling jobs (discover,
macsuck, arpnip, etc) in the central database. It's fine to have multiple macsuck, arpnip, etc) in the central database. It's fine to have multiple
nodes scheduling work for redundancy (but make sure they all have good NTP). nodes scheduling work for redundancy (but make sure they all have good NTP).
Note that this is independent of the Pollers configured in C<workers>. It's Note that this is independent of the Tasks configured in C<workers>. It's
okay to have this node schedule schedule but not do any of the polling okay to have this node schedule schedule but not do any of the polling
itself (C<pollers: 0>). itself (C<tasks: 0>).
Work can be scheduled using C<cron> style notation, or a simple weekday and Work can be scheduled using C<cron> style notation, or a simple weekday and
hour fields (which accept same types as C<cron> notation). For example: hour fields (which accept same types as C<cron> notation). For example:

View File

@@ -54,9 +54,6 @@ For the daemon, it's very similar:
DBIC_TRACE=1 ~/bin/localenv bin/netdisco-daemon-fg DBIC_TRACE=1 ~/bin/localenv bin/netdisco-daemon-fg
Don't be alarmed by the high rate of database queries in the daemon - most of
them are communicating only with a local in-memory SQLite database.
Happy hacking! Happy hacking!
=head1 Introduction =head1 Introduction
@@ -418,24 +415,12 @@ the central Netdisco database and booking out jobs which it's able to service
according to the local configuration settings. Jobs are "locked" in the according to the local configuration settings. Jobs are "locked" in the
central queue and then copied to a local job queue within the daemon. central queue and then copied to a local job queue within the daemon.
Along with the Manager we start zero or more of two other types of worker.
Some jobs such as port control are "interactive" and the user typically wants
quick feedback on the results. Others such as polling are background tasks
which can take more time and are less schedule sensitive. So as not to starve
the "interactive" jobs of workers we have two types of worker.
The Interactive worker picks jobs from the local job queue relating to device
and port reconfiguration only. It submits results directly back to the central
Netdisco database. The Poller worker similarly picks job from the local queue,
this time relating to device discovery and polling.
There is support in the daemon for the workers to pick more than one job at a There is support in the daemon for the workers to pick more than one job at a
time from the local queue, in case we decide this is worth doing. However the time from the local queue, in case we decide this is worth doing. However the
Manager won't ever book out more jobs from the central Netdisco job queue than Manager won't ever book out more jobs from the central Netdisco job queue than
it has workers available (so as not to hog jobs for itself against other it has workers available (so as not to hog jobs for itself against other
daemons on other servers). The user is free to configure the number of daemons on other servers). The user is free to configure the number of
Interactive and Poller workers in their C<config.yml> file (zero or more of workers in their C<config.yml> file (zero or more).
each).
The fourth kind of worker is called the Scheduler and takes care of adding The fourth kind of worker is called the Scheduler and takes care of adding
discover, macsuck, arpnip, and nbtstat jobs to the queue (which are in turn discover, macsuck, arpnip, and nbtstat jobs to the queue (which are in turn
@@ -458,17 +443,6 @@ Configuration for SNMP::Info comes from the YAML files, of course. This means
that our C<mibhome> and C<mibdirs> settings are now in YAML format. In that our C<mibhome> and C<mibdirs> settings are now in YAML format. In
particular, the C<mibdirs> list is a real list within the configuration. particular, the C<mibdirs> list is a real list within the configuration.
=head2 DBIx::Class Layer
The local job queue for each Job Daemon is actually an SQLite database running
in memory. This makes the queue management code a little more elegant. The
schema for this is of course DBIx::Class using Dancer connection management,
and lives in L<App::Netdisco::Daemon::DB>.
There is currently only one table, the port control job queue, in
L<App::Netdisco::Daemon::DB::Result::Admin>. It's likely this name will change
in the future.
=head1 Other Noteable Technology =head1 Other Noteable Technology
=head2 C<local::lib> =head2 C<local::lib>

View File

@@ -36,6 +36,19 @@ but they are backwards compatible.
=back =back
=head1 2.029002
=head2 General Notices
The backend polling daemon has been rewritten and as a result your
configuration can be simplified. Some keys have also been renamed. Our advice
is to remove (or comment out) the complete C<workers> configuration which
enables auto-tuning. If you do wish to control the number of worker
processes, follow this pattern:
workers:
tasks: 'AUTO * 2' # this is the default, twice the number of CPUs
=head1 2.029001 =head1 2.029001
=head2 Health Advice =head2 Health Advice

View File

@@ -25,7 +25,9 @@ sub add_job {
}); });
} }
foreach my $action (keys %{ setting('job_types') }) { foreach my $action (@{ setting('job_prio')->{high} },
@{ setting('job_prio')->{normal} }) {
ajax "/ajax/control/admin/$action" => require_role admin => sub { ajax "/ajax/control/admin/$action" => require_role admin => sub {
add_job($action, param('device'), param('extra')); add_job($action, param('device'), param('extra'));
}; };

View File

@@ -108,7 +108,11 @@ macsuck_no: []
macsuck_only: [] macsuck_only: []
macsuck_all_vlans: false macsuck_all_vlans: false
macsuck_no_unnamed: false macsuck_no_unnamed: false
macsuck_no_vlan: [fddi-default,token-ring-default,fddinet-default,trnet-default] macsuck_no_vlan:
- 'fddi-default'
- 'token-ring-default'
- 'fddinet-default'
- 'trnet-default'
macsuck_no_devicevlan: [] macsuck_no_devicevlan: []
macsuck_bleed: false macsuck_bleed: false
macsuck_min_age: 0 macsuck_min_age: 0
@@ -170,9 +174,8 @@ port_control_reasons:
# -------------- # --------------
workers: workers:
interactives: 1 tasks: 'AUTO * 2'
pollers: 10 sleep_time: 1
sleep_time: 2
queue: PostgreSQL queue: PostgreSQL
dns: dns:
@@ -194,26 +197,24 @@ dns:
# expire: # expire:
# when: '20 23 * * *' # when: '20 23 * * *'
job_types: job_prio:
discoverall: Poller high:
discover: Poller - location
arpwalk: Poller - contact
arpnip: Poller - portcontrol
macwalk: Poller - portname
macsuck: Poller - vlan
nbtwalk: Poller - power
nbtstat: Poller normal:
expire: Poller - discoverall
location: Interactive - discover
contact: Interactive - arpwalk
portcontrol: Interactive - arpnip
portname: Interactive - macwalk
vlan: Interactive - macsuck
power: Interactive - nbtwalk
- nbtstat
job_type_keys: - expire
Poller: pollers
Interactive: interactives
# --------------- # ---------------
# GraphViz Export # GraphViz Export

View File

@@ -31,17 +31,17 @@ safe_password_store: true
# SNMP community string(s) # SNMP community string(s)
# ```````````````````````` # ````````````````````````
snmp_auth: snmp_auth:
- tag: 'v2default' - tag: 'default_v2_readonly'
community: 'public' community: 'public'
read: true read: true
write: false write: false
# - tag: 'v2default_w' # - tag: 'default_v2_for_write'
# community: 'private' # community: 'private'
# read: false # read: false
# write: true # write: true
# daemon will keep netdisco up to date on this schedule # this is the schedule for automatically keeping netdisco up-to-date
# ````````````````````````````````````````````````````` # ``````````````````````````````````````````````````````````````````
#schedule: #schedule:
# discoverall: # discoverall:
# when: '5 7 * * *' # when: '5 7 * * *'
@@ -56,16 +56,6 @@ snmp_auth:
# expire: # expire:
# when: '20 23 * * *' # when: '20 23 * * *'
# number of SNMP pollers to run in parallel (in netdisco-daemon)
# ``````````````````````````````````````````````````````````````
workers:
pollers: 10
# amount parallel DNS resolution for node names
# `````````````````````````````````````````````
dns:
max_outstanding: 50
# do not discover IP Phones or Wireless Access Points. # do not discover IP Phones or Wireless Access Points.
# usually these are visible as device neighbors but don't support # usually these are visible as device neighbors but don't support
# SNMP, which just clogs up the job queue. # SNMP, which just clogs up the job queue.
@@ -74,6 +64,18 @@ discover_no_type:
- '(?i)phone' - '(?i)phone'
- '(?i)(?:wap|wireless)' - '(?i)(?:wap|wireless)'
# number of SNMP workers to run in parallel (in netdisco-daemon).
# the default is twice the number of CPU cores. increase this if
# your system has few cores and the schedule is taking too long.
# ```````````````````````````````````````````````````````````````
#workers:
# tasks: 'AUTO * 2'
# number of parallel DNS queries for node names
# `````````````````````````````````````````````
#dns:
# max_outstanding: 50
# set to true to globally disable authentication/login. # set to true to globally disable authentication/login.
# create a user called "guest" if you want to assign port/admin rights. # create a user called "guest" if you want to assign port/admin rights.
# ````````````````````````````````````````````````````````````````````` # `````````````````````````````````````````````````````````````````````