get rid of horrid OO jobqueue interface
This commit is contained in:
@@ -1,19 +0,0 @@
|
|||||||
package App::Netdisco::Daemon::JobQueue;
|
|
||||||
|
|
||||||
use Role::Tiny;
|
|
||||||
use namespace::clean;
|
|
||||||
|
|
||||||
use Module::Load ();
|
|
||||||
Module::Load::load_remote 'JobQueue' => 'App::Netdisco::JobQueue' => ':all';
|
|
||||||
|
|
||||||
# central queue
|
|
||||||
sub jq_getsome { shift and JobQueue::jq_getsome(@_) }
|
|
||||||
sub jq_locked { shift and JobQueue::jq_locked(@_) }
|
|
||||||
sub jq_queued { shift and JobQueue::jq_queued(@_) }
|
|
||||||
sub jq_take { shift and JobQueue::jq_take(@_) }
|
|
||||||
sub jq_lock { shift and JobQueue::jq_lock(@_) }
|
|
||||||
sub jq_defer { shift and JobQueue::jq_defer(@_) }
|
|
||||||
sub jq_complete { shift and JobQueue::jq_complete(@_) }
|
|
||||||
sub jq_insert { shift and JobQueue::jq_insert(@_) }
|
|
||||||
|
|
||||||
1;
|
|
||||||
@@ -6,7 +6,7 @@ use Try::Tiny;
|
|||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
with 'App::Netdisco::Daemon::JobQueue';
|
use App::Netdisco::JobQueue qw/jq_take jq_defer jq_complete/;
|
||||||
|
|
||||||
sub worker_body {
|
sub worker_body {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -16,7 +16,7 @@ sub worker_body {
|
|||||||
my $type = $self->worker_type;
|
my $type = $self->worker_type;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
my $jobs = $self->jq_take($self->wid, $type);
|
my $jobs = jq_take($self->wid, $type);
|
||||||
|
|
||||||
foreach my $job (@$jobs) {
|
foreach my $job (@$jobs) {
|
||||||
my $target = $self->munge_action($job->action);
|
my $target = $self->munge_action($job->action);
|
||||||
@@ -50,11 +50,11 @@ sub close_job {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
if ($job->status eq 'defer') {
|
if ($job->status eq 'defer') {
|
||||||
$self->jq_defer($job);
|
jq_defer($job);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
$job->finished($now);
|
$job->finished($now);
|
||||||
$self->jq_complete($job);
|
jq_complete($job);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch { $self->sendto('stderr', "error closing job: $_\n") };
|
catch { $self->sendto('stderr', "error closing job: $_\n") };
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use Role::Tiny;
|
|||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
use List::Util 'sum';
|
use List::Util 'sum';
|
||||||
with 'App::Netdisco::Daemon::JobQueue';
|
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/;
|
||||||
|
|
||||||
sub worker_begin {
|
sub worker_begin {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -19,7 +19,7 @@ sub worker_begin {
|
|||||||
|
|
||||||
# requeue jobs locally
|
# requeue jobs locally
|
||||||
debug "mgr ($wid): searching for jobs booked to this processing node";
|
debug "mgr ($wid): searching for jobs booked to this processing node";
|
||||||
my @jobs = $self->jq_locked;
|
my @jobs = jq_locked;
|
||||||
|
|
||||||
if (scalar @jobs) {
|
if (scalar @jobs) {
|
||||||
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
|
info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;
|
||||||
@@ -42,7 +42,7 @@ sub worker_body {
|
|||||||
|
|
||||||
# get some pending jobs
|
# get some pending jobs
|
||||||
# TODO also check for stale jobs in Netdisco DB
|
# TODO also check for stale jobs in Netdisco DB
|
||||||
foreach my $job ( $self->jq_getsome($num_slots) ) {
|
foreach my $job ( jq_getsome($num_slots) ) {
|
||||||
|
|
||||||
# check for available local capacity
|
# check for available local capacity
|
||||||
my $job_type = setting('job_types')->{$job->action};
|
my $job_type = setting('job_types')->{$job->action};
|
||||||
@@ -51,7 +51,7 @@ sub worker_body {
|
|||||||
$wid, $job->id, $job->action;
|
$wid, $job->id, $job->action;
|
||||||
|
|
||||||
# mark job as running
|
# mark job as running
|
||||||
next unless $self->jq_lock($job);
|
next unless jq_lock($job);
|
||||||
info sprintf "mgr (%s): job %s booked out for this processing node",
|
info sprintf "mgr (%s): job %s booked out for this processing node",
|
||||||
$wid, $job->id;
|
$wid, $job->id;
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use Algorithm::Cron;
|
|||||||
use Role::Tiny;
|
use Role::Tiny;
|
||||||
use namespace::clean;
|
use namespace::clean;
|
||||||
|
|
||||||
with 'App::Netdisco::Daemon::JobQueue';
|
use App::Netdisco::JobQueue qw/jq_insert/;
|
||||||
|
|
||||||
sub worker_begin {
|
sub worker_begin {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
@@ -53,7 +53,7 @@ sub worker_body {
|
|||||||
|
|
||||||
# queue it!
|
# queue it!
|
||||||
info "sched ($wid): queueing $action job";
|
info "sched ($wid): queueing $action job";
|
||||||
$self->jq_insert({
|
jq_insert({
|
||||||
action => $action,
|
action => $action,
|
||||||
device => $sched->{device},
|
device => $sched->{device},
|
||||||
extra => $sched->{extra},
|
extra => $sched->{extra},
|
||||||
|
|||||||
Reference in New Issue
Block a user