daemon clean
This commit is contained in:
		| @@ -41,24 +41,23 @@ setpgrp(0,0); # only portable variety of setpgrp | ||||
| prctl 'netdisco-daemon: master'; | ||||
|  | ||||
| # shared local job queue | ||||
| my $Q = MCE::Queue->new; | ||||
| my $queue = MCE::Queue->new; | ||||
|  | ||||
| mce_flow { | ||||
|   task_name => [qw/ scheduler manager poller /], | ||||
|   max_workers => [ 1, 1, 'AUTO * 2' ], # FIXME allow setting override | ||||
|   max_workers => [ 1, 1, setting('workers')->{pollers} ], | ||||
|   tmp_dir => $tmp_dir, | ||||
|   on_post_exit => sub { MCE->restart_worker() }, | ||||
|   on_post_exit => sub { MCE->restart_worker }, | ||||
| }, _mk_wkr('Scheduler'), _mk_wkr('Manager'), _mk_wkr('Poller'); | ||||
|  | ||||
| sub _mk_wkr { | ||||
|   my $role = shift; | ||||
|   return sub { | ||||
|     my $self = shift; | ||||
|     my $wid = $self->wid; | ||||
|     $self->{Q} = $Q; # FIXME make it a method | ||||
|     $self->{queue} = $queue; | ||||
|  | ||||
|     prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); | ||||
|     info "applying role $role to worker $wid"; | ||||
|     prctl sprintf 'netdisco-daemon: worker #%s %s: init', MCE->wid, lc($role); | ||||
|     info sprintf 'applying role %s to worker %s', $role, MCE->wid; | ||||
|  | ||||
|     # post-fork, become manager, scheduler, poller, etc | ||||
|     Role::Tiny->apply_roles_to_object( | ||||
| @@ -69,81 +68,6 @@ sub _mk_wkr { | ||||
|   }; | ||||
| } | ||||
|  | ||||
| #use List::Util 'sum'; | ||||
| #sub num_workers { | ||||
| #  return sum( 0, map { setting('workers')->{$_} } | ||||
| #                     values %{setting('job_type_keys')} ); | ||||
| #} | ||||
|  | ||||
| #sub restart_this_worker { | ||||
| #  my ($self, $e) = @_; | ||||
| #  reset_jobs($e->{wid}); | ||||
| # | ||||
| #  debug "restarting worker ". MCE->wid(); | ||||
| #  MCE->restart_worker(); | ||||
| #  $self->restart_worker($e->{wid}); | ||||
| #} | ||||
|  | ||||
| #sub build_tasks_list { | ||||
| #  # NB MCE does not like max_workers => 0 | ||||
| #  my $tasks = []; | ||||
| # | ||||
| #  push @$tasks, { | ||||
| #    max_workers => 1, | ||||
| #    user_begin => worker_factory('Manager'), | ||||
| #  } if num_workers() > 0; | ||||
| # | ||||
| #  push @$tasks, { | ||||
| #    max_workers => 1, | ||||
| #    user_begin => worker_factory('Scheduler'), | ||||
| #  } if setting('schedule'); | ||||
| # | ||||
| #  my @logmsg = (); | ||||
| #  foreach my $key (keys %{setting('job_type_keys')}) { | ||||
| #      my $val = setting('job_type_keys')->{$key}; | ||||
| # | ||||
| #      setting('workers')->{$val} = 2 | ||||
| #        if !defined setting('workers')->{$val}; | ||||
| # | ||||
| #      push @logmsg, setting('workers')->{$val} ." $key"; | ||||
| #      push @$tasks, { | ||||
| #        max_workers => setting('workers')->{$val}, | ||||
| #        user_begin => worker_factory($key), | ||||
| #      } if setting('workers')->{$val}; | ||||
| #  } | ||||
| # | ||||
| #  info sprintf "MCE will load: %s Manager, %s Scheduler, %s", | ||||
| #    (num_workers() ? 1 : 0), | ||||
| #    (setting('schedule') ? 1 : 0), | ||||
| #    (join ', ', @logmsg); | ||||
| # | ||||
| #  return $tasks; | ||||
| #} | ||||
|  | ||||
| #sub worker_factory { | ||||
| #  my $role = shift; | ||||
| #  return sub { | ||||
| #    my $self = shift; | ||||
| #    my $wid = $self->wid; | ||||
| #    prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); | ||||
| #    info "applying role $role to worker $wid"; | ||||
| # | ||||
| #    # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); | ||||
| #    Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); | ||||
| # | ||||
| #    $self->worker_begin if $self->can('worker_begin'); | ||||
| #  }; | ||||
| #} | ||||
|  | ||||
| #my $mce = MCE->new( | ||||
| #  spawn_delay => 0.15, | ||||
| #  job_delay   => 1.15, | ||||
| #  tmp_dir     => $tmp_dir, | ||||
| #  user_func    => sub { $_[0]->worker_body }, | ||||
| #  on_post_exit => \&restart_this_worker, | ||||
| #  user_tasks   => build_tasks_list(), | ||||
| #)->run(); | ||||
|  | ||||
| =head1 NAME | ||||
|  | ||||
| netdisco-daemon-fg - Job Control for Netdisco | ||||
|   | ||||
| @@ -48,8 +48,16 @@ setting('plugins')->{DBIC}->{daemon} = { | ||||
|  | ||||
| # defaults for workers | ||||
| setting('workers')->{queue} ||= 'PostgreSQL'; | ||||
| setting('workers')->{interactives} = 1 | ||||
|   if setting('workers') and not exists setting('workers')->{interactives}; | ||||
| if (exists setting('workers')->{interactives} | ||||
|     or exists setting('workers')->{pollers}) { | ||||
|  | ||||
|     setting('workers')->{pollers} = ( | ||||
|       (setting('workers')->{pollers} || 0) | ||||
|       .' + '. (setting('workers')->{interactives} || 0) | ||||
|     ) if setting('workers')->{interactives}; | ||||
|  | ||||
|     delete setting('workers')->{interactives}; | ||||
| } | ||||
|  | ||||
| # force skipped DNS resolution, if unset | ||||
| setting('dns')->{hosts_file} ||= '/etc/hosts'; | ||||
|   | ||||
| @@ -20,7 +20,7 @@ sub worker_body { | ||||
|  | ||||
|   while (1) { | ||||
|       prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type); | ||||
|       my $jobs = [ $self->{Q}->dequeue(1) ]; # FIXME multiple take, take type, thaw | ||||
|       my $jobs = [ $self->{queue}->dequeue(1) ]; # FIXME multiple take, take type, thaw | ||||
|  | ||||
|       foreach my $job (@$jobs) { | ||||
|           next unless defined $job; | ||||
|   | ||||
| @@ -27,7 +27,7 @@ sub worker_begin { | ||||
|   if (scalar @jobs) { | ||||
|       info sprintf "mgr (%s): found %s jobs booked to this processing node", | ||||
|         $wid, scalar @jobs; | ||||
|       $self->{Q}->enqueue(@jobs); # FIXME priority and freeze | ||||
|       $self->{queue}->enqueue(@jobs); # FIXME priority and freeze | ||||
|   } | ||||
| } | ||||
|  | ||||
| @@ -39,7 +39,7 @@ sub worker_body { | ||||
|     if setting('workers')->{'no_manager'}; | ||||
|  | ||||
|   # FIXME really the best strategy? | ||||
|   my $num_slots = (MCE::Util::get_ncpu() * 2) - $self->{Q}->pending(); | ||||
|   my $num_slots = (MCE::Util::get_ncpu() * 2) - $self->{queue}->pending(); | ||||
|  | ||||
|   while (1) { | ||||
|       debug "mgr ($wid): getting potential jobs for $num_slots workers"; | ||||
| @@ -55,7 +55,7 @@ sub worker_body { | ||||
|             $wid, $job->id; | ||||
|  | ||||
|           # copy job to local queue | ||||
|           $self->{Q}->enqueue($job); # FIXME priority and freeze | ||||
|           $self->{queue}->enqueue($job); # FIXME priority and freeze | ||||
|       } | ||||
|  | ||||
|       debug "mgr ($wid): sleeping now..."; | ||||
|   | ||||
| @@ -170,8 +170,7 @@ port_control_reasons: | ||||
| # -------------- | ||||
|  | ||||
| workers: | ||||
|   interactives: 1 | ||||
|   pollers: 10 | ||||
|   pollers: 'AUTO * 2' | ||||
|   sleep_time: 1 | ||||
|   queue: PostgreSQL | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user