91 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
			
		
		
	
	
			91 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
package App::Netdisco::Backend::Worker::Manager;
 | 
						|
 | 
						|
use Dancer qw/:moose :syntax :script/;
 | 
						|
 | 
						|
use List::Util 'sum';
 | 
						|
use App::Netdisco::Util::Backend;
 | 
						|
 | 
						|
use Role::Tiny;
 | 
						|
use namespace::clean;
 | 
						|
 | 
						|
use App::Netdisco::JobQueue
 | 
						|
  qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/;
 | 
						|
 | 
						|
sub worker_begin {
 | 
						|
  my $self = shift;
 | 
						|
  my $wid = $self->wid;
 | 
						|
 | 
						|
  return debug "mgr ($wid): no need for manager... skip begin"
 | 
						|
    if setting('workers')->{'no_manager'};
 | 
						|
 | 
						|
  debug "entering Manager ($wid) worker_begin()";
 | 
						|
 | 
						|
  # job queue initialisation
 | 
						|
  jq_warm_thrusters;
 | 
						|
 | 
						|
  # requeue jobs locally
 | 
						|
  debug "mgr ($wid): searching for jobs booked to this processing node";
 | 
						|
  my @jobs = jq_locked;
 | 
						|
 | 
						|
  if (scalar @jobs) {
 | 
						|
      info sprintf "mgr (%s): found %s jobs booked to this processing node",
 | 
						|
        $wid, scalar @jobs;
 | 
						|
      $self->{queue}->enqueuep(100, @jobs);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
sub worker_body {
 | 
						|
  my $self = shift;
 | 
						|
  my $wid = $self->wid;
 | 
						|
 | 
						|
  if (setting('workers')->{'no_manager'}) {
 | 
						|
      prctl sprintf 'netdisco-backend: worker #%s manager: inactive', $wid;
 | 
						|
      return debug "mgr ($wid): no need for manager... quitting"
 | 
						|
  }
 | 
						|
 | 
						|
  while (1) {
 | 
						|
      prctl sprintf 'netdisco-backend: worker #%s manager: gathering', $wid;
 | 
						|
      my $num_slots = 0;
 | 
						|
 | 
						|
      $num_slots = parse_max_workers( setting('workers')->{tasks} )
 | 
						|
                     - $self->{queue}->pending();
 | 
						|
      debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";
 | 
						|
 | 
						|
      # get some high priority jobs
 | 
						|
      # TODO also check for stale jobs in Netdisco DB
 | 
						|
      foreach my $job ( jq_getsomep($num_slots) ) {
 | 
						|
 | 
						|
          # mark job as running
 | 
						|
          next unless jq_lock($job);
 | 
						|
          info sprintf "mgr (%s): job %s booked out for this processing node",
 | 
						|
            $wid, $job->job;
 | 
						|
 | 
						|
          # copy job to local queue
 | 
						|
          $self->{queue}->enqueuep(100, $job);
 | 
						|
      }
 | 
						|
 | 
						|
      $num_slots = parse_max_workers( setting('workers')->{tasks} )
 | 
						|
                     - $self->{queue}->pending();
 | 
						|
      debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
 | 
						|
 | 
						|
      # get some normal priority jobs
 | 
						|
      # TODO also check for stale jobs in Netdisco DB
 | 
						|
      foreach my $job ( jq_getsome($num_slots) ) {
 | 
						|
 | 
						|
          # mark job as running
 | 
						|
          next unless jq_lock($job);
 | 
						|
          info sprintf "mgr (%s): job %s booked out for this processing node",
 | 
						|
            $wid, $job->job;
 | 
						|
 | 
						|
          # copy job to local queue
 | 
						|
          $self->{queue}->enqueue($job);
 | 
						|
      }
 | 
						|
 | 
						|
      debug "mgr ($wid): sleeping now...";
 | 
						|
      prctl sprintf 'netdisco-backend: worker #%s manager: idle', $wid;
 | 
						|
      sleep( setting('workers')->{sleep_time} || 1 );
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
1;
 |