first refactor for MCE::Flow and MCE::Queue

This commit is contained in:
Oliver Gorwits
2014-08-10 12:36:24 +01:00
parent bc8f75f7f8
commit c817a35537
9 changed files with 117 additions and 126 deletions

View File

@@ -9,13 +9,6 @@ __PACKAGE__->table("admin");
__PACKAGE__->add_columns(
"job",
{ data_type => "integer", is_nullable => 0 },
"type", # Poller, Interactive, etc
{ data_type => "text", is_nullable => 0 },
"wid", # worker ID, only != 0 once taken
{ data_type => "integer", is_nullable => 0, default_value => 0 },
"entered",
{ data_type => "timestamp", is_nullable => 1 },
"started",

View File

@@ -1,6 +1,7 @@
package App::Netdisco::Daemon::Worker::Common;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Try::Tiny;
use App::Netdisco::Util::Daemon;
@@ -8,7 +9,7 @@ use App::Netdisco::Util::Daemon;
use Role::Tiny;
use namespace::clean;
use App::Netdisco::JobQueue qw/jq_take jq_defer jq_complete/;
use App::Netdisco::JobQueue qw/jq_defer jq_complete/;
sub worker_body {
my $self = shift;
@@ -19,9 +20,11 @@ sub worker_body {
while (1) {
prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type);
my $jobs = jq_take($self->wid, $type);
my $jobs = [ $self->{Q}->dequeue(1) ]; # FIXME multiple take, take type, thaw
foreach my $job (@$jobs) {
next unless defined $job;
$job = schema('daemon')->dclone( $job );
my $target = $self->munge_action($job->action);
try {

View File

@@ -9,6 +9,7 @@ use Role::Tiny;
use namespace::clean;
use App::Netdisco::JobQueue qw/jq_locked jq_getsome jq_lock/;
use MCE::Util 'get_ncpu';
sub worker_begin {
my $self = shift;
@@ -26,7 +27,7 @@ sub worker_begin {
if (scalar @jobs) {
info sprintf "mgr (%s): found %s jobs booked to this processing node",
$wid, scalar @jobs;
$self->do('add_jobs', @jobs);
$self->{Q}->enqueue(@jobs); # FIXME priority and freeze
}
}
@@ -37,8 +38,8 @@ sub worker_body {
return debug "mgr ($wid): no need for manager... quitting"
if setting('workers')->{'no_manager'};
my $num_slots = sum( 0, map { setting('workers')->{$_} }
values %{setting('job_type_keys')} );
# FIXME really the best strategy?
my $num_slots = (MCE::Util::get_ncpu() * 2) - $self->{Q}->pending();
while (1) {
debug "mgr ($wid): getting potential jobs for $num_slots workers";
@@ -48,25 +49,18 @@ sub worker_body {
# TODO also check for stale jobs in Netdisco DB
foreach my $job ( jq_getsome($num_slots) ) {
# check for available local capacity
my $job_type = setting('job_types')->{$job->action};
next unless $job_type and $self->do('capacity_for', $job_type);
debug sprintf "mgr (%s): processing node has capacity for job %s (%s)",
$wid, $job->id, $job->action;
# mark job as running
next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->id;
# copy job to local queue
$self->do('add_jobs', $job);
$self->{Q}->enqueue($job); # FIXME priority and freeze
}
debug "mgr ($wid): sleeping now...";
prctl sprintf 'netdisco-daemon: worker #%s manager: idle', $wid;
sleep( setting('workers')->{sleep_time} || 2 );
sleep( setting('workers')->{sleep_time} || 1 );
}
}

View File

@@ -13,6 +13,10 @@ use App::Netdisco::JobQueue qw/jq_insert/;
sub worker_begin {
my $self = shift;
my $wid = $self->wid;
return debug "mgr ($wid): no need for scheduler... skip begin"
unless setting('scheduler');
debug "entering Scheduler ($wid) worker_begin()";
foreach my $action (keys %{ setting('schedule') }) {
@@ -34,6 +38,9 @@ sub worker_body {
my $self = shift;
my $wid = $self->wid;
return debug "mgr ($wid): no need for scheduler... quitting"
unless setting('scheduler');
while (1) {
# sleep until some point in the next minute
my $naptime = 60 - (time % 60) + int(rand(45));

View File

@@ -14,7 +14,6 @@ our @EXPORT_OK = qw/
jq_queued
jq_log
jq_userlog
jq_take
jq_lock
jq_defer
jq_complete
@@ -64,12 +63,6 @@ Returns a list of jobs which have been entered into the queue by the passed
C<$user>. Jobs are returned as objects which implement the Netdisco job
instance interface (see below).
=head2 jq_take( $wid, $type, $max? )
Searches in the queue for jobs of type C<$type> and if up to C<$max> are
available, will book them out to the worker with ID C<$wid>. The default
number of booked jobs is 1.
=head2 jq_lock( $job )
Marks a job in the queue as booked out to this processing node (denoted by the

View File

@@ -15,7 +15,6 @@ our @EXPORT_OK = qw/
jq_queued
jq_log
jq_userlog
jq_take
jq_lock
jq_defer
jq_complete
@@ -35,9 +34,8 @@ sub jq_getsome {
);
while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next;
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
->new_result({ $job->get_columns });
}
return @returned;
}
@@ -50,9 +48,8 @@ sub jq_locked {
->search({status => "queued-$fqdn"});
while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next;
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
->new_result({ $job->get_columns });
}
return @returned;
}
@@ -76,9 +73,8 @@ sub jq_log {
});
while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next;
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
->new_result({ $job->get_columns });
}
return @returned;
}
@@ -93,28 +89,12 @@ sub jq_userlog {
});
while (my $job = $rs->next) {
my $job_type = setting('job_types')->{$job->action} or next;
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns, type => $job_type });
->new_result({ $job->get_columns });
}
return @returned;
}
# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via
# the main daemon process. This is only used by daemon workers which can use
# MCE ->do() method.
sub jq_take {
my ($wid, $type) = @_;
Module::Load::load 'MCE';
# be polite to SQLite database (that is, local CPU)
debug "$type ($wid): sleeping now...";
sleep(1);
debug "$type ($wid): asking for a job";
MCE->do('take_jobs', $wid, $type);
}
sub jq_lock {
my $job = shift;
my $fqdn = hostfqdn || 'localhost';
@@ -143,18 +123,11 @@ sub jq_lock {
return $happy;
}
# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via
# the main daemon process. This is only used by daemon workers which can use
# MCE ->do() method.
sub jq_defer {
my $job = shift;
my $happy = false;
try {
# other local workers are polling the central queue, so
# to prevent a race, first delete the job in our local queue
MCE->do('release_jobs', $job->id);
# lock db row and update to show job is available
schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin')