diff --git a/Netdisco/bin/netdisco-daemon-fg b/Netdisco/bin/netdisco-daemon-fg index 5cfcb5eb..a3a8f618 100755 --- a/Netdisco/bin/netdisco-daemon-fg +++ b/Netdisco/bin/netdisco-daemon-fg @@ -41,24 +41,23 @@ setpgrp(0,0); # only portable variety of setpgrp prctl 'netdisco-daemon: master'; # shared local job queue -my $Q = MCE::Queue->new; +my $queue = MCE::Queue->new; mce_flow { task_name => [qw/ scheduler manager poller /], - max_workers => [ 1, 1, 'AUTO * 2' ], # FIXME allow setting override + max_workers => [ 1, 1, setting('workers')->{pollers} ], tmp_dir => $tmp_dir, - on_post_exit => sub { MCE->restart_worker() }, + on_post_exit => sub { MCE->restart_worker }, }, _mk_wkr('Scheduler'), _mk_wkr('Manager'), _mk_wkr('Poller'); sub _mk_wkr { my $role = shift; return sub { my $self = shift; - my $wid = $self->wid; - $self->{Q} = $Q; # FIXME make it a method + $self->{queue} = $queue; - prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); - info "applying role $role to worker $wid"; + prctl sprintf 'netdisco-daemon: worker #%s %s: init', MCE->wid, lc($role); + info sprintf 'applying role %s to worker %s', $role, MCE->wid; # post-fork, become manager, scheduler, poller, etc Role::Tiny->apply_roles_to_object( @@ -69,81 +68,6 @@ sub _mk_wkr { }; } -#use List::Util 'sum'; -#sub num_workers { -# return sum( 0, map { setting('workers')->{$_} } -# values %{setting('job_type_keys')} ); -#} - -#sub restart_this_worker { -# my ($self, $e) = @_; -# reset_jobs($e->{wid}); -# -# debug "restarting worker ". MCE->wid(); -# MCE->restart_worker(); -# $self->restart_worker($e->{wid}); -#} - -#sub build_tasks_list { -# # NB MCE does not like max_workers => 0 -# my $tasks = []; -# -# push @$tasks, { -# max_workers => 1, -# user_begin => worker_factory('Manager'), -# } if num_workers() > 0; -# -# push @$tasks, { -# max_workers => 1, -# user_begin => worker_factory('Scheduler'), -# } if setting('schedule'); -# -# my @logmsg = (); -# foreach my $key (keys %{setting('job_type_keys')}) { -# my $val = setting('job_type_keys')->{$key}; -# -# setting('workers')->{$val} = 2 -# if !defined setting('workers')->{$val}; -# -# push @logmsg, setting('workers')->{$val} ." $key"; -# push @$tasks, { -# max_workers => setting('workers')->{$val}, -# user_begin => worker_factory($key), -# } if setting('workers')->{$val}; -# } -# -# info sprintf "MCE will load: %s Manager, %s Scheduler, %s", -# (num_workers() ? 1 : 0), -# (setting('schedule') ? 1 : 0), -# (join ', ', @logmsg); -# -# return $tasks; -#} - -#sub worker_factory { -# my $role = shift; -# return sub { -# my $self = shift; -# my $wid = $self->wid; -# prctl sprintf 'netdisco-daemon: worker #%s %s: init', $wid, lc($role); -# info "applying role $role to worker $wid"; -# -# # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); -# Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); -# -# $self->worker_begin if $self->can('worker_begin'); -# }; -#} - -#my $mce = MCE->new( -# spawn_delay => 0.15, -# job_delay => 1.15, -# tmp_dir => $tmp_dir, -# user_func => sub { $_[0]->worker_body }, -# on_post_exit => \&restart_this_worker, -# user_tasks => build_tasks_list(), -#)->run(); - =head1 NAME netdisco-daemon-fg - Job Control for Netdisco diff --git a/Netdisco/lib/App/Netdisco/Configuration.pm b/Netdisco/lib/App/Netdisco/Configuration.pm index 504c6aca..8e22995e 100644 --- a/Netdisco/lib/App/Netdisco/Configuration.pm +++ b/Netdisco/lib/App/Netdisco/Configuration.pm @@ -48,8 +48,16 @@ setting('plugins')->{DBIC}->{daemon} = { # defaults for workers setting('workers')->{queue} ||= 'PostgreSQL'; -setting('workers')->{interactives} = 1 - if setting('workers') and not exists setting('workers')->{interactives}; +if (exists setting('workers')->{interactives} + or exists setting('workers')->{pollers}) { + + setting('workers')->{pollers} = ( + (setting('workers')->{pollers} || 0) + .' + '. (setting('workers')->{interactives} || 0) + ) if setting('workers')->{interactives}; + + delete setting('workers')->{interactives}; +} # force skipped DNS resolution, if unset setting('dns')->{hosts_file} ||= '/etc/hosts'; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index edc0bda5..c0394b70 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -20,7 +20,7 @@ sub worker_body { while (1) { prctl sprintf 'netdisco-daemon: worker #%s %s: idle', $wid, lc($type); - my $jobs = [ $self->{Q}->dequeue(1) ]; # FIXME multiple take, take type, thaw + my $jobs = [ $self->{queue}->dequeue(1) ]; # FIXME multiple take, take type, thaw foreach my $job (@$jobs) { next unless defined $job; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index ea1a9d04..830950a0 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -27,7 +27,7 @@ sub worker_begin { if (scalar @jobs) { info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; - $self->{Q}->enqueue(@jobs); # FIXME priority and freeze + $self->{queue}->enqueue(@jobs); # FIXME priority and freeze } } @@ -39,7 +39,7 @@ sub worker_body { if setting('workers')->{'no_manager'}; # FIXME really the best strategy? - my $num_slots = (MCE::Util::get_ncpu() * 2) - $self->{Q}->pending(); + my $num_slots = (MCE::Util::get_ncpu() * 2) - $self->{queue}->pending(); while (1) { debug "mgr ($wid): getting potential jobs for $num_slots workers"; @@ -55,7 +55,7 @@ sub worker_body { $wid, $job->id; # copy job to local queue - $self->{Q}->enqueue($job); # FIXME priority and freeze + $self->{queue}->enqueue($job); # FIXME priority and freeze } debug "mgr ($wid): sleeping now..."; diff --git a/Netdisco/share/config.yml b/Netdisco/share/config.yml index 397a344f..4c70e44f 100644 --- a/Netdisco/share/config.yml +++ b/Netdisco/share/config.yml @@ -170,8 +170,7 @@ port_control_reasons: # -------------- workers: - interactives: 1 - pollers: 10 + pollers: 'AUTO * 2' sleep_time: 1 queue: PostgreSQL