add log messages to the Daemon
This commit is contained in:
		| @@ -4,6 +4,7 @@ | ||||
|  | ||||
|   * random() and LIMIT the number of daemon jobs requested from Netdisco queue | ||||
|   * Remove Daemon's job queue DBIC schema from user config | ||||
|   * Add log messages to the Daemon | ||||
|  | ||||
| 2.005000_002 - 2013-02-10 | ||||
|  | ||||
|   | ||||
| @@ -21,6 +21,8 @@ mkdir $tmp_dir if ! -d $tmp_dir; | ||||
| use MCE; | ||||
| use Role::Tiny::With; | ||||
|  | ||||
| info "App::Netdisco version $App::Netdisco::VERSION daemon loaded."; | ||||
|  | ||||
| my $mce = MCE->new( | ||||
|   spawn_delay => 0.15, | ||||
|   job_delay   => 0.15, | ||||
| @@ -53,6 +55,8 @@ sub build_tasks_list { | ||||
|     user_begin => worker_factory('Interactive'), | ||||
|   } if setting('daemon_interactives'); | ||||
|  | ||||
|   info sprintf "MCE will load %s tasks: 1 Manager, %s Poller, %s Interactive", | ||||
|     (1+ scalar @$tasks), (setting('daemon_pollers') || 0), (setting('daemon_interactives') || 0); | ||||
|   return $tasks; | ||||
| } | ||||
|  | ||||
| @@ -64,8 +68,9 @@ sub worker_factory { | ||||
|   my $role = shift; | ||||
|   return sub { | ||||
|     my $self = shift; | ||||
|     my $wid = $self->wid; | ||||
|     info "applying role $role to worker $wid"; | ||||
|  | ||||
|     # my $wid = $self->wid; | ||||
|     # $self->sendto('stderr', ">>> worker $wid starting with role $role\n"); | ||||
|     Role::Tiny->apply_roles_to_object($self, "App::Netdisco::Daemon::Worker::$role"); | ||||
|  | ||||
| @@ -76,5 +81,7 @@ sub worker_factory { | ||||
| sub restart_worker { | ||||
|   my ($self, $e) = @_; | ||||
|   reset_jobs($e->{wid}); | ||||
|  | ||||
|   debug "restarting worker $e->{wid}"; | ||||
|   $self->restart_worker($e->{wid}); | ||||
| } | ||||
|   | ||||
| @@ -24,11 +24,13 @@ my $queue = schema('daemon')->resultset('Admin'); | ||||
|  | ||||
| sub add_jobs { | ||||
|   my ($jobs) = @_; | ||||
|   info sprintf "adding %s jobs to local queue", scalar @$jobs; | ||||
|   $queue->populate($jobs); | ||||
| } | ||||
|  | ||||
| sub capacity_for { | ||||
|   my ($action) = @_; | ||||
|   debug "checking local capacity for action $action"; | ||||
|  | ||||
|   my $action_map = { | ||||
|     Interactive => [qw/location contact portcontrol portname vlan power/] | ||||
| @@ -56,8 +58,10 @@ sub take_jobs { | ||||
|   $max ||= 1; | ||||
|  | ||||
|   # asking for more jobs means the current ones are done | ||||
|   debug "removing complete jobs for worker $wid from local queue"; | ||||
|   $queue->search({wid => $wid})->delete; | ||||
|  | ||||
|   debug "searching for $max new jobs for worker $wid (role $role)"; | ||||
|   my $rs = $queue->search( | ||||
|     {role => $role, wid => 0}, | ||||
|     {rows => $max}, | ||||
| @@ -66,6 +70,7 @@ sub take_jobs { | ||||
|   return [] if $rs->count == 0; | ||||
|  | ||||
|   my @rows = $rs->all; | ||||
|   debug sprintf "booking out %s jobs to worker %s", scalar @rows, $wid; | ||||
|   $rs->update({wid => $wid}); | ||||
|  | ||||
|   return [ map {{$_->get_columns}} @rows ]; | ||||
| @@ -73,6 +78,7 @@ sub take_jobs { | ||||
|  | ||||
| sub reset_jobs { | ||||
|   my ($wid) = @_; | ||||
|   debug "resetting jobs owned by worked $wid to be available"; | ||||
|   return unless $wid > 1; | ||||
|   $queue->search({wid => $wid}) | ||||
|         ->update({wid => 0}); | ||||
|   | ||||
| @@ -13,8 +13,10 @@ with 'App::Netdisco::Daemon::Worker::Interactive::DeviceActions', | ||||
|  | ||||
| sub worker_body { | ||||
|   my $self = shift; | ||||
|   my $wid = $self->wid; | ||||
|  | ||||
|   while (1) { | ||||
|       debug "int ($wid): asking for a job"; | ||||
|       my $jobs = $self->do('take_jobs', $self->wid, 'Interactive'); | ||||
|  | ||||
|       foreach my $candidate (@$jobs) { | ||||
| @@ -23,14 +25,17 @@ sub worker_body { | ||||
|           # (will throw an exception) | ||||
|           my $job = schema('daemon')->resultset('Admin') | ||||
|                       ->new_result($candidate); | ||||
|           my $jid = $job->id; | ||||
|  | ||||
|           my $target = 'set_'. $job->action; | ||||
|           next unless $self->can($target); | ||||
|           info "int ($wid): can ${target}() for job $jid"; | ||||
|  | ||||
|           # do job | ||||
|           my ($status, $log); | ||||
|           try { | ||||
|               $job->started(scalar localtime); | ||||
|               debug sprintf "int (%s): starting job %s at %s", $wid, $jid, $job->started; | ||||
|               ($status, $log) = $self->$target($job); | ||||
|           } | ||||
|           catch { | ||||
| @@ -39,9 +44,11 @@ sub worker_body { | ||||
|               $self->sendto('stderr', $log ."\n"); | ||||
|           }; | ||||
|  | ||||
|           info "int ($wid): wrapping up job $jid - status $status"; | ||||
|           $self->close_job($job, $status, $log); | ||||
|       } | ||||
|  | ||||
|       debug "int ($wid): sleeping now..."; | ||||
|       sleep( setting('daemon_sleep_time') || 5 ); | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -19,19 +19,27 @@ my $role_map = { | ||||
|  | ||||
| sub worker_begin { | ||||
|   my $self = shift; | ||||
|   my $wid = $self->wid; | ||||
|   debug "entering Manager ($wid) worker_begin()"; | ||||
|  | ||||
|   # requeue jobs locally | ||||
|   debug "mgr ($wid): searching for jobs booked to this processing node"; | ||||
|   my $rs = schema('netdisco')->resultset('Admin') | ||||
|     ->search({status => "queued-$fqdn"}); | ||||
|  | ||||
|   my @jobs = map {{$_->get_columns}} $rs->all; | ||||
|   map { $_->{role} = $role_map->{$_->{action}} } @jobs; | ||||
|  | ||||
|   $self->do('add_jobs', \@jobs); | ||||
|   if (scalar @jobs) { | ||||
|       info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs; | ||||
|       map { $_->{role} = $role_map->{$_->{action}} } @jobs; | ||||
|  | ||||
|       $self->do('add_jobs', \@jobs); | ||||
|   } | ||||
| } | ||||
|  | ||||
| sub worker_body { | ||||
|   my $self = shift; | ||||
|   my $wid = $self->wid; | ||||
|   my $num_slots = $self->do('num_workers'); | ||||
|  | ||||
|   # get some pending jobs | ||||
| @@ -42,15 +50,23 @@ sub worker_body { | ||||
|     ); | ||||
|  | ||||
|   while (1) { | ||||
|       debug "mgr ($wid): getting potential jobs for $num_slots workers"; | ||||
|       while (my $job = $rs->next) { | ||||
|           my $jid = $job->job; | ||||
|  | ||||
|           # filter for discover_* | ||||
|           next unless is_discoverable($job->device); | ||||
|           info sprintf "mgr (%s): job %s is discoverable", $wid, $jid; | ||||
|  | ||||
|           # check for available local capacity | ||||
|           next unless $self->do('capacity_for', $job->action); | ||||
|           info sprintf "mgr (%s): processing node has capacity for job %s (%s)", | ||||
|             $wid, $jid, $job->action; | ||||
|  | ||||
|           # mark job as running | ||||
|           next unless $self->lock_job($job); | ||||
|           info sprintf "mgr (%s): job %s booked out for this processing node", | ||||
|             $wid, $jid; | ||||
|  | ||||
|           my $local_job = { $job->get_columns }; | ||||
|           $local_job->{role} = $role_map->{$job->action}; | ||||
| @@ -64,6 +80,7 @@ sub worker_body { | ||||
|  | ||||
|       # TODO also check for stale jobs in Netdisco DB | ||||
|  | ||||
|       debug "mgr ($wid): sleeping now..."; | ||||
|       sleep( setting('daemon_sleep_time') || 5 ); | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -16,7 +16,10 @@ layout: "main" | ||||
| charset: "UTF-8" | ||||
|  | ||||
| # web sessions stored in memory | ||||
| session: Simple | ||||
| session: "YAML" | ||||
|  | ||||
| # logging format | ||||
| logger_format: '[%P] %L @%D> %m' | ||||
|  | ||||
| # web output template settings | ||||
| template: "template_toolkit" | ||||
|   | ||||
		Reference in New Issue
	
	Block a user