278 lines
6.3 KiB
Perl
278 lines
6.3 KiB
Perl
package App::Netdisco::JobQueue::PostgreSQL;
|
|
|
|
use Dancer qw/:moose :syntax :script/;
|
|
use Dancer::Plugin::DBIC 'schema';
|
|
|
|
use App::Netdisco::Util::Device
|
|
qw/is_discoverable is_macsuckable is_arpnipable/;
|
|
use App::Netdisco::Backend::Job;
|
|
|
|
|
|
use Net::Domain 'hostfqdn';
|
|
use Module::Load ();
|
|
use Try::Tiny;
|
|
|
|
use base 'Exporter';

# Nothing is exported by default. Every public jq_* function can be imported
# by name, or all at once through the ":all" tag.
our @EXPORT      = ();
our @EXPORT_OK   = qw(
  jq_getsome jq_getsomep
  jq_locked jq_queued
  jq_prime_skiplist
  jq_log jq_userlog
  jq_lock jq_defer jq_complete
  jq_insert jq_delete
);
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
|
|
|
|
# _getsome( $num_slots, \%where )
#
# Private helper behind jq_getsome/jq_getsomep. Picks up to $num_slots rows
# from the Admin resultset that have status 'queued', in random order, and
# returns them as a list of App::Netdisco::Backend::Job objects.
#
# Jobs are excluded when their device appears in the correlated 'skipped'
# rows for this backend (this host's FQDN) with failures >= 10 or with the
# skipover flag set. Keys in %where are merged last, so they take precedence
# over the built-in status/device conditions.
#
# Returns an empty list when $num_slots is undefined or below 1, or when
# $where is not a plain hash reference.
sub _getsome {
  my ($num_slots, $where) = @_;

  return if !defined($num_slots) || $num_slots < 1;
  return if !defined($where) || ref($where) ne 'HASH';

  my $backend = hostfqdn() || 'localhost';
  my $admin   = schema('netdisco')->resultset('Admin');

  # subquery: devices this backend should leave alone
  my $skipped_devices = $admin->correlate('skipped')->search(
    {
      backend => $backend,
      -or     => [
        { failures => { '>=' => 10 } },
        { -bool    => 'skipover' },
      ],
    },
    { columns => 'device' },
  )->as_query;

  my %cond = (
    status => 'queued',
    device => { -not_in => $skipped_devices },
    %$where,
  );

  my $candidates = $admin->search(
    \%cond,
    { order_by => 'random()', rows => $num_slots },
  );

  my @picked =
    map { App::Netdisco::Backend::Job->new({ $_->get_columns }) }
    $candidates->all;

  return @picked;
}
|
|
|
|
# jq_getsome( $num_slots )
#
# Returns up to $num_slots queued jobs whose action is listed under the
# 'normal' key of the 'job_prio' setting. See _getsome for the selection
# rules and the return value.
sub jq_getsome {
  my $num_slots = shift;

  my $normal_actions = setting('job_prio')->{'normal'};

  return _getsome($num_slots, { action => { -in => $normal_actions } });
}
|
|
|
|
# jq_getsomep( $num_slots )
#
# Priority variant of jq_getsome. Returns up to $num_slots queued jobs that
# are either:
#   * a 'normal' priority action with a non-NULL username, or
#   * any action listed under the 'high' key of the 'job_prio' setting.
sub jq_getsomep {
  my $num_slots = shift;

  my $normal_actions = setting('job_prio')->{'normal'};
  my $high_actions   = setting('job_prio')->{'high'};

  my @alternatives = (
    {
      username => { '!=' => undef },
      action   => { -in => $normal_actions },
    },
    {
      action => { -in => $high_actions },
    },
  );

  return _getsome($num_slots, { -or => \@alternatives });
}
|
|
|
|
# jq_locked()
#
# Returns, as a list of App::Netdisco::Backend::Job objects, every Admin row
# whose status is "queued-<fqdn>" for this host, i.e. the status value that
# jq_lock writes when it books a job.
sub jq_locked {
  my $backend = hostfqdn() || 'localhost';

  my $booked = schema('netdisco')->resultset('Admin')
    ->search({ status => "queued-$backend" });

  my @jobs =
    map { App::Netdisco::Backend::Job->new({ $_->get_columns }) }
    $booked->all;

  return @jobs;
}
|
|
|
|
# jq_queued( $job_type )
#
# Returns the 'device' column values of Admin rows whose action equals
# $job_type, whose device is not NULL, and whose status starts with
# 'queued' (so both unclaimed and booked jobs match).
sub jq_queued {
  my $job_type = shift;

  my %cond = (
    device => { '!=' => undef },
    action => $job_type,
    status => { -like => 'queued%' },
  );

  return schema('netdisco')->resultset('Admin')
    ->search(\%cond)
    ->get_column('device')
    ->all;
}
|
|
|
|
# jq_prime_skiplist()
#
# Rebuilds this backend's entries in the DeviceSkip resultset inside a single
# transaction:
#   1. loads all Device rows,
#   2. deletes the existing DeviceSkip rows for this backend (host FQDN),
#   3. inserts a row with skipover = true for every (device, action) pair
#      where the device fails the matching check:
#        - 'discover' plus every action under job_prio->{high}: is_discoverable
#        - 'macsuck' and 'nbtstat':                             is_macsuckable
#        - 'arpnip':                                            is_arpnipable
sub jq_prime_skiplist {
  my $backend  = hostfqdn() || 'localhost';
  my $skiplist = schema('netdisco')->resultset('DeviceSkip');
  my @discover_actions = ('discover', @{ setting('job_prio')->{high} });

  # turn a list of devices into DeviceSkip rows for one action
  my $skip_rows_for = sub {
    my ($action, @devices) = @_;
    return [
      map {
        +{
          backend  => $backend,
          device   => $_->ip,
          action   => $action,
          skipover => \'true',
        }
      } @devices
    ];
  };

  schema('netdisco')->txn_do(sub {
    my @all_devices = schema('netdisco')->resultset('Device')->all;
    $skiplist->search({ backend => $backend })->delete;

    for my $action (@discover_actions) {
      $skiplist->populate(
        $skip_rows_for->($action, grep { not is_discoverable($_) } @all_devices)
      );
    }

    for my $action (qw/macsuck nbtstat/) {
      $skiplist->populate(
        $skip_rows_for->($action, grep { not is_macsuckable($_) } @all_devices)
      );
    }

    # kept as the final statement so it runs in the same context as before
    $skiplist->populate(
      $skip_rows_for->('arpnip', grep { not is_arpnipable($_) } @all_devices)
    );
  });
}
|
|
|
|
# jq_log()
#
# Returns at most 50 Admin rows, ordered by entered, device and action
# (all descending). The rows pass through the resultset's with_times and
# hri helpers, which are defined outside this file.
# NOTE(review): hri presumably yields plain hashes rather than row objects -
# confirm against the resultset class.
sub jq_log {
  my %attrs = (
    order_by => { -desc => [qw/entered device action/] },
    rows     => 50,
  );

  return schema('netdisco')->resultset('Admin')
    ->search({}, \%attrs)
    ->with_times
    ->hri
    ->all;
}
|
|
|
|
# jq_userlog( $user )
#
# Returns the Admin rows submitted by $user whose 'finished' timestamp is
# within the last five seconds (evaluated by the database via now()). The
# rows pass through the resultset's with_times helper, which is defined
# outside this file.
sub jq_userlog {
  my $user = shift;

  my %cond = (
    username => $user,
    finished => { '>' => \"(now() - interval '5 seconds')" },
  );

  return schema('netdisco')->resultset('Admin')
    ->search(\%cond)
    ->with_times
    ->all;
}
|
|
|
|
# jq_lock( $job )
#
# Tries to book $job for this backend by changing its Admin row status from
# 'queued' to "queued-<fqdn>". Returns true when the job is booked to this
# backend afterwards, false otherwise (including on any database error,
# which is logged via error()).
#
# The claim only touches rows still in status 'queued'. Previously the
# update was unconditional, so a second backend that had selected the same
# job would overwrite the first backend's booking and both would run it;
# the follow-up count check could never fail. With the status condition a
# job already claimed elsewhere is left alone and this call returns false.
# A job already booked to this backend still reports success.
#
# On success, any other still-'queued' rows with the same device, port,
# action and subaction are deleted in the same transaction.
sub jq_lock {
  my $job = shift;
  my $fqdn = hostfqdn() || 'localhost';
  my $happy = false;

  # lock db row and update to show job has been picked
  try {
    schema('netdisco')->txn_do(sub {
      my $admin = schema('netdisco')->resultset('Admin');

      # only claim the job if nobody else has taken it in the meantime
      $admin->search(
        { job => $job->job, status => 'queued' },
        { for => 'update' },
      )->update({ status => "queued-$fqdn" });

      # bail out (leaving $happy false) unless the job is now ours
      return unless
        $admin->count({ job => $job->job, status => "queued-$fqdn" });

      # remove any duplicate jobs, needed because we have race conditions
      # when queueing jobs of a type for all devices
      $admin->search({
        status    => 'queued',
        device    => $job->device,
        port      => $job->port,
        action    => $job->action,
        subaction => $job->subaction,
      }, { for => 'update' })->delete();

      $happy = true;
    });
  }
  catch {
    error $_;
  };

  return $happy;
}
|
|
|
|
# jq_defer( $job )
#
# Puts $job back on the queue: inside a transaction the Admin row with
# primary key $job->job is fetched and its status set to 'queued' with
# 'started' cleared. Returns true on success; any exception (including the
# row not being found) is logged via error() and false is returned.
sub jq_defer {
  my $job = shift;
  my $ok  = false;

  try {
    # lock db row and update to show job is available
    schema('netdisco')->txn_do(sub {
      my $row = schema('netdisco')->resultset('Admin')
        ->find($job->job, { for => 'update' });

      $row->update({
        status  => 'queued',
        started => undef,
      });
    });

    $ok = true;
  }
  catch {
    error $_;
  };

  return $ok;
}
|
|
|
|
# jq_complete( $job )
#
# Records the outcome of $job: inside a transaction the Admin row with
# primary key $job->job is fetched and its status, log, started and
# finished columns are overwritten with the values held by $job. Returns
# true on success; any exception (including the row not being found) is
# logged via error() and false is returned.
sub jq_complete {
  my $job = shift;
  my $ok  = false;

  # lock db row and update to show job is done/error
  try {
    schema('netdisco')->txn_do(sub {
      my $row = schema('netdisco')->resultset('Admin')
        ->find($job->job, { for => 'update' });

      $row->update({
        status   => $job->status,
        log      => $job->log,
        started  => $job->started,
        finished => $job->finished,
      });
    });

    $ok = true;
  }
  catch {
    error $_;
  };

  return $ok;
}
|
|
|
|
# jq_insert( $jobs )
#
# Queues one or more jobs. $jobs is either an array reference of job
# descriptions or a single description, which is wrapped into a one-element
# list. Each description is read as a hash with the keys device, port,
# action, extra/subaction, username and userip; 'extra' wins over
# 'subaction' when it is true. All rows are inserted with status 'queued'
# in one transaction.
#
# Returns true on success; on any exception the error is logged via error()
# and false is returned.
sub jq_insert {
  my $jobs = shift;
  $jobs = [$jobs] unless ref($jobs) eq 'ARRAY';
  my $ok = false;

  try {
    schema('netdisco')->txn_do(sub {
      my @rows = map {
        +{
          device    => $_->{device},
          port      => $_->{port},
          action    => $_->{action},
          subaction => ($_->{extra} || $_->{subaction}),
          username  => $_->{username},
          userip    => $_->{userip},
          status    => 'queued',
        }
      } @$jobs;

      schema('netdisco')->resultset('Admin')->populate(\@rows);
    });

    $ok = true;
  }
  catch {
    error $_;
  };

  return $ok;
}
|
|
|
|
# jq_delete( $id )
#
# With a true $id, deletes the Admin row whose primary key is $id. With no
# (or a false) $id, deletes every row in the Admin resultset. Both variants
# run inside a transaction; database errors propagate to the caller.
#
# Deleting an $id that no longer exists is a no-op. Previously find()
# returned undef in that case and the chained ->delete died with an
# unhelpful "Can't call method on an undefined value" error.
sub jq_delete {
  my $id = shift;

  if ($id) {
    schema('netdisco')->txn_do(sub {
      my $row = schema('netdisco')->resultset('Admin')->find($id);

      # job already gone: nothing to do
      return unless $row;

      $row->delete();
    });
  }
  else {
    schema('netdisco')->txn_do(sub {
      schema('netdisco')->resultset('Admin')->delete();
    });
  }
}
|
|
|
|
true;
|