417 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
			
		
		
	
	
			417 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
| package App::Netdisco::JobQueue::PostgreSQL;
 | ||
| 
 | ||
| use Dancer qw/:moose :syntax :script/;
 | ||
| use Dancer::Plugin::DBIC 'schema';
 | ||
| 
 | ||
| use App::Netdisco::Util::Device
 | ||
|   qw/get_device is_discoverable is_macsuckable is_arpnipable/;
 | ||
| use App::Netdisco::Backend::Job;
 | ||
| 
 | ||
| use Module::Load ();
 | ||
| use JSON::PP ();
 | ||
| use Try::Tiny;
 | ||
| 
 | ||
| use base 'Exporter';
 | ||
| our @EXPORT = ();
 | ||
| our @EXPORT_OK = qw/
 | ||
|   jq_warm_thrusters
 | ||
|   jq_getsome
 | ||
|   jq_locked
 | ||
|   jq_queued
 | ||
|   jq_lock
 | ||
|   jq_defer
 | ||
|   jq_complete
 | ||
|   jq_log
 | ||
|   jq_userlog
 | ||
|   jq_insert
 | ||
|   jq_delete
 | ||
| /;
 | ||
| our %EXPORT_TAGS = ( all => \@EXPORT_OK );
 | ||
| 
 | ||
| # given a device, tests if any of the primary acls applies
 | ||
| # returns a list of job actions to be denied/skipped on this host.
 | ||
| sub _get_denied_actions {
 | ||
|   my $device = shift;
 | ||
|   my @badactions = ();
 | ||
|   return @badactions unless $device;
 | ||
|   $device = get_device($device); # might be no-op but is done in is_* anyway
 | ||
| 
 | ||
|   if ($device->is_pseudo) {
 | ||
|       # always let pseudo devices do contact|location|portname|snapshot
 | ||
|       # and additionally if there's a snapshot cache, is_discoverable will let
 | ||
|       # them do all other discover and high prio actions
 | ||
|       push @badactions, ('discover', grep { $_ !~ m/^(?:contact|location|portname|snapshot)$/ }
 | ||
|                                           @{ setting('job_prio')->{high} })
 | ||
|         if not is_discoverable($device);
 | ||
|   }
 | ||
|   else {
 | ||
|       push @badactions, ('discover', @{ setting('job_prio')->{high} })
 | ||
|         if not is_discoverable($device);
 | ||
|   }
 | ||
| 
 | ||
|   push @badactions, (qw/macsuck nbtstat/)
 | ||
|     if not is_macsuckable($device);
 | ||
| 
 | ||
|   push @badactions, 'arpnip'
 | ||
|     if not is_arpnipable($device);
 | ||
| 
 | ||
|   return @badactions;
 | ||
| }
 | ||
| 
 | ||
| sub jq_warm_thrusters {
 | ||
|   my @devices = schema(vars->{'tenant'})->resultset('Device')->all;
 | ||
|   my $rs = schema(vars->{'tenant'})->resultset('DeviceSkip');
 | ||
|   my %actionset = ();
 | ||
| 
 | ||
|   foreach my $d (@devices) {
 | ||
|     my @badactions = _get_denied_actions($d);
 | ||
|     $actionset{$d->ip} = \@badactions if scalar @badactions;
 | ||
|   }
 | ||
| 
 | ||
|   schema(vars->{'tenant'})->txn_do(sub {
 | ||
|     $rs->search({
 | ||
|       backend => setting('workers')->{'BACKEND'},
 | ||
|     }, { for => 'update' }, )->update({ actionset => [] });
 | ||
| 
 | ||
|     # on backend restart, allow one retry of all devices which have
 | ||
|     # reached max retry (max_deferrals)
 | ||
|     my $deferrals = setting('workers')->{'max_deferrals'} - 1;
 | ||
|     $rs->search({
 | ||
|       backend => setting('workers')->{'BACKEND'},
 | ||
|       deferrals => { '>' => $deferrals },
 | ||
|     }, { for => 'update' }, )->update({ deferrals => $deferrals });
 | ||
| 
 | ||
|     $rs->search({
 | ||
|       backend => setting('workers')->{'BACKEND'},
 | ||
|       actionset => { -value => [] }, # special syntax for matching empty ARRAY
 | ||
|       deferrals => 0,
 | ||
|     })->delete;
 | ||
| 
 | ||
|     $rs->update_or_create({
 | ||
|       backend => setting('workers')->{'BACKEND'},
 | ||
|       device  => $_,
 | ||
|       actionset => $actionset{$_},
 | ||
|     }, { key => 'primary' }) for keys %actionset;
 | ||
| 
 | ||
|     # add one faux record to allow *walk actions to see there is a backend running
 | ||
|     $rs->update_or_create({
 | ||
|       backend => setting('workers')->{'BACKEND'},
 | ||
|       device  => '255.255.255.255',
 | ||
|       last_defer => \'LOCALTIMESTAMP',
 | ||
|     }, { key => 'primary' });
 | ||
|   });
 | ||
| }
 | ||
| 
 | ||
| sub jq_getsome {
 | ||
|   my $num_slots = shift;
 | ||
|   return () unless $num_slots and $num_slots > 0;
 | ||
| 
 | ||
|   my $jobs = schema(vars->{'tenant'})->resultset('Admin');
 | ||
|   my @returned = ();
 | ||
| 
 | ||
|   my $tasty = schema(vars->{'tenant'})->resultset('Virtual::TastyJobs')
 | ||
|     ->search(undef,{ bind => [
 | ||
|       setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'},
 | ||
|       setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'},
 | ||
|       setting('workers')->{'retry_after'}, $num_slots,
 | ||
|     ]});
 | ||
| 
 | ||
|   while (my $job = $tasty->next) {
 | ||
|     if ($job->device) {
 | ||
|       # need to handle device discovered since backend daemon started
 | ||
|       # and the skiplist was primed. these should be checked against
 | ||
|       # the various acls and have device_skip entry added if needed,
 | ||
|       # and return false if it should have been skipped.
 | ||
|       my @badactions = _get_denied_actions($job->device);
 | ||
|       if (scalar @badactions) {
 | ||
|         schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create({
 | ||
|           backend => setting('workers')->{'BACKEND'}, device => $job->device,
 | ||
|         },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
 | ||
| 
 | ||
|         # will now not be selected in a future _getsome()
 | ||
|         next if scalar grep {$_ eq $job->action} @badactions;
 | ||
|       }
 | ||
|     }
 | ||
| 
 | ||
|     # remove any duplicate jobs, incuding possibly this job if there
 | ||
|     # is already an equivalent job running
 | ||
| 
 | ||
|     # note that the self-removal of a job has an unhelpful log: it is
 | ||
|     # reported as a duplicate of itself! however what's happening is that
 | ||
|     # netdisco has seen another running job with same params (but the query
 | ||
|     # cannot see that ID to use it in the message).
 | ||
| 
 | ||
|     my %job_properties = (
 | ||
|       action => $job->action,
 | ||
|       port   => $job->port,
 | ||
|       subaction => $job->subaction,
 | ||
|       -or => [
 | ||
|         { device => $job->device },
 | ||
|         ($job->device_key ? ({ device_key => $job->device_key }) : ()),
 | ||
|       ],
 | ||
|       # never de-duplicate user-submitted jobs
 | ||
|       username => { '=' => undef },
 | ||
|       userip   => { '=' => undef },
 | ||
|     );
 | ||
| 
 | ||
|     my $gone = $jobs->search({
 | ||
|       status => 'queued',
 | ||
|       -and => [
 | ||
|         %job_properties,
 | ||
|         -or => [{
 | ||
|           job => { '<' => $job->id },
 | ||
|         },{
 | ||
|           job => $job->id,
 | ||
|           -exists => $jobs->search({
 | ||
|             job => { '>' => $job->id },
 | ||
|             status => { -like => 'queued-%' },
 | ||
|             started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
 | ||
|             %job_properties,
 | ||
|           })->as_query,
 | ||
|         }],
 | ||
|       ],
 | ||
|     }, { for => 'update' })
 | ||
|         ->update({ status => 'info', log => (sprintf 'duplicate of %s', $job->id) });
 | ||
| 
 | ||
|     debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id;
 | ||
|     push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
 | ||
|   }
 | ||
| 
 | ||
|   return @returned;
 | ||
| }
 | ||
| 
 | ||
| sub jq_locked {
 | ||
|   my @returned = ();
 | ||
|   my $rs = schema(vars->{'tenant'})->resultset('Admin')->search({
 | ||
|     status  => ('queued-'. setting('workers')->{'BACKEND'}),
 | ||
|     started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
 | ||
|   });
 | ||
| 
 | ||
|   while (my $job = $rs->next) {
 | ||
|       push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
 | ||
|   }
 | ||
|   return @returned;
 | ||
| }
 | ||
| 
 | ||
| sub jq_queued {
 | ||
|   my $job_type = shift;
 | ||
| 
 | ||
|   return schema(vars->{'tenant'})->resultset('Admin')->search({
 | ||
|     device => { '!=' => undef},
 | ||
|     action => $job_type,
 | ||
|     status => { -like => 'queued%' },
 | ||
|   })->get_column('device')->all;
 | ||
| }
 | ||
| 
 | ||
| sub jq_lock {
 | ||
|   my $job = shift;
 | ||
|   my $happy = false;
 | ||
| 
 | ||
|   # lock db row and update to show job has been picked
 | ||
|   try {
 | ||
|     my $updated = schema(vars->{'tenant'})->resultset('Admin')
 | ||
|       ->search({ job => $job->id, status => 'queued' }, { for => 'update' })
 | ||
|       ->update({
 | ||
|           status  => ('queued-'. setting('workers')->{'BACKEND'}),
 | ||
|           started => \"LOCALTIMESTAMP",
 | ||
|       });
 | ||
| 
 | ||
|     $happy = true if $updated > 0;
 | ||
|   }
 | ||
|   catch {
 | ||
|     error $_;
 | ||
|   };
 | ||
| 
 | ||
|   return $happy;
 | ||
| }
 | ||
| 
 | ||
| sub jq_defer {
 | ||
|   my $job = shift;
 | ||
|   my $happy = false;
 | ||
| 
 | ||
|   # note this taints all actions on the device. for example if both
 | ||
|   # macsuck and arpnip are allowed, but macsuck fails 10 times, then
 | ||
|   # arpnip (and every other action) will be prevented on the device.
 | ||
| 
 | ||
|   # seeing as defer is only triggered by an SNMP connect failure, this
 | ||
|   # behaviour seems reasonable, to me (or desirable, perhaps).
 | ||
| 
 | ||
|   # the deferrable_actions setting exists as a workaround to this behaviour
 | ||
|   # should it be needed by any action (that is, per-device action but
 | ||
|   # do not increment deferrals count and simply try to run again).
 | ||
| 
 | ||
|   try {
 | ||
|     schema(vars->{'tenant'})->txn_do(sub {
 | ||
|       if ($job->device
 | ||
|           and not scalar grep { $job->action eq $_ }
 | ||
|                               @{ setting('deferrable_actions') || [] }) {
 | ||
| 
 | ||
|         schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create({
 | ||
|           backend => setting('workers')->{'BACKEND'}, device => $job->device,
 | ||
|         },{ key => 'device_skip_pkey' })->increment_deferrals;
 | ||
|       }
 | ||
| 
 | ||
|       # lock db row and update to show job is available
 | ||
|       schema(vars->{'tenant'})->resultset('Admin')
 | ||
|         ->search({ job => $job->id }, { for => 'update' })
 | ||
|         ->update({ status => 'queued', started => undef, log => $job->log });
 | ||
|     });
 | ||
|     $happy = true;
 | ||
|   }
 | ||
|   catch {
 | ||
|     error $_;
 | ||
|   };
 | ||
| 
 | ||
|   return $happy;
 | ||
| }
 | ||
| 
 | ||
| sub jq_complete {
 | ||
|   my $job = shift;
 | ||
|   my $happy = false;
 | ||
| 
 | ||
|   # lock db row and update to show job is done/error
 | ||
| 
 | ||
|   # now that SNMP connect failures are deferrals and not errors, any complete
 | ||
|   # status, whether success or failure, indicates an SNMP connect. reset the
 | ||
|   # connection failures counter to forget about occasional connect glitches.
 | ||
| 
 | ||
|   try {
 | ||
|     schema(vars->{'tenant'})->txn_do(sub {
 | ||
|       if ($job->device and not $job->is_offline) {
 | ||
|         schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create({
 | ||
|           backend => setting('workers')->{'BACKEND'}, device => $job->device,
 | ||
|         },{ key => 'device_skip_pkey' })->update({ deferrals => 0 });
 | ||
|       }
 | ||
| 
 | ||
|       schema(vars->{'tenant'})->resultset('Admin')
 | ||
|         ->search({ job => $job->id }, { for => 'update' })
 | ||
|         ->update({
 | ||
|           status => $job->status,
 | ||
|           log    => (ref($job->log) eq ref('')) ? $job->log : '',
 | ||
|           started  => $job->started,
 | ||
|           finished => $job->finished,
 | ||
|           (($job->action eq 'hook') ? (subaction => $job->subaction) : ()),
 | ||
|           ($job->only_namespace ? (action => ($job->action .'::'. $job->only_namespace)) : ()),
 | ||
|         });
 | ||
|     });
 | ||
|     $happy = true;
 | ||
|   }
 | ||
|   catch {
 | ||
|     # use DDP; p $job;
 | ||
|     error $_;
 | ||
|   };
 | ||
| 
 | ||
|   return $happy;
 | ||
| }
 | ||
| 
 | ||
| sub jq_log {
 | ||
|   return schema(vars->{'tenant'})->resultset('Admin')->search({
 | ||
|     'me.action' => { '-not_like' => 'hook::%' },
 | ||
|     -or => [
 | ||
|       { 'me.log' => undef },
 | ||
|       { 'me.log' => { '-not_like' => 'duplicate of %' } },
 | ||
|     ],
 | ||
|   }, {
 | ||
|     prefetch => 'target',
 | ||
|     order_by => { -desc => [qw/entered device action/] },
 | ||
|     rows     => (setting('jobs_qdepth') || 50),
 | ||
|   })->with_times->hri->all;
 | ||
| }
 | ||
| 
 | ||
| sub jq_userlog {
 | ||
|   my $user = shift;
 | ||
|   return schema(vars->{'tenant'})->resultset('Admin')->search({
 | ||
|     username => $user,
 | ||
|     log      => { '-not_like' => 'duplicate of %' },
 | ||
|     finished => { '>' => \"(CURRENT_TIMESTAMP - interval '5 seconds')" },
 | ||
|   })->with_times->all;
 | ||
| }
 | ||
| 
 | ||
| sub jq_insert {
 | ||
|   my $jobs = shift;
 | ||
|   $jobs = [$jobs] if ref [] ne ref $jobs;
 | ||
| 
 | ||
|   # bit of a hack for heroku hosting to avoid DB overload
 | ||
|   return true if setting('defanged_admin') ne 'admin';
 | ||
| 
 | ||
|   my $happy = false;
 | ||
|   try {
 | ||
|     schema(vars->{'tenant'})->txn_do(sub {
 | ||
|       if (scalar @$jobs == 1 and defined $jobs->[0]->{device} and
 | ||
|           scalar grep {$_ eq $jobs->[0]->{action}} @{ setting('_inline_actions') || [] }) {
 | ||
| 
 | ||
|           my $spec = $jobs->[0];
 | ||
|           my $row = undef;
 | ||
| 
 | ||
|           if ($spec->{port}) {
 | ||
|               $row = schema(vars->{'tenant'})->resultset('DevicePort')
 | ||
|                                              ->find($spec->{port}, $spec->{device});
 | ||
|               undef $row unless
 | ||
|                 scalar grep {('cf_'. $_) eq $spec->{action}}
 | ||
|                             grep {defined}
 | ||
|                             map {$_->{name}}
 | ||
|                             @{ setting('custom_fields')->{device_port} || [] };
 | ||
|           }
 | ||
|           else {
 | ||
|               $row = schema(vars->{'tenant'})->resultset('Device')
 | ||
|                                              ->find($spec->{device});
 | ||
|               undef $row unless
 | ||
|                 scalar grep {('cf_'. $_) eq $spec->{action}}
 | ||
|                             grep {defined}
 | ||
|                             map {$_->{name}}
 | ||
|                             @{ setting('custom_fields')->{device} || [] };
 | ||
|           }
 | ||
| 
 | ||
|           die 'failed to find row for custom field update' unless $row;
 | ||
| 
 | ||
|           my $coder = JSON::PP->new->utf8(0)->allow_nonref(1)->allow_unknown(1);
 | ||
|           $spec->{subaction} = $coder->encode( $spec->{extra} || $spec->{subaction} );
 | ||
|           $spec->{action} =~ s/^cf_//;
 | ||
|           $row->make_column_dirty('custom_fields');
 | ||
|           $row->update({
 | ||
|             custom_fields => \['jsonb_set(custom_fields, ?, ?)'
 | ||
|                               => (qq{{$spec->{action}}}, $spec->{subaction}) ]
 | ||
|             })->discard_changes();
 | ||
|       }
 | ||
|       else {
 | ||
|           schema(vars->{'tenant'})->resultset('Admin')->populate([
 | ||
|             map {{
 | ||
|                 device     => $_->{device},
 | ||
|                 device_key => $_->{device_key},
 | ||
|                 port       => $_->{port},
 | ||
|                 action     => $_->{action},
 | ||
|                 subaction  => ($_->{extra} || $_->{subaction}),
 | ||
|                 username   => $_->{username},
 | ||
|                 userip     => $_->{userip},
 | ||
|                 status     => 'queued',
 | ||
|             }} @$jobs
 | ||
|           ]);
 | ||
|       }
 | ||
|     });
 | ||
|     $happy = true;
 | ||
|   }
 | ||
|   catch {
 | ||
|     error $_;
 | ||
|   };
 | ||
| 
 | ||
|   return $happy;
 | ||
| }
 | ||
| 
 | ||
| sub jq_delete {
 | ||
|   my $id = shift;
 | ||
| 
 | ||
|   if ($id) {
 | ||
|       schema(vars->{'tenant'})->txn_do(sub {
 | ||
|         schema(vars->{'tenant'})->resultset('Admin')->search({ job => $id })->delete;
 | ||
|       });
 | ||
|   }
 | ||
|   else {
 | ||
|       schema(vars->{'tenant'})->txn_do(sub {
 | ||
|         schema(vars->{'tenant'})->resultset('Admin')->delete();
 | ||
|         schema(vars->{'tenant'})->resultset('DeviceSkip')->delete();
 | ||
|       });
 | ||
|   }
 | ||
| }
 | ||
| 
 | ||
| true;
 |