Daemon crash when restarting with in-progress jobs and many workers

This commit is contained in:
Oliver Gorwits
2014-07-31 00:55:56 +01:00
parent 8e35a7158e
commit 2402855797
4 changed files with 19 additions and 4 deletions

View File

@@ -7,6 +7,7 @@
[BUG FIXES] [BUG FIXES]
* Fix when Interactives number set to zero * Fix when Interactives number set to zero
* [#121] Daemon crash when restarting with in-progress jobs and many workers
2.028012 - 2014-07-22 2.028012 - 2014-07-22

View File

@@ -38,7 +38,7 @@ my $mce = MCE->new(
job_delay => 1.15, job_delay => 1.15,
tmp_dir => $tmp_dir, tmp_dir => $tmp_dir,
user_func => sub { $_[0]->worker_body }, user_func => sub { $_[0]->worker_body },
on_post_exit => \&restart_worker, on_post_exit => \&restart_this_worker,
user_tasks => build_tasks_list(), user_tasks => build_tasks_list(),
)->run(); )->run();
@@ -97,7 +97,7 @@ sub worker_factory {
}; };
} }
sub restart_worker { sub restart_this_worker {
my ($self, $e) = @_; my ($self, $e) = @_;
reset_jobs($e->{wid}); reset_jobs($e->{wid});

View File

@@ -5,7 +5,7 @@ use Dancer::Plugin::DBIC 'schema';
use base 'Exporter'; use base 'Exporter';
our @EXPORT = (); our @EXPORT = ();
our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs/; our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs release_jobs /;
our %EXPORT_TAGS = ( all => \@EXPORT_OK ); our %EXPORT_TAGS = ( all => \@EXPORT_OK );
schema('daemon')->deploy; schema('daemon')->deploy;
@@ -59,4 +59,11 @@ sub reset_jobs {
->update({wid => 0}); ->update({wid => 0});
} }
# not used by workers, only the daemon when reinitializing a worker
sub release_jobs {
my ($jid) = @_;
debug "releasing local job ID $jid";
$queue->search({job => $jid})->delete;
}
1; 1;

View File

@@ -143,12 +143,19 @@ sub jq_lock {
return $happy; return $happy;
} }
# PostgreSQL engine depends on LocalQueue, which is accessed synchronously via
# the main daemon process. This is only used by daemon workers which can use
# MCE ->do() method.
sub jq_defer { sub jq_defer {
my $job = shift; my $job = shift;
my $happy = false; my $happy = false;
# lock db row and update to show job is available
try { try {
# other local workers are polling the central queue, so
# to prevent a race, first delete the job in our local queue
MCE->do('release_jobs', $job->id);
# lock db row and update to show job is available
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'}) ->find($job->id, {for => 'update'})