fix poller count and refactor for clarity
This commit is contained in:
		| @@ -9,19 +9,10 @@ use Net::Domain 'hostfqdn'; | ||||
| use Role::Tiny; | ||||
| use Try::Tiny; | ||||
|  | ||||
| my $pp = Parallel::Prefork->new( | ||||
|   max_workers => (setting('daemon_workers') || 5), | ||||
|   spawn_interval => 1, | ||||
|   before_fork => \&set_next_worker_role, | ||||
|   after_fork  => \®ister_worker, | ||||
|   on_child_reap => \&unregister_worker, | ||||
|   trap_signals => { | ||||
|     TERM => 'TERM', | ||||
|     INT  => 'TERM', | ||||
|   }, | ||||
| ); | ||||
| # Parallel::Prefork instance | ||||
| my $pp = undef; | ||||
|  | ||||
| # tracks worker pids and their roles | ||||
| # track worker pids and their roles | ||||
| my %workers = (); | ||||
| my $next_role = undef; | ||||
|  | ||||
| @@ -34,25 +25,15 @@ newdaemon( | ||||
|  | ||||
| sub gd_preconfig { | ||||
|   my $gd = shift; | ||||
|   my $daemon = schema('daemon'); | ||||
|  | ||||
|   # deploy the daemon's local DB schema | ||||
|   try { | ||||
|       $daemon->storage->dbh_do(sub { | ||||
|         my ($storage, $dbh) = @_; | ||||
|         $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); | ||||
|       }); | ||||
|   } | ||||
|   catch { $daemon->deploy }; | ||||
|  | ||||
|   $daemon->storage->disconnect; | ||||
|   if ($daemon->get_db_version < $daemon->schema_version) { | ||||
|       $daemon->upgrade; | ||||
|   } | ||||
|   _deploy_daemon_db(); | ||||
|  | ||||
|   # used for locking jobs in central Pg queue | ||||
|   $gd->{nd_host} = hostfqdn; | ||||
|  | ||||
|   _bootstrap_prefork(); | ||||
|  | ||||
|   # do not remove this line - required by Daemon::Generic | ||||
|   return (); | ||||
| } | ||||
| @@ -64,20 +45,16 @@ sub gd_run_body { | ||||
|   $gd->handle_term | ||||
|     if $pp->signal_received =~ m/^(?:TERM|INT)$/; | ||||
|  | ||||
|   $gd->handle_hup | ||||
|     if $pp->signal_received eq 'HUP'; | ||||
|  | ||||
|   if ($pp->num_workers < $pp->max_workers) { | ||||
|       $pp->start and return; | ||||
|       with "Netdisco::Daemon::Worker::$next_role"; | ||||
|       print STDERR ">>> new $next_role worker started.\n"; | ||||
|       $gd->worker_body; | ||||
|       $pp->finish; | ||||
|   } | ||||
|   $pp->start(sub { | ||||
|     with "Netdisco::Daemon::Worker::$next_role"; | ||||
|     print STDERR ">>> new $next_role worker started.\n"; | ||||
|     $gd->worker_body; | ||||
|   }); | ||||
|  | ||||
|   # I don't think Parallel::Prefork ever returns from start() | ||||
|   # until a child exits. Not sure this is ever reached. | ||||
|   $gd->gd_sleep( setting('daemon_sleep_time') || 5 ); | ||||
|   $gd->gd_sleep( setting('daemon_sleep_time') || 5 ) | ||||
|     if not $pp->signal_received; | ||||
| } | ||||
|  | ||||
| sub register_worker { | ||||
| @@ -112,8 +89,8 @@ sub _find_next_worker_role { | ||||
|  | ||||
|   return 'Manager' if $manager < 1; | ||||
|  | ||||
|   my $need_poller  = $poller < (setting('daemon_pollers') || 2); | ||||
|   my $need_inter   = $inter < (setting('daemon_interactives') || 2); | ||||
|   my $need_poller = $poller < setting('daemon_pollers'); | ||||
|   my $need_inter  = $inter < setting('daemon_interactives'); | ||||
|  | ||||
|   if ($need_poller and $need_inter) { | ||||
|       return (int(rand(2)) ? 'Interactive' : 'Poller'); | ||||
| @@ -135,6 +112,42 @@ END { | ||||
|   $pp->wait_all_children; | ||||
| } | ||||
|  | ||||
| sub _deploy_daemon_db { | ||||
|   my $daemon = schema('daemon'); | ||||
|  | ||||
|   try { | ||||
|       $daemon->storage->dbh_do(sub { | ||||
|         my ($storage, $dbh) = @_; | ||||
|         $dbh->selectrow_arrayref("SELECT * FROM admin WHERE 0 = 1"); | ||||
|       }); | ||||
|   } | ||||
|   catch { $daemon->deploy }; | ||||
|  | ||||
|   $daemon->storage->disconnect; | ||||
|   if ($daemon->get_db_version < $daemon->schema_version) { | ||||
|       $daemon->upgrade; | ||||
|   } | ||||
| } | ||||
|  | ||||
| sub _bootstrap_prefork { | ||||
|   # set defaults | ||||
|   set(daemon_pollers => 2) if !defined setting('daemon_pollers'); | ||||
|   set(daemon_interactives => 2) if !defined setting('daemon_interactives'); | ||||
|  | ||||
|   # need to do this after setting defaults | ||||
|   $pp = Parallel::Prefork->new( | ||||
|     max_workers => (1 + setting('daemon_pollers') + setting('daemon_interactives')), | ||||
|     spawn_interval => 2, | ||||
|     before_fork => \&set_next_worker_role, | ||||
|     after_fork  => \®ister_worker, | ||||
|     on_child_reap => \&unregister_worker, | ||||
|     trap_signals => { | ||||
|       TERM => 'TERM', | ||||
|       INT  => 'TERM', | ||||
|     }, | ||||
|   ); | ||||
| } | ||||
|  | ||||
| # nullify this to permit Parallel::Prefork to register handlers instead | ||||
| sub gd_setup_signals {} | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user