From c6925d1c1a2f4aaeb99e0581a2badac613743380 Mon Sep 17 00:00:00 2001 From: Oliver Gorwits Date: Wed, 16 Oct 2013 23:38:58 +0100 Subject: [PATCH] Remove dupe restriction on job queue, try for smarter *all queuing (closes #41) --- Netdisco/Changes | 1 + Netdisco/lib/App/Netdisco/DB.pm | 2 +- .../lib/App/Netdisco/DB/ResultSet/Admin.pm | 4 ++++ .../App-Netdisco-DB-27-28-PostgreSQL.sql | 6 ++++++ .../Netdisco/Daemon/Worker/Poller/Common.pm | 21 ++++++++----------- .../Netdisco/Daemon/Worker/Poller/Device.pm | 21 ++++++++----------- 6 files changed, 30 insertions(+), 25 deletions(-) create mode 100644 Netdisco/lib/App/Netdisco/DB/schema_versions/App-Netdisco-DB-27-28-PostgreSQL.sql diff --git a/Netdisco/Changes b/Netdisco/Changes index 6c37f39d..beaec18a 100644 --- a/Netdisco/Changes +++ b/Netdisco/Changes @@ -5,6 +5,7 @@ * Update Print media CSS to handle new UI components * Deadlock in Discover over access to the device_ports table * Set canonical IP failed on synthesized cols (and was wrong anyway) (closes #35) + * Remove dupe restriction on job queue, try for smarter *all queuing (closes #41) 2.018000 - 2013-10-08 diff --git a/Netdisco/lib/App/Netdisco/DB.pm b/Netdisco/lib/App/Netdisco/DB.pm index 7ed8e4f8..be986637 100644 --- a/Netdisco/lib/App/Netdisco/DB.pm +++ b/Netdisco/lib/App/Netdisco/DB.pm @@ -8,7 +8,7 @@ use base 'DBIx::Class::Schema'; __PACKAGE__->load_namespaces; -our $VERSION = 27; # schema version used for upgrades, keep as integer +our $VERSION = 28; # schema version used for upgrades, keep as integer use Path::Class; use File::Basename; diff --git a/Netdisco/lib/App/Netdisco/DB/ResultSet/Admin.pm b/Netdisco/lib/App/Netdisco/DB/ResultSet/Admin.pm index 21c033ab..2777454b 100644 --- a/Netdisco/lib/App/Netdisco/DB/ResultSet/Admin.pm +++ b/Netdisco/lib/App/Netdisco/DB/ResultSet/Admin.pm @@ -4,6 +4,10 @@ use base 'DBIx::Class::ResultSet'; use strict; use warnings FATAL => 'all'; +__PACKAGE__->load_components(qw/ + +App::Netdisco::DB::ExplicitLocking +/); + =head1 ADDITIONAL METHODS =head2 with_times diff --git a/Netdisco/lib/App/Netdisco/DB/schema_versions/App-Netdisco-DB-27-28-PostgreSQL.sql b/Netdisco/lib/App/Netdisco/DB/schema_versions/App-Netdisco-DB-27-28-PostgreSQL.sql new file mode 100644 index 00000000..931c9abf --- /dev/null +++ b/Netdisco/lib/App/Netdisco/DB/schema_versions/App-Netdisco-DB-27-28-PostgreSQL.sql @@ -0,0 +1,6 @@ + +BEGIN; + +DROP INDEX IF EXISTS jobs_queued; + +COMMIT; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm index c780cf63..2539cdae 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Common.pm @@ -16,8 +16,15 @@ use namespace::clean; sub _walk_body { my ($self, $job_type, $job) = @_; - my $devices = schema('netdisco')->resultset('Device')->get_column('ip'); my $jobqueue = schema('netdisco')->resultset('Admin'); + my $devices = schema('netdisco')->resultset('Device') + ->search({ip => { -not_in => + $jobqueue->search({ + device => { '!=' => undef}, + action => $job_type, + status => { -like => 'queued%' }, + })->get_column('device')->as_query + }})->get_column('ip'); if ($job->subaction and $job->subaction eq 'after-discoverall') { # make sure there are no incomplete discover jobs queued @@ -29,17 +36,7 @@ sub _walk_body { if $discover; } - schema('netdisco')->txn_do(sub { - # clean up user submitted jobs older than 1min, - # assuming skew between schedulers' clocks is not greater than 1min - $jobqueue->search({ - action => $job_type, - status => 'queued', - entered => { '<' => \"(now() - interval '1 minute')" }, - })->delete; - - # is scuppered by any user job submitted in last 1min (bad), or - # any similar job from another scheduler (good) + schema('netdisco')->resultset('Admin')->txn_do_locked(sub { $jobqueue->populate([ map {{ device => $_, diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm index cf931f85..dcb42ec1 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Poller/Device.pm @@ -17,20 +17,17 @@ use namespace::clean; sub discoverall { my ($self, $job) = @_; - my $devices = schema('netdisco')->resultset('Device')->get_column('ip'); my $jobqueue = schema('netdisco')->resultset('Admin'); + my $devices = schema('netdisco')->resultset('Device') + ->search({ip => { -not_in => + $jobqueue->search({ + device => { '!=' => undef}, + action => 'discover', + status => { -like => 'queued%' }, + })->get_column('device')->as_query + }})->get_column('ip'); - schema('netdisco')->txn_do(sub { - # clean up user submitted jobs older than 1min, - # assuming skew between schedulers' clocks is not greater than 1min - $jobqueue->search({ - action => 'discover', - status => 'queued', - entered => { '<' => \"(now() - interval '1 minute')" }, - })->delete; - - # is scuppered by any user job submitted in last 1min (bad), or - # any similar job from another scheduler (good) + schema('netdisco')->resultset('Admin')->txn_do_locked(sub { $jobqueue->populate([ map {{ device => $_,