diff --git a/Netdisco/Changes b/Netdisco/Changes index 7f79d7b6..9a1be270 100644 --- a/Netdisco/Changes +++ b/Netdisco/Changes @@ -7,6 +7,7 @@ [BUG FIXES] * Fix when Interactives number set to zero + * [#121] Daemon crash when restarting with in-progress jobs and many workers 2.028012 - 2014-07-22 diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index 6b763ab6..f10e332c 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -38,7 +38,7 @@ my $mce = MCE->new( job_delay => 1.15, tmp_dir => $tmp_dir, user_func => sub { $_[0]->worker_body }, - on_post_exit => \&restart_worker, + on_post_exit => \&restart_this_worker, user_tasks => build_tasks_list(), )->run(); @@ -97,7 +97,7 @@ sub worker_factory { }; } -sub restart_worker { +sub restart_this_worker { my ($self, $e) = @_; reset_jobs($e->{wid}); diff --git a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm index 13834a3f..b3dbcb9a 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/LocalQueue.pm @@ -5,7 +5,7 @@ use Dancer::Plugin::DBIC 'schema'; use base 'Exporter'; our @EXPORT = (); -our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs/; +our @EXPORT_OK = qw/ add_jobs capacity_for take_jobs reset_jobs release_jobs /; our %EXPORT_TAGS = ( all => \@EXPORT_OK ); schema('daemon')->deploy; @@ -59,4 +59,11 @@ sub reset_jobs { ->update({wid => 0}); } +# not used by workers, only the daemon when reinitializing a worker +sub release_jobs { + my ($jid) = @_; + debug "releasing local job ID $jid"; + $queue->search({job => $jid})->delete; +} + 1; diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm index ffd89ef7..dc66f6af 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -143,12 +143,19 @@ sub jq_lock { return $happy; } +# PostgreSQL 
engine depends on LocalQueue, which is accessed synchronously via +# the main daemon process. This is only used by daemon workers which can use the +# MCE->do() method. sub jq_defer { my $job = shift; my $happy = false; - # lock db row and update to show job is available try { + # other local workers are polling the central queue, so + # to prevent a race, first delete the job in our local queue + MCE->do('release_jobs', $job->id); + + # lock db row and update to show job is available schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') ->find($job->id, {for => 'update'})