diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm index 7811b53f..72850e8f 100644 --- a/lib/App/Netdisco/Backend/Job.pm +++ b/lib/App/Netdisco/Backend/Job.pm @@ -19,6 +19,7 @@ foreach my $slot (qw/ username userip log + device_key _current_phase _last_namespace diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm index 25d13ba7..d594df4d 100644 --- a/lib/App/Netdisco/Backend/Role/Manager.pm +++ b/lib/App/Netdisco/Backend/Role/Manager.pm @@ -34,6 +34,16 @@ sub worker_begin { } } +# creates a 'signature' for each job so that we can check for duplicates ... +# it happens from time to time due to the distributed nature of the job queue +# and manager(s) - also kinder to the DB to skip here rather than jq_lock() +my $memoize = sub { + no warnings 'uninitialized'; + my $job = shift; + return join chr(28), map {$job->{$_}} + (qw/action port subaction/, ($job->{device_key} ? 'device_key' : 'device')); +}; + sub worker_body { my $self = shift; my $wid = $self->wid; @@ -46,6 +56,7 @@ sub worker_body { while (1) { prctl sprintf 'nd2: #%s mgr: gathering', $wid; my $num_slots = 0; + my %seen_job = (); $num_slots = parse_max_workers( setting('workers')->{tasks} ) - $self->{queue}->pending(); @@ -54,6 +65,7 @@ sub worker_body { # get some high priority jobs # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsomep($num_slots) ) { + next if $seen_job{ $memoize->($job) }++; # mark job as running next unless jq_lock($job); @@ -71,6 +83,7 @@ sub worker_body { # get some normal priority jobs # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsome($num_slots) ) { + next if $seen_job{ $memoize->($job) }++; # mark job as running next unless jq_lock($job); @@ -81,6 +94,11 @@ sub worker_body { $self->{queue}->enqueue($job); } + #if (scalar grep {$_ > 1} values %seen_job) { + # debug 'WARNING: saw duplicate jobs after getsome()'; + # use DDP; debug p %seen_job; + #} + debug "mgr 
($wid): sleeping now..."; prctl sprintf 'nd2: #%s mgr: idle', $wid; sleep( setting('workers')->{sleep_time} || 1 ); diff --git a/lib/App/Netdisco/DB.pm b/lib/App/Netdisco/DB.pm index 808b45b6..7e864c36 100644 --- a/lib/App/Netdisco/DB.pm +++ b/lib/App/Netdisco/DB.pm @@ -11,7 +11,7 @@ __PACKAGE__->load_namespaces( ); our # try to hide from kwalitee - $VERSION = 44; # schema version used for upgrades, keep as integer + $VERSION = 45; # schema version used for upgrades, keep as integer use Path::Class; use File::ShareDir 'dist_dir'; diff --git a/lib/App/Netdisco/DB/Result/Admin.pm b/lib/App/Netdisco/DB/Result/Admin.pm index 27e0cf9f..c3075c08 100644 --- a/lib/App/Netdisco/DB/Result/Admin.pm +++ b/lib/App/Netdisco/DB/Result/Admin.pm @@ -46,6 +46,8 @@ __PACKAGE__->add_columns( { data_type => "text", is_nullable => 1 }, "debug", { data_type => "boolean", is_nullable => 1 }, + "device_key", + { data_type => "text", is_nullable => 1 }, ); diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm index 3d3e0a74..e3a794d8 100644 --- a/lib/App/Netdisco/DB/Result/Device.pm +++ b/lib/App/Netdisco/DB/Result/Device.pm @@ -238,6 +238,7 @@ sub renumber { if $new_ip eq '0.0.0.0' or $new_ip eq '127.0.0.1'; + # Community is not included as SNMP::test_connection will take care of it foreach my $set (qw/ DeviceIp DeviceModule @@ -259,10 +260,6 @@ sub renumber { ->search({remote_ip => $old_ip}) ->update({remote_ip => $new_ip}); - $schema->resultset('Admin') - ->search({device => $old_ip}) - ->update({device => $new_ip}); - $schema->resultset('Node') ->search({switch => $old_ip}) ->update({switch => $new_ip}); diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm index d7b73d53..859e9380 100644 --- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -13,11 +13,11 @@ use Try::Tiny; use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ + jq_warm_thrusters jq_getsome 
jq_getsomep jq_locked jq_queued - jq_warm_thrusters jq_lock jq_defer jq_complete @@ -28,67 +28,6 @@ our @EXPORT_OK = qw/ /; our %EXPORT_TAGS = ( all => \@EXPORT_OK ); -sub _getsome { - my ($num_slots, $where) = @_; - return () if ((!defined $num_slots) or ($num_slots < 1)); - return () if ((!defined $where) or (ref {} ne ref $where)); - - my $jobs = schema('netdisco')->resultset('Admin'); - my $rs = $jobs->search({ - status => 'queued', - device => { '-not_in' => - $jobs->skipped(setting('workers')->{'BACKEND'}, - setting('workers')->{'max_deferrals'}, - setting('workers')->{'retry_after'}) - ->columns('device')->as_query }, - %$where, - }, { order_by => 'random()', rows => $num_slots }); - - my @returned = (); - while (my $job = $rs->next) { - push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); - } - return @returned; -} - -sub jq_getsome { - return _getsome(shift, - { action => { -in => setting('job_prio')->{'normal'} } } - ); -} - -sub jq_getsomep { - return _getsome(shift, { - -or => [{ - username => { '!=' => undef }, - action => { -in => setting('job_prio')->{'normal'} }, - },{ - action => { -in => setting('job_prio')->{'high'} }, - }], - }); -} - -sub jq_locked { - my @returned = (); - my $rs = schema('netdisco')->resultset('Admin') - ->search({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); - - while (my $job = $rs->next) { - push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); - } - return @returned; -} - -sub jq_queued { - my $job_type = shift; - - return schema('netdisco')->resultset('Admin')->search({ - device => { '!=' => undef}, - action => $job_type, - status => { -like => 'queued%' }, - })->get_column('device')->all; -} - # given a device, tests if any of the primary acls applies # returns a list of job actions to be denied/skipped on this host. 
sub _get_denied_actions { @@ -130,49 +69,126 @@ sub jq_warm_thrusters { }); } +sub _getsome { + my ($num_slots, $where) = @_; + return () if ((!defined $num_slots) or ($num_slots < 1)); + return () if ((!defined $where) or (ref {} ne ref $where)); + + my $jobs = schema('netdisco')->resultset('Admin'); + my $rs = $jobs->search({ + status => 'queued', + device => { '-not_in' => + $jobs->skipped(setting('workers')->{'BACKEND'}, + setting('workers')->{'max_deferrals'}, + setting('workers')->{'retry_after'}) + ->columns('device')->as_query }, + %$where, + }, { order_by => 'random()', rows => $num_slots }); + + my @returned = (); + while (my $job = $rs->next) { + if ($job->device) { + # need to handle device discovered since backend daemon started + # and the skiplist was primed. these should be checked against + # the various acls and have device_skip entry added if needed, + # and be skipped here if the job's action is denied. + my @badactions = _get_denied_actions($job->device); + if (scalar @badactions) { + schema('netdisco')->resultset('DeviceSkip')->find_or_create({ + backend => setting('workers')->{'BACKEND'}, device => $job->device, + },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); + + # will now not be selected in a future _getsome() + next if scalar grep {$_ eq $job->action} @badactions; + } + } + + # remove any duplicate jobs, including possibly this job if there + # is already an equivalent job running + + my %job_properties = ( + action => $job->action, + port => $job->port, + subaction => $job->subaction, + -or => [ + { device => $job->device }, + ($job->device_key ?
({ device_key => $job->device_key }) : ()), + ], + ); + + my $gone = $jobs->search({ + status => 'queued', + -and => [ + %job_properties, + -or => [{ + job => { '!=' => $job->id }, + },{ + job => $job->id, + -exists => $jobs->search({ + status => { -like => 'queued-%' }, + %job_properties, + })->as_query, + }], + ], + }, {for => 'update'}) + ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) }); + + debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id; + push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); + } + + return @returned; +} + +sub jq_getsome { + return _getsome(shift, + { action => { -in => setting('job_prio')->{'normal'} } } + ); +} + +sub jq_getsomep { + return _getsome(shift, { + -or => [{ + username => { '!=' => undef }, + action => { -in => setting('job_prio')->{'normal'} }, + },{ + action => { -in => setting('job_prio')->{'high'} }, + }], + }); +} + +sub jq_locked { + my @returned = (); + my $rs = schema('netdisco')->resultset('Admin') + ->search({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); + + while (my $job = $rs->next) { + push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); + } + return @returned; +} + +sub jq_queued { + my $job_type = shift; + + return schema('netdisco')->resultset('Admin')->search({ + device => { '!=' => undef}, + action => $job_type, + status => { -like => 'queued%' }, + })->get_column('device')->all; +} + sub jq_lock { my $job = shift; my $happy = false; - if ($job->device) { - # need to handle device discovered since backend daemon started - # and the skiplist was primed. these should be checked against - # the various acls and have device_skip entry added if needed, - # and return false if it should have been skipped. 
- my @badactions = _get_denied_actions($job->device); - if (scalar @badactions) { - schema('netdisco')->resultset('DeviceSkip')->find_or_create({ - backend => setting('workers')->{'BACKEND'}, device => $job->device, - },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); - - return false if scalar grep {$_ eq $job->action} @badactions; - } - } - # lock db row and update to show job has been picked try { - schema('netdisco')->txn_do(sub { - schema('netdisco')->resultset('Admin') - ->search({ job => $job->id }, { for => 'update' }) - ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); + my $updated = schema('netdisco')->resultset('Admin') + ->search({ job => $job->id, status => 'queued' }, { for => 'update' }) + ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); - return unless - schema('netdisco')->resultset('Admin') - ->count({ job => $job->id, - status => ('queued-'. setting('workers')->{'BACKEND'}) }); - - # remove any duplicate jobs, needed because we have race conditions - # when queueing jobs of a type for all devices - schema('netdisco')->resultset('Admin')->search({ - status => 'queued', - device => $job->device, - port => $job->port, - action => $job->action, - subaction => $job->subaction, - }, {for => 'update'})->delete(); - - $happy = true; - }); + $happy = true if $updated > 0; } catch { error $_; @@ -243,6 +259,7 @@ sub jq_complete { $happy = true; } catch { + # use DDP; p $job; error $_; }; @@ -274,13 +291,14 @@ sub jq_insert { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin')->populate([ map {{ - device => $_->{device}, - port => $_->{port}, - action => $_->{action}, - subaction => ($_->{extra} || $_->{subaction}), - username => $_->{username}, - userip => $_->{userip}, - status => 'queued', + device => $_->{device}, + device_key => $_->{device_key}, + port => $_->{port}, + action => $_->{action}, + subaction => ($_->{extra} || $_->{subaction}), + username => $_->{username}, + 
userip => $_->{userip}, + status => 'queued', }} @$jobs ]); }); diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm index 105f4185..c6335187 100644 --- a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm +++ b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm @@ -72,6 +72,10 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { $device->renumber($new_ip) or die "cannot renumber to: $new_ip"; # rollback + # is not done in renumber but required otherwise confusing at job end! + schema('netdisco')->resultset('Admin') + ->find({job => $job->id})->update({device => $new_ip}); + return Status->noop(sprintf ' [%s] device - changed IP to %s (%s)', $old_ip, $device->ip, ($device->dns || '')); }); diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm index aecec0e9..a3b6b20a 100644 --- a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm +++ b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm @@ -39,11 +39,13 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { or return Status->defer("discover failed: could not SNMP connect to $device"); my @to_discover = store_neighbors($device); + my %seen_id = (); # only enqueue if device is not already discovered, # discover_* config permits the discovery foreach my $neighbor (@to_discover) { - my ($ip, $remote_type) = @$neighbor; + my ($ip, $remote_type, $remote_id) = @$neighbor; + next if $remote_id and $seen_id{ $remote_id }++; my $device = get_device($ip); next if $device->in_storage; @@ -55,10 +57,14 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { next; } + # risk of things going wrong...? + # https://quickview.cloudapps.cisco.com/quickview/bug/CSCur12254 + jq_insert({ device => $ip, action => 'discover', subaction => 'with-nodes', + ($remote_id ? 
(device_key => $remote_id) : ()), }); } }); @@ -171,7 +177,7 @@ sub store_neighbors { # useable remote IP... if ($remote_ip eq '0.0.0.0' or - check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { + check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { if ($remote_id) { my $devices = schema('netdisco')->resultset('Device'); @@ -228,7 +234,7 @@ sub store_neighbors { debug sprintf ' [%s] neigh - adding neighbor %s, type [%s], on %s to discovery queue', $device->ip, $remote_ip, ($remote_type || ''), $port; - push @to_discover, [$remote_ip, $remote_type]; + push @to_discover, [$remote_ip, $remote_type, $remote_id]; $remote_port = $c_port->{$entry}; if (defined $remote_port) { diff --git a/lib/App/Netdisco/Worker/Plugin/Expire.pm b/lib/App/Netdisco/Worker/Plugin/Expire.pm index 28c93cc4..0625338d 100644 --- a/lib/App/Netdisco/Worker/Plugin/Expire.pm +++ b/lib/App/Netdisco/Worker/Plugin/Expire.pm @@ -6,6 +6,7 @@ use aliased 'App::Netdisco::Worker::Status'; use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::Statistics 'update_stats'; +use App::Netdisco::DB::ExplicitLocking ':modes'; register_worker({ phase => 'main' }, sub { my ($job, $workerconf) = @_; @@ -40,7 +41,7 @@ register_worker({ phase => 'main' }, sub { } if (setting('expire_jobs') and setting('expire_jobs') > 0) { - schema('netdisco')->txn_do(sub { + schema('netdisco')->txn_do_locked('admin', 'EXCLUSIVE', sub { schema('netdisco')->resultset('Admin')->search({ entered => \[q/< (now() - ?::interval)/, (setting('expire_jobs') * 86400)], diff --git a/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql new file mode 100644 index 00000000..31e258e8 --- /dev/null +++ b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE "admin" ADD "device_key" text; + +COMMIT;