Files
netdisco/lib/App/Netdisco/Backend/Worker/Manager.pm
Oliver Gorwits d74ccac4f6 rename *-daemon apps to be *-backend
Squashed commit of the following:

commit 39b438aa4b
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 6 16:40:11 2017 +0100

    add release notes

commit ca4ea90d35
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 6 16:32:06 2017 +0100

    update distmeta

commit 4e35b904b0
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 6 16:30:22 2017 +0100

    rename files from Daemon to Backend

commit 86a605ba68
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 6 16:26:43 2017 +0100

    rename daemon to backend in code

commit ffe8fc180f
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 6 16:15:57 2017 +0100

    add daemon files which exec to backend equivalents

commit 53e041594e
Author: Oliver Gorwits <oliver@cpan.org>
Date:   Sat May 6 15:32:49 2017 +0100

    rename netdisco-daemon to netdisco-backend
2017-05-06 16:40:48 +01:00

87 lines
2.5 KiB
Perl

package App::Netdisco::Backend::Worker::Manager;
use Dancer qw/:moose :syntax :script/;
use List::Util 'sum';
use App::Netdisco::Util::Backend;
use Role::Tiny;
use namespace::clean;
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_getsomep jq_lock/;
# Startup hook for the manager worker.
#
# Unless the 'no_manager' worker setting is enabled, asks the job queue for
# the jobs returned by jq_locked() and pushes them back onto this worker's
# local queue via enqueuep(100, ...), so that work which was already booked
# to this processing node is picked up again.
sub worker_begin {
  my ($self) = @_;
  my $wid = $self->wid;

  # nothing to do when the manager role is switched off in config
  if (setting('workers')->{'no_manager'}) {
    return debug "mgr ($wid): no need for manager... skip begin";
  }

  debug "entering Manager ($wid) worker_begin()";

  # requeue jobs locally
  debug "mgr ($wid): searching for jobs booked to this processing node";
  my @booked = jq_locked;
  my $count  = scalar @booked;

  if ($count) {
    info sprintf "mgr (%s): found %s jobs booked to this processing node",
      $wid, $count;

    # NOTE(review): 100 is presumably the queue priority - confirm against
    # the queue implementation behind $self->{queue}
    $self->{queue}->enqueuep(100, @booked);
  }
}
# Main loop of the manager worker.
#
# When the 'no_manager' worker setting is enabled this only updates the
# process title and returns. Otherwise it loops forever and, on each pass:
#
#   1. works out how many more jobs the local queue can take
#      (parse_max_workers of the 'tasks' setting, minus jobs already pending);
#   2. fetches that many candidates with jq_getsomep(), locks each with
#      jq_lock() and copies the locked ones to the local queue via enqueuep;
#   3. recomputes the free capacity and repeats with jq_getsome(), copying
#      the locked jobs via enqueue;
#   4. sleeps for the 'sleep_time' worker setting (1 second if unset or 0).
#
# The free capacity is clamped at zero: the local queue can legitimately hold
# more jobs than the worker limit (worker_begin requeues every locked job
# without a cap), and a negative count must neither be logged as a number of
# workers nor handed to the job queue as a number of jobs to fetch.
sub worker_body {
  my $self = shift;
  my $wid = $self->wid;

  if (setting('workers')->{'no_manager'}) {
    prctl sprintf 'netdisco-backend: worker #%s manager: inactive', $wid;
    return debug "mgr ($wid): no need for manager... quitting";
  }

  # Number of additional jobs the local queue can accept right now; never
  # negative. Re-reads the setting on every call, as the original code did.
  my $free_slots = sub {
    my $free = parse_max_workers( setting('workers')->{tasks} )
                 - $self->{queue}->pending();
    return ($free > 0 ? $free : 0);
  };

  while (1) {
    prctl sprintf 'netdisco-backend: worker #%s manager: gathering', $wid;

    my $num_slots = $free_slots->();
    debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";

    # get some high priority jobs (skipped entirely when there is no room)
    # TODO also check for stale jobs in Netdisco DB
    foreach my $job ( $num_slots ? jq_getsomep($num_slots) : () ) {
      # mark job as running; skip it if the lock was not obtained
      next unless jq_lock($job);
      info sprintf "mgr (%s): job %s booked out for this processing node",
        $wid, $job->job;

      # copy job to local queue
      # NOTE(review): 100 is presumably the queue priority - confirm
      $self->{queue}->enqueuep(100, $job);
    }

    # capacity may have shrunk after the high priority pass, so recompute
    $num_slots = $free_slots->();
    debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";

    # get some normal priority jobs (skipped entirely when there is no room)
    # TODO also check for stale jobs in Netdisco DB
    foreach my $job ( $num_slots ? jq_getsome($num_slots) : () ) {
      # mark job as running; skip it if the lock was not obtained
      next unless jq_lock($job);
      info sprintf "mgr (%s): job %s booked out for this processing node",
        $wid, $job->job;

      # copy job to local queue
      $self->{queue}->enqueue($job);
    }

    debug "mgr ($wid): sleeping now...";
    prctl sprintf 'netdisco-backend: worker #%s manager: idle', $wid;
    sleep( setting('workers')->{sleep_time} || 1 );
  }
}
1;