move worker sleep into jobqueue

This commit is contained in:
Oliver Gorwits
2014-05-17 22:14:44 +01:00
parent c83b999597
commit e80c575c57
4 changed files with 19 additions and 14 deletions

View File

@@ -12,19 +12,18 @@ sub worker_body {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
my $tag = $self->worker_tag;
my $type = $self->worker_type; my $type = $self->worker_type;
my $name = $self->worker_name;
while (1) { while (1) {
debug "$type ($wid): asking for a job"; my $jobs = $self->jq_take($self->wid, $type);
my $jobs = $self->jq_take($self->wid, $name);
foreach my $job (@$jobs) { foreach my $job (@$jobs) {
my $target = $self->munge_action($job->action); my $target = $self->munge_action($job->action);
try { try {
$job->started(scalar localtime); $job->started(scalar localtime);
info sprintf "$type (%s): starting %s job(%s) at %s", info sprintf "$tag (%s): starting %s job(%s) at %s",
$wid, $target, $job->id, $job->started; $wid, $target, $job->id, $job->started;
my ($status, $log) = $self->$target($job); my ($status, $log) = $self->$target($job);
$job->status($status); $job->status($status);
@@ -38,18 +37,15 @@ sub worker_body {
$self->close_job($job); $self->close_job($job);
} }
debug "$type ($wid): sleeping now...";
sleep(1);
} }
} }
sub close_job { sub close_job {
my ($self, $job) = @_; my ($self, $job) = @_;
my $type = $self->worker_type; my $tag = $self->worker_tag;
my $now = scalar localtime; my $now = scalar localtime;
info sprintf "$type (%s): wrapping up %s job(%s) - status %s at %s", info sprintf "$tag (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->id, $job->status, $now; $self->wid, $job->action, $job->id, $job->status, $now;
try { try {

View File

@@ -10,8 +10,8 @@ with 'App::Netdisco::Daemon::Worker::Common';
with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions',
'App::Netdisco::Daemon::Worker::Interactive::PortActions'; 'App::Netdisco::Daemon::Worker::Interactive::PortActions';
sub worker_type { 'int' } sub worker_tag { 'int' }
sub worker_name { 'Interactive' } sub worker_type { 'Interactive' }
sub munge_action { 'set_' . $_[1] } sub munge_action { 'set_' . $_[1] }
1; 1;

View File

@@ -13,8 +13,8 @@ with 'App::Netdisco::Daemon::Worker::Poller::Device',
'App::Netdisco::Daemon::Worker::Poller::Nbtstat', 'App::Netdisco::Daemon::Worker::Poller::Nbtstat',
'App::Netdisco::Daemon::Worker::Poller::Expiry'; 'App::Netdisco::Daemon::Worker::Poller::Expiry';
sub worker_type { 'pol' } sub worker_tag { 'pol' }
sub worker_name { 'Poller' } sub worker_type { 'Poller' }
sub munge_action { $_[1] } sub munge_action { $_[1] }
1; 1;

View File

@@ -102,7 +102,16 @@ sub jq_userlog {
# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via # PostgreSQL engine depends on LocalQueue, which is accessed synchronously via
# the main daemon process. This is only used by daemon workers which can use # the main daemon process. This is only used by daemon workers which can use
# MCE ->do() method. # MCE ->do() method.
sub jq_take { (shift)->do('take_jobs', @_) } sub jq_take {
my ($self, $wid, $type) = @_;
# be polite to SQLite database (that is, local CPU)
debug "$type ($wid): sleeping now...";
sleep(1);
debug "$type ($wid): asking for a job";
$self->do('take_jobs', $wid, $type);
}
sub jq_lock { sub jq_lock {
my $job = shift; my $job = shift;