pick jobs from queue with lowest retries first
This commit is contained in:
@@ -71,7 +71,7 @@ query with a C<backend> host, C<max_deferrals>, and C<retry_after> parameters
|
|||||||
|
|
||||||
=cut
|
=cut
|
||||||
|
|
||||||
__PACKAGE__->has_many( device_skips => 'App::Netdisco::DB::Result::DeviceSkip',
|
__PACKAGE__->might_have( device_skips => 'App::Netdisco::DB::Result::DeviceSkip',
|
||||||
sub {
|
sub {
|
||||||
my $args = shift;
|
my $args = shift;
|
||||||
return {
|
return {
|
||||||
|
|||||||
86
lib/App/Netdisco/DB/Result/Virtual/TastyJobs.pm
Normal file
86
lib/App/Netdisco/DB/Result/Virtual/TastyJobs.pm
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package App::Netdisco::DB::Result::Virtual::TastyJobs;
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
|
||||||
|
use base 'DBIx::Class::Core';
|
||||||
|
|
||||||
|
__PACKAGE__->table_class('DBIx::Class::ResultSource::View');
|
||||||
|
|
||||||
|
__PACKAGE__->table('tasty_jobs');
|
||||||
|
__PACKAGE__->result_source_instance->is_virtual(1);
|
||||||
|
__PACKAGE__->result_source_instance->view_definition(<<ENDSQL
|
||||||
|
SELECT jobs.*, ds2.deferrals AS num_deferrals
|
||||||
|
FROM (
|
||||||
|
(SELECT me.*, 100 AS job_priority
|
||||||
|
FROM admin me
|
||||||
|
WHERE ( me.username IS NOT NULL OR me.action = ANY (string_to_array(btrim(?, '{"}'), '","')) )
|
||||||
|
AND me.device NOT IN
|
||||||
|
(SELECT ds.device
|
||||||
|
FROM device_skip ds
|
||||||
|
WHERE ( me.action = ANY (ds.actionset) OR
|
||||||
|
(ds.deferrals >= ? AND ds.last_defer > ( LOCALTIMESTAMP - ?::interval )) )
|
||||||
|
AND ds.backend = ? AND ds.device = me.device)
|
||||||
|
AND me.status = 'queued'
|
||||||
|
ORDER BY random()
|
||||||
|
LIMIT ?)
|
||||||
|
UNION
|
||||||
|
(SELECT me.*, 0 AS job_priority
|
||||||
|
FROM admin me
|
||||||
|
WHERE NOT (me.action = ANY (string_to_array(btrim(?, '{"}'), '","')))
|
||||||
|
AND me.device NOT IN
|
||||||
|
(SELECT ds.device
|
||||||
|
FROM device_skip ds
|
||||||
|
WHERE ( me.action = ANY (ds.actionset) OR
|
||||||
|
(ds.deferrals >= ? AND ds.last_defer > ( LOCALTIMESTAMP - ?::interval )) )
|
||||||
|
AND ds.backend = ? AND ds.device = me.device)
|
||||||
|
AND me.status = 'queued'
|
||||||
|
ORDER BY random()
|
||||||
|
LIMIT ?)
|
||||||
|
) jobs
|
||||||
|
LEFT OUTER JOIN device_skip ds2
|
||||||
|
ON ds2.backend = ? AND ds2.device = jobs.device
|
||||||
|
ORDER BY jobs.job_priority DESC,
|
||||||
|
ds2.deferrals ASC NULLS FIRST
|
||||||
|
LIMIT ?
|
||||||
|
ENDSQL
|
||||||
|
);
|
||||||
|
|
||||||
|
__PACKAGE__->add_columns(
|
||||||
|
"job",
|
||||||
|
{ data_type => "integer", is_nullable => 0, },
|
||||||
|
"entered",
|
||||||
|
{ data_type => "timestamp", is_nullable => 1 },
|
||||||
|
"started",
|
||||||
|
{ data_type => "timestamp", is_nullable => 1 },
|
||||||
|
"finished",
|
||||||
|
{ data_type => "timestamp", is_nullable => 1 },
|
||||||
|
"device",
|
||||||
|
{ data_type => "inet", is_nullable => 1 },
|
||||||
|
"port",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"action",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"subaction",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"status",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"username",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"userip",
|
||||||
|
{ data_type => "inet", is_nullable => 1 },
|
||||||
|
"log",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"debug",
|
||||||
|
{ data_type => "boolean", is_nullable => 1 },
|
||||||
|
"device_key",
|
||||||
|
{ data_type => "text", is_nullable => 1 },
|
||||||
|
"job_priority",
|
||||||
|
{ data_type => "integer", is_nullable => 1 },
|
||||||
|
"num_deferrals",
|
||||||
|
{ data_type => "integer", is_nullable => 1 },
|
||||||
|
);
|
||||||
|
|
||||||
|
__PACKAGE__->set_primary_key("job");
|
||||||
|
|
||||||
|
1;
|
||||||
@@ -75,41 +75,17 @@ sub jq_getsome {
|
|||||||
my $jobs = schema('netdisco')->resultset('Admin');
|
my $jobs = schema('netdisco')->resultset('Admin');
|
||||||
my @returned = ();
|
my @returned = ();
|
||||||
|
|
||||||
my %jobsearch = (
|
my @filter = (
|
||||||
status => 'queued',
|
setting('job_prio')->{'high'}, setting('workers')->{'max_deferrals'},
|
||||||
device => { '-not_in' =>
|
setting('workers')->{'retry_after'}, setting('workers')->{'BACKEND'},
|
||||||
$jobs->skipped(setting('workers')->{'BACKEND'},
|
$num_slots,
|
||||||
setting('workers')->{'max_deferrals'},
|
|
||||||
setting('workers')->{'retry_after'})
|
|
||||||
->columns('device')->as_query },
|
|
||||||
);
|
);
|
||||||
my %randoms = (order_by => 'random()', rows => $num_slots );
|
my $tasty = schema('netdisco')->resultset('Virtual::TastyJobs')
|
||||||
|
->search(undef,{ bind => [
|
||||||
|
@filter, @filter, setting('workers')->{'BACKEND'}, $num_slots
|
||||||
|
]});
|
||||||
|
|
||||||
my $hiprio = $jobs->search({
|
while (my $job = $tasty->next) {
|
||||||
%jobsearch,
|
|
||||||
-or => [
|
|
||||||
{ username => { '!=' => undef } },
|
|
||||||
{ action => { -in => setting('job_prio')->{'high'} } },
|
|
||||||
],
|
|
||||||
}, {
|
|
||||||
%randoms,
|
|
||||||
'+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'],
|
|
||||||
});
|
|
||||||
|
|
||||||
my $loprio = $jobs->search({
|
|
||||||
%jobsearch,
|
|
||||||
action => { -not_in => setting('job_prio')->{'high'} },
|
|
||||||
}, {
|
|
||||||
%randoms,
|
|
||||||
'+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'],
|
|
||||||
});
|
|
||||||
|
|
||||||
my $rs = $hiprio->union($loprio)->search(undef, {
|
|
||||||
order_by => { '-desc' => 'job_priority' },
|
|
||||||
rows => $num_slots,
|
|
||||||
});
|
|
||||||
|
|
||||||
while (my $job = $rs->next) {
|
|
||||||
if ($job->device) {
|
if ($job->device) {
|
||||||
# need to handle device discovered since backend daemon started
|
# need to handle device discovered since backend daemon started
|
||||||
# and the skiplist was primed. these should be checked against
|
# and the skiplist was primed. these should be checked against
|
||||||
|
|||||||
Reference in New Issue
Block a user