We had situations where the manager would start workers on the same job, either because of race conditions or because at the time of queueing it wasn't known that the jobs were targeting the same device (due to device aliases). This commit removes duplicate jobs, reduces the need for locking on the job queue, and makes use of lldpRemChassisId to try to deduplicate jobs before they are started. In effect we have several goes at preventing duplicate jobs: 1. at neighbor discovery time we try to skip queueing the same lldpRemChassisId; 2. at job selection we 'error out' jobs with the same profile as the job selected; 3. at job selection we check for a running job with the same profile as the one selected; 4. the job manager process also checks for duplicate job profiles; 5. at job lock we abort if the job was 'errored out'. All together this seems to work well. A test on a large university network of 303 devices (four core routers and the rest edge routers, running VRF with many duplicate identities), ~1200 subnets, ~50k hosts, resulted in no DB deadlock or contention and a complete discover+arpnip+macsuck (909 jobs) in ~3 minutes (with ~150 duplicate jobs identified and skipped).
330 lines
8.5 KiB
Perl
330 lines
8.5 KiB
Perl
package App::Netdisco::JobQueue::PostgreSQL;
|
||
|
||
use Dancer qw/:moose :syntax :script/;
|
||
use Dancer::Plugin::DBIC 'schema';
|
||
|
||
use App::Netdisco::Util::Device
|
||
qw/is_discoverable is_macsuckable is_arpnipable/;
|
||
use App::Netdisco::Backend::Job;
|
||
|
||
use Module::Load ();
|
||
use Try::Tiny;
|
||
|
||
use base 'Exporter';
|
||
our @EXPORT = ();
|
||
our @EXPORT_OK = qw/
|
||
jq_warm_thrusters
|
||
jq_getsome
|
||
jq_getsomep
|
||
jq_locked
|
||
jq_queued
|
||
jq_lock
|
||
jq_defer
|
||
jq_complete
|
||
jq_log
|
||
jq_userlog
|
||
jq_insert
|
||
jq_delete
|
||
/;
|
||
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
|
||
|
||
# Work out which job actions must be denied/skipped for a device by running
# it through the discover, macsuck and arpnip checks in turn.
#
#   $target - the device to test (passed straight to the is_*able helpers)
#
# Returns a list of action names; the list is empty when no device is given
# or when every check passes.
sub _get_denied_actions {
  my ($target) = @_;

  my @denied = ();
  return @denied unless $target;

  # a device we may not discover also loses all the high priority actions
  if (not is_discoverable($target)) {
    push @denied, ('discover', @{ setting('job_prio')->{high} });
  }

  # nbtstat is denied together with macsuck
  if (not is_macsuckable($target)) {
    push @denied, (qw/macsuck nbtstat/);
  }

  if (not is_arpnipable($target)) {
    push @denied, 'arpnip';
  }

  return @denied;
}
|
||
|
||
# Prime the skip list for this backend: every known device is tested with
# _get_denied_actions() and, inside one transaction, this backend's existing
# DeviceSkip rows are deleted and replaced with one row per device that has
# at least one denied action.
sub jq_warm_thrusters {
  my $db       = schema('netdisco');
  my @known    = $db->resultset('Device')->all;
  my $skiplist = $db->resultset('DeviceSkip');
  my $backend  = setting('workers')->{'BACKEND'};

  # device ip => arrayref of denied actions (devices with none are omitted)
  my %denied_for = ();
  for my $dev (@known) {
    my @denied = _get_denied_actions($dev);
    next unless scalar @denied;
    $denied_for{$dev->ip} = \@denied;
  }

  my @rows = map {
    +{
      backend   => $backend,
      device    => $_,
      actionset => $denied_for{$_},
    }
  } keys %denied_for;

  $db->txn_do(sub {
    $skiplist->search({ backend => $backend })->delete;
    $skiplist->populate(\@rows);
  });
}
|
||
|
||
# Pick up to $num_slots queued jobs, in random order, and cancel any queued
# duplicates of each job picked.
#
#   $num_slots - maximum number of jobs to select; must be defined and >= 1
#   $where     - hashref of extra search conditions merged into the query
#
# Returns a list of App::Netdisco::Backend::Job objects, or an empty list
# when either argument is unusable.
#
# Note that a returned job may itself just have been set to 'error' (when an
# equivalent job is already running on some backend). jq_lock() only claims
# rows whose status is still 'queued', so such a job will fail to lock.
sub _getsome {
  my ($num_slots, $where) = @_;
  return () if ((!defined $num_slots) or ($num_slots < 1));
  return () if ((!defined $where) or (ref {} ne ref $where));

  my $jobs = schema('netdisco')->resultset('Admin');

  # queued jobs only, leaving out devices returned by the skipped() subquery
  # for this backend (driven by the max_deferrals and retry_after settings)
  my $rs = $jobs->search({
    status => 'queued',
    device => { '-not_in' =>
      $jobs->skipped(setting('workers')->{'BACKEND'},
                     setting('workers')->{'max_deferrals'},
                     setting('workers')->{'retry_after'})
           ->columns('device')->as_query },
    %$where,
  }, { order_by => 'random()', rows => $num_slots });

  my @returned = ();
  while (my $job = $rs->next) {
    if ($job->device) {
      # need to handle device discovered since backend daemon started
      # and the skiplist was primed. these are checked against the
      # various acls and have a device_skip entry added if needed, and
      # the job is passed over if its own action is one of those denied.
      my @badactions = _get_denied_actions($job->device);
      if (scalar @badactions) {
        schema('netdisco')->resultset('DeviceSkip')->find_or_create({
          backend => setting('workers')->{'BACKEND'}, device => $job->device,
        },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);

        # will now not be selected in a future _getsome()
        next if scalar grep {$_ eq $job->action} @badactions;
      }
    }

    # remove any duplicate jobs, including possibly this job if there
    # is already an equivalent job running

    # a job is "equivalent" when action, port and subaction match and it
    # targets the same device or (when known) the same device_key
    my %job_properties = (
      action => $job->action,
      port   => $job->port,
      subaction => $job->subaction,
      -or => [
        { device => $job->device },
        ($job->device_key ? ({ device_key => $job->device_key }) : ()),
      ],
    );

    # error out every OTHER queued equivalent job, and this job as well
    # if an equivalent job is already locked by a backend ('queued-%')
    my $gone = $jobs->search({
      status => 'queued',
      -and => [
        %job_properties,
        -or => [{
          job => { '!=' => $job->id },
        },{
          job => $job->id,
          -exists => $jobs->search({
            status => { -like => 'queued-%' },
            %job_properties,
          })->as_query,
        }],
      ],
    }, {for => 'update'})
        ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) });

    # update() hands back the driver's row count, which is the true string
    # "0E0" when nothing matched; %d logs that as a plain 0
    debug sprintf 'getsome: cancelled %d duplicate(s) of job %s', ($gone || 0), $job->id;
    push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
  }

  return @returned;
}
|
||
|
||
# Fetch up to the given number of queued jobs whose action is listed under
# the 'normal' key of the job_prio setting. Returns whatever _getsome() does.
sub jq_getsome {
  my $num_slots = shift;

  my %normal_only = (
    action => { -in => setting('job_prio')->{'normal'} },
  );

  return _getsome($num_slots, \%normal_only);
}
|
||
|
||
# Fetch up to the given number of queued "priority" jobs: either a 'normal'
# action that has a username set, or any 'high' action (both lists come from
# the job_prio setting). Returns whatever _getsome() does.
sub jq_getsomep {
  my $num_slots = shift;
  my $prio      = setting('job_prio');

  # normal action, but with a username recorded on the job
  my $user_submitted = {
    username => { '!=' => undef },
    action   => { -in => $prio->{'normal'} },
  };

  my $high_priority = {
    action => { -in => $prio->{'high'} },
  };

  return _getsome($num_slots, { -or => [ $user_submitted, $high_priority ] });
}
|
||
|
||
# List the jobs currently locked by this backend, i.e. rows whose status is
# 'queued-' followed by the configured BACKEND name. Returns a list of
# App::Netdisco::Backend::Job objects.
sub jq_locked {
  my $mine = 'queued-'. setting('workers')->{'BACKEND'};

  my $locked = schema('netdisco')->resultset('Admin')
    ->search({ status => $mine });

  return map { App::Netdisco::Backend::Job->new({ $_->get_columns }) }
             $locked->all;
}
|
||
|
||
# Return the device column of every job of the given action type that has a
# device set and whose status starts with 'queued' (so both waiting and
# locked jobs are included).
sub jq_queued {
  my ($job_type) = @_;

  my %wanted = (
    device => { '!=' => undef},
    action => $job_type,
    status => { -like => 'queued%' },
  );

  my $pending = schema('netdisco')->resultset('Admin')->search(\%wanted);
  return $pending->get_column('device')->all;
}
|
||
|
||
# Try to claim a job for this backend by moving its row from 'queued' to
# 'queued-<BACKEND>'.
#
# Returns true only if at least one row was updated; returns false if the row
# was no longer in 'queued' status or if the database call threw (the error
# is logged, not re-thrown).
sub jq_lock {
  my ($job) = @_;
  my $claimed = false;

  try {
    my $mine = 'queued-'. setting('workers')->{'BACKEND'};

    # only a row that is still plainly 'queued' can be claimed
    my $candidates = schema('netdisco')->resultset('Admin')
      ->search({ job => $job->id, status => 'queued' }, { for => 'update' });

    my $rows = $candidates->update({ status => $mine });
    if ($rows > 0) {
      $claimed = true;
    }
  }
  catch {
    error $_;
  };

  return $claimed;
}
|
||
|
||
# Put a job back on the queue and, if it has a device, count one more
# deferral against that device for this backend. Both changes happen in a
# single transaction.
#
# Returns true on success, false if anything threw (the error is logged).
#
# The deferral counter is per device, not per action: if macsuck keeps
# failing on a device, arpnip and every other action on that device are held
# back too. As a defer is only triggered by an SNMP connect failure, that
# seems reasonable (perhaps even desirable).
sub jq_defer {
  my ($job) = @_;
  my $deferred = false;

  try {
    my $db = schema('netdisco');

    $db->txn_do(sub {
      if ($job->device) {
        my $skip = $db->resultset('DeviceSkip')->find_or_create(
          { backend => setting('workers')->{'BACKEND'}, device => $job->device },
          { key => 'device_skip_pkey' },
        );
        $skip->increment_deferrals;
      }

      # lock the job row, then mark it as available again
      my $row = $db->resultset('Admin')->find($job->id, {for => 'update'});
      $row->update({ status => 'queued', started => undef });
    });

    $deferred = true;
  }
  catch {
    error $_;
  };

  return $deferred;
}
|
||
|
||
# Write a finished job's status, log, started and finished values back to
# its row and, if it has a device, reset that device's deferral counter for
# this backend. Both changes happen in a single transaction.
#
# Returns true on success, false if anything threw (the error is logged).
#
# SNMP connect failures are deferrals, not errors, so any completion --
# success or failure -- means the device was reachable. Zeroing the counter
# here forgets about occasional connect glitches.
sub jq_complete {
  my ($job) = @_;
  my $recorded = false;

  try {
    my $db = schema('netdisco');

    $db->txn_do(sub {
      if ($job->device) {
        my $skip = $db->resultset('DeviceSkip')->find_or_create(
          { backend => setting('workers')->{'BACKEND'}, device => $job->device },
          { key => 'device_skip_pkey' },
        );
        $skip->update({ deferrals => 0 });
      }

      # lock the job row, then store the outcome
      my $row = $db->resultset('Admin')->find($job->id, {for => 'update'});
      $row->update({
        status   => $job->status,
        log      => $job->log,
        started  => $job->started,
        finished => $job->finished,
      });
    });

    $recorded = true;
  }
  catch {
    error $_;
  };

  return $recorded;
}
|
||
|
||
# Return at most 50 job rows, ordered by entered, device and action
# (descending), with the 'target' relation prefetched. Rows are passed
# through the with_times and hri resultset methods before being fetched.
sub jq_log {
  my %attrs = (
    prefetch => 'target',
    order_by => { -desc => [qw/entered device action/] },
    rows     => 50,
  );

  my $recent = schema('netdisco')->resultset('Admin')->search({}, \%attrs);
  return $recent->with_times->hri->all;
}
|
||
|
||
# Return the jobs belonging to the given username that finished within the
# last five seconds (by the database clock), passed through with_times.
sub jq_userlog {
  my ($user) = @_;

  my %just_finished = (
    username => $user,
    finished => { '>' => \"(now() - interval '5 seconds')" },
  );

  my $mine = schema('netdisco')->resultset('Admin')->search(\%just_finished);
  return $mine->with_times->all;
}
|
||
|
||
# Queue one or more jobs.
#
#   $jobs - a single job spec hashref, or an arrayref of them. Each spec may
#           carry device, device_key, port, action, username and userip,
#           plus either 'extra' or 'subaction' ('extra' wins when true).
#
# All rows are inserted with status 'queued' inside one transaction.
# Returns true on success, false if anything threw (the error is logged).
sub jq_insert {
  my $jobs = shift;
  $jobs = [$jobs] unless ref [] eq ref $jobs;

  my $queued = false;

  try {
    my $db = schema('netdisco');

    $db->txn_do(sub {
      my @rows = ();
      for my $spec (@$jobs) {
        push @rows, {
          device     => $spec->{device},
          device_key => $spec->{device_key},
          port       => $spec->{port},
          action     => $spec->{action},
          subaction  => ($spec->{extra} || $spec->{subaction}),
          username   => $spec->{username},
          userip     => $spec->{userip},
          status     => 'queued',
        };
      }

      $db->resultset('Admin')->populate(\@rows);
    });

    $queued = true;
  }
  catch {
    error $_;
  };

  return $queued;
}
|
||
|
||
# Delete jobs from the queue.
#
#   $id - job id to delete. When false (undef, 0 or ''), EVERY row in the
#         job table is deleted instead.
#
# Deleting an id that does not exist (any more) is a no-op.
sub jq_delete {
  my $id = shift;

  if ($id) {
    schema('netdisco')->txn_do(sub {
      # find() returns undef when the row is already gone, for example when
      # it was removed by another process; do not call delete on undef
      my $row = schema('netdisco')->resultset('Admin')->find($id);
      $row->delete() if $row;
    });
  }
  else {
    schema('netdisco')->txn_do(sub {
      schema('netdisco')->resultset('Admin')->delete();
    });
  }
}
|
||
|
||
true;
|