worker roles in Role namespace
This commit is contained in:
@@ -1,90 +0,0 @@
|
||||
package App::Netdisco::Backend::Worker::Manager;
|
||||
|
||||
use Dancer qw/:moose :syntax :script/;
|
||||
|
||||
use List::Util 'sum';
|
||||
use App::Netdisco::Util::MCE;
|
||||
|
||||
use Role::Tiny;
|
||||
use namespace::clean;
|
||||
|
||||
use App::Netdisco::JobQueue
|
||||
qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/;
|
||||
|
||||
sub worker_begin {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
return debug "mgr ($wid): no need for manager... skip begin"
|
||||
if setting('workers')->{'no_manager'};
|
||||
|
||||
debug "entering Manager ($wid) worker_begin()";
|
||||
|
||||
# job queue initialisation
|
||||
jq_warm_thrusters;
|
||||
|
||||
# requeue jobs locally
|
||||
debug "mgr ($wid): searching for jobs booked to this processing node";
|
||||
my @jobs = jq_locked;
|
||||
|
||||
if (scalar @jobs) {
|
||||
info sprintf "mgr (%s): found %s jobs booked to this processing node",
|
||||
$wid, scalar @jobs;
|
||||
$self->{queue}->enqueuep(100, @jobs);
|
||||
}
|
||||
}
|
||||
|
||||
sub worker_body {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
if (setting('workers')->{'no_manager'}) {
|
||||
prctl sprintf 'nd2: #%s mgr: inactive', $wid;
|
||||
return debug "mgr ($wid): no need for manager... quitting"
|
||||
}
|
||||
|
||||
while (1) {
|
||||
prctl sprintf 'nd2: #%s mgr: gathering', $wid;
|
||||
my $num_slots = 0;
|
||||
|
||||
$num_slots = parse_max_workers( setting('workers')->{tasks} )
|
||||
- $self->{queue}->pending();
|
||||
debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";
|
||||
|
||||
# get some high priority jobs
|
||||
# TODO also check for stale jobs in Netdisco DB
|
||||
foreach my $job ( jq_getsomep($num_slots) ) {
|
||||
|
||||
# mark job as running
|
||||
next unless jq_lock($job);
|
||||
info sprintf "mgr (%s): job %s booked out for this processing node",
|
||||
$wid, $job->job;
|
||||
|
||||
# copy job to local queue
|
||||
$self->{queue}->enqueuep(100, $job);
|
||||
}
|
||||
|
||||
$num_slots = parse_max_workers( setting('workers')->{tasks} )
|
||||
- $self->{queue}->pending();
|
||||
debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
|
||||
|
||||
# get some normal priority jobs
|
||||
# TODO also check for stale jobs in Netdisco DB
|
||||
foreach my $job ( jq_getsome($num_slots) ) {
|
||||
|
||||
# mark job as running
|
||||
next unless jq_lock($job);
|
||||
info sprintf "mgr (%s): job %s booked out for this processing node",
|
||||
$wid, $job->job;
|
||||
|
||||
# copy job to local queue
|
||||
$self->{queue}->enqueue($job);
|
||||
}
|
||||
|
||||
debug "mgr ($wid): sleeping now...";
|
||||
prctl sprintf 'nd2: #%s mgr: idle', $wid;
|
||||
sleep( setting('workers')->{sleep_time} || 1 );
|
||||
}
|
||||
}
|
||||
|
||||
1;
|
||||
@@ -1,68 +0,0 @@
|
||||
package App::Netdisco::Backend::Worker::Poller;
|
||||
|
||||
use Dancer qw/:moose :syntax :script/;
|
||||
|
||||
use Try::Tiny;
|
||||
use App::Netdisco::Util::MCE;
|
||||
|
||||
use Role::Tiny;
|
||||
use namespace::clean;
|
||||
|
||||
use Time::HiRes 'sleep';
|
||||
use App::Netdisco::JobQueue qw/jq_defer jq_complete/;
|
||||
|
||||
# add dispatch methods for poller tasks
|
||||
with 'App::Netdisco::Backend::Runner';
|
||||
|
||||
sub worker_begin { (shift)->{started} = time }
|
||||
|
||||
sub worker_body {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
while (1) {
|
||||
prctl sprintf 'nd2: #%s poll: idle', $wid;
|
||||
|
||||
my $job = $self->{queue}->dequeue(1);
|
||||
next unless defined $job;
|
||||
|
||||
try {
|
||||
$job->started(scalar localtime);
|
||||
prctl sprintf 'nd2: #%s poll: #%s: %s',
|
||||
$wid, $job->job, $job->summary;
|
||||
info sprintf "pol (%s): starting %s job(%s) at %s",
|
||||
$wid, $job->action, $job->job, $job->started;
|
||||
$self->run($job);
|
||||
}
|
||||
catch {
|
||||
$job->status('error');
|
||||
$job->log("error running job: $_");
|
||||
$self->sendto('stderr', $job->log ."\n");
|
||||
};
|
||||
|
||||
$self->close_job($job);
|
||||
sleep( setting('workers')->{'min_runtime'} || 0 );
|
||||
$self->exit(0); # recycle worker
|
||||
}
|
||||
}
|
||||
|
||||
sub close_job {
|
||||
my ($self, $job) = @_;
|
||||
my $now = scalar localtime;
|
||||
|
||||
info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s",
|
||||
$self->wid, $job->action, $job->job, $job->status, $now;
|
||||
|
||||
try {
|
||||
if ($job->status eq 'defer') {
|
||||
jq_defer($job);
|
||||
}
|
||||
else {
|
||||
$job->finished($now);
|
||||
jq_complete($job);
|
||||
}
|
||||
}
|
||||
catch { $self->sendto('stderr', "error closing job: $_\n") };
|
||||
}
|
||||
|
||||
1;
|
||||
@@ -1,80 +0,0 @@
|
||||
package App::Netdisco::Backend::Worker::Scheduler;
|
||||
|
||||
use Dancer qw/:moose :syntax :script/;
|
||||
|
||||
use Algorithm::Cron;
|
||||
use App::Netdisco::Util::MCE;
|
||||
|
||||
use Role::Tiny;
|
||||
use namespace::clean;
|
||||
|
||||
use App::Netdisco::JobQueue qw/jq_insert/;
|
||||
|
||||
sub worker_begin {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
return debug "sch ($wid): no need for scheduler... skip begin"
|
||||
unless setting('schedule');
|
||||
|
||||
debug "entering Scheduler ($wid) worker_begin()";
|
||||
|
||||
foreach my $action (keys %{ setting('schedule') }) {
|
||||
my $config = setting('schedule')->{$action};
|
||||
|
||||
# accept either single crontab format, or individual time fields
|
||||
$config->{when} = Algorithm::Cron->new(
|
||||
base => 'local',
|
||||
%{
|
||||
(ref {} eq ref $config->{when})
|
||||
? $config->{when}
|
||||
: {crontab => $config->{when}}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
sub worker_body {
|
||||
my $self = shift;
|
||||
my $wid = $self->wid;
|
||||
|
||||
unless (setting('schedule')) {
|
||||
prctl sprintf 'nd2: #%s sched: inactive', $wid;
|
||||
return debug "sch ($wid): no need for scheduler... quitting"
|
||||
}
|
||||
|
||||
while (1) {
|
||||
# sleep until some point in the next minute
|
||||
my $naptime = 60 - (time % 60) + int(rand(45));
|
||||
|
||||
prctl sprintf 'nd2: #%s sched: idle', $wid;
|
||||
debug "sched ($wid): sleeping for $naptime seconds";
|
||||
|
||||
sleep $naptime;
|
||||
prctl sprintf 'nd2: #%s sched: queueing', $wid;
|
||||
|
||||
# NB next_time() returns the next *after* win_start
|
||||
my $win_start = time - (time % 60) - 1;
|
||||
my $win_end = $win_start + 60;
|
||||
|
||||
# if any job is due, add it to the queue
|
||||
foreach my $action (keys %{ setting('schedule') }) {
|
||||
my $sched = setting('schedule')->{$action};
|
||||
|
||||
# next occurence of job must be in this minute's window
|
||||
debug sprintf "sched ($wid): $action: win_start: %s, win_end: %s, next: %s",
|
||||
$win_start, $win_end, $sched->{when}->next_time($win_start);
|
||||
next unless $sched->{when}->next_time($win_start) <= $win_end;
|
||||
|
||||
# queue it!
|
||||
info "sched ($wid): queueing $action job";
|
||||
jq_insert({
|
||||
action => $action,
|
||||
device => $sched->{device},
|
||||
extra => $sched->{extra},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
1;
|
||||
Reference in New Issue
Block a user