initial implementatin of new job queue with skip info per job

This commit is contained in:
Oliver Gorwits
2018-04-25 21:43:39 +01:00
parent 5eb443438c
commit 7e1982985c
9 changed files with 187 additions and 26 deletions

View File

@@ -161,4 +161,12 @@ between the date stamp and time stamp. That is:
sub finished_stamp { return (shift)->get_column('finished_stamp') }
=head2 duration
Formatted version of the C<duration> field, accurate to the minute.
=cut
sub duration { return (shift)->get_column('duration') }
1;

View File

@@ -0,0 +1,73 @@
package App::Netdisco::DB::Result::Virtual::JobQueue;
use strict;
use warnings;
use base 'DBIx::Class::Core';
__PACKAGE__->table_class('DBIx::Class::ResultSource::View');
__PACKAGE__->table('job_queue');
__PACKAGE__->result_source_instance->is_virtual(1);
__PACKAGE__->result_source_instance->view_definition(<<ENDSQL
WITH limited_jobs AS
(SELECT a.job, max(ds.deferrals) AS max_defer, min(ds.last_defer) AS min_defer,
CASE WHEN a.status = 'queued' THEN 100
WHEN a.status LIKE 'queued-%' THEN 80
WHEN a.status = 'error' THEN 60
ELSE 40
END AS status_priority,
CASE WHEN (a.username IS NOT NULL OR
a.action = ANY (string_to_array(btrim(?, '{"}'), '","')))
THEN 100
ELSE 0
END AS job_priority
FROM admin a LEFT OUTER JOIN device_skip ds USING (device)
WHERE (?::text is NULL or a.device <<= ?)
GROUP BY a.job
ORDER BY status_priority DESC,
job_priority DESC,
max_defer ASC NULLS FIRST,
min_defer ASC NULLS LAST,
a.device, a.action)
SELECT to_char( entered, 'YYYY-MM-DD HH24:MI' ) AS entered_stamp,
username, action, device, subaction, port, status, log,
replace( age( finished, started ) ::text, 'mon', 'month' ) AS duration,
array(SELECT ('backend',backend,'actionset',actionset,'deferrals',deferrals,'last_defer',EXTRACT(EPOCH FROM last_defer))
FROM device_skip ds
WHERE ds.device = a.device) AS skips
FROM admin a INNER JOIN limited_jobs jobs USING (job)
ENDSQL
);
__PACKAGE__->add_columns(
"device",
{ data_type => "inet", is_nullable => 0 },
"action",
{ data_type => "text", is_nullable => 1 },
"subaction",
{ data_type => "text", is_nullable => 1 },
"port",
{ data_type => "text", is_nullable => 1 },
"status",
{ data_type => "text", is_nullable => 1 },
"username",
{ data_type => "text", is_nullable => 1 },
"log",
{ data_type => "text", is_nullable => 1 },
"skips",
{ data_type => "[text]", is_nullable => 1 },
"entered_stamp",
{ data_type => "text", is_nullable => 1 },
"duration",
{ data_type => "text", is_nullable => 1 },
);
__PACKAGE__->belongs_to('target', 'App::Netdisco::DB::Result::Device',
{ 'foreign.ip' => 'self.device' }, { join_type => 'LEFT' } );
1;

View File

@@ -46,6 +46,8 @@ will add the following additional synthesized columns to the result set:
=item finished_stamp
=item duration
=back
=cut
@@ -61,6 +63,7 @@ sub with_times {
entered_stamp => \"to_char(entered, 'YYYY-MM-DD HH24:MI')",
started_stamp => \"to_char(started, 'YYYY-MM-DD HH24:MI')",
finished_stamp => \"to_char(finished, 'YYYY-MM-DD HH24:MI')",
duration => \"replace(age(finished, started)::text, 'mon', 'month')",
},
});
}

View File

@@ -270,11 +270,13 @@ sub jq_complete {
}
sub jq_log {
return schema('netdisco')->resultset('Admin')->search({}, {
prefetch => 'target',
order_by => { -desc => [qw/entered device action/] },
my $filter = shift;
return schema('netdisco')->resultset('Virtual::JobQueue')->search({}, {
join => 'target',
'+columns' => ['target.dns','target.ip'],
bind => [ setting('job_prio')->{'high'}, $filter, $filter ],
rows => 50,
})->with_times->hri->all;
})->hri->all;
}
sub jq_userlog {

View File

@@ -8,6 +8,11 @@ use Dancer::Plugin::Auth::Extensible;
use App::Netdisco::Web::Plugin;
use App::Netdisco::JobQueue qw/jq_log jq_delete/;
use utf8;
use Time::Piece;
use Text::CSV_XS 'csv';
use NetAddr::IP::Lite ':lower';
register_admin_task({
tag => 'jobqueue',
label => 'Job Queue',
@@ -23,9 +28,61 @@ ajax '/ajax/control/admin/jobqueue/delall' => require_role admin => sub {
};
ajax '/ajax/content/admin/jobqueue' => require_role admin => sub {
my $filter = NetAddr::IP::Lite->new(param('q'));
my @data = jq_log($filter);
foreach my $r (@data) {
$r->{qstat}->{acl} = [];
$r->{qstat}->{next} = [];
$r->{qstat}->{fails} = [];
next unless ($r->{status} eq 'queued');
foreach my $s (@{$r->{skips}}) {
(my $row = $s) =~ s/(^\(|\)$)//g;
next unless $row;
my %skip = @{ [@{csv(in => \$row)}]->[0] };
next unless scalar keys %skip;
$skip{actionset} =~ s/(^{|}$)//g;
$skip{actionset} = [@{ csv(in => \$skip{actionset}) }]->[0] || [];
if ($skip{deferrals}) {
unshift @{$r->{qstat}->{fails}}, sprintf '%s connection failure%s from %s',
$skip{deferrals}, ($skip{deferrals} > 1 ? 's' : ''), $skip{backend};
}
else {
unshift @{$r->{qstat}->{fails}}, sprintf 'No connection failures from %s',
$skip{backend};
}
if (scalar @{$skip{actionset}}
and scalar grep {$_ eq $r->{action}} @{$skip{actionset}}) {
$r->{qstat}->{badacl} = true;
unshift @{$r->{qstat}->{acl}}, sprintf 'Blocked by ACL on %s', $skip{backend};
}
else {
push @{$r->{qstat}->{acl}}, sprintf '✔ on %s', $skip{backend};
}
if ($skip{deferrals} >= setting('workers')->{'max_deferrals'}) {
$r->{qstat}->{last_defer} = true;
my $after = (localtime($skip{last_defer}) + setting('workers')->{'retry_after'});
unshift @{$r->{qstat}->{next}}, sprintf 'Will retry after %s on %s',
$after->cdate, $skip{backend};
}
elsif ($skip{deferrals} > 0) {
$r->{qstat}->{last_defer} = true;
unshift @{$r->{qstat}->{next}}, sprintf 'Connect failed at %s on %s',
localtime($skip{last_defer})->cdate, $skip{backend};
}
else {
push @{$r->{qstat}->{next}}, sprintf '✔ on %s', $skip{backend};
}
}
}
content_type('text/html');
template 'ajax/admintask/jobqueue.tt', {
results => [ jq_log ],
results => \@data,
}, { layout => undef };
};