diff --git a/Netdisco/bin/netdisco-do b/Netdisco/bin/netdisco-do index d8981d3a..4b2e0d53 100755 --- a/Netdisco/bin/netdisco-do +++ b/Netdisco/bin/netdisco-do @@ -35,8 +35,8 @@ BEGIN { # for netdisco app config use App::Netdisco; +use App::Netdisco::Daemon::Job; use Dancer qw/:moose :script/; -use Dancer::Plugin::DBIC 'schema'; info "App::Netdisco version $App::Netdisco::VERSION loaded."; @@ -73,9 +73,6 @@ $ENV{DBIC_TRACE} ||= $sqltrace; # reconfigure logging to force console output Dancer::Logger->init('console', $CONFIG); -# for the in-memory local job queue -schema('daemon')->deploy; - # get requested action my $action = shift @ARGV; @@ -143,7 +140,7 @@ if (not $worker->can( $action )) { } # what job are we asked to do? -my $job = schema('daemon')->resultset('Admin')->new_result({ +my $job = App::Netdisco::Daemon::Job->new({ job => 0, action => $action, device => $device, diff --git a/Netdisco/lib/App/Netdisco/Configuration.pm b/Netdisco/lib/App/Netdisco/Configuration.pm index 264b79b7..ae4f41ce 100644 --- a/Netdisco/lib/App/Netdisco/Configuration.pm +++ b/Netdisco/lib/App/Netdisco/Configuration.pm @@ -35,17 +35,6 @@ if (ref {} eq ref setting('database')) { } -# static configuration for the in-memory local job queue -setting('plugins')->{DBIC}->{daemon} = { - dsn => 'dbi:SQLite:dbname=:memory:', - options => { - AutoCommit => 1, - RaiseError => 1, - sqlite_use_immediate_transaction => 1, - }, - schema_class => 'App::Netdisco::Daemon::DB', -}; - # defaults for workers setting('workers')->{queue} ||= 'PostgreSQL'; if (exists setting('workers')->{interactives} diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB.pm b/Netdisco/lib/App/Netdisco/Daemon/DB.pm deleted file mode 100644 index 11c15293..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/DB.pm +++ /dev/null @@ -1,10 +0,0 @@ -use utf8; -package App::Netdisco::Daemon::DB; - -use strict; -use warnings; - -use base 'DBIx::Class::Schema'; -__PACKAGE__->load_namespaces; - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm b/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm deleted file mode 100644 index 4a0c60c4..00000000 --- a/Netdisco/lib/App/Netdisco/Daemon/DB/Result/Admin.pm +++ /dev/null @@ -1,83 +0,0 @@ -use utf8; -package App::Netdisco::Daemon::DB::Result::Admin; - -use strict; -use warnings; - -use base 'DBIx::Class::Core'; -__PACKAGE__->table("admin"); -__PACKAGE__->add_columns( - "job", - { data_type => "integer", is_nullable => 0 }, - "entered", - { data_type => "timestamp", is_nullable => 1 }, - "started", - { data_type => "timestamp", is_nullable => 1 }, - "finished", - { data_type => "timestamp", is_nullable => 1 }, - "device", - { data_type => "inet", is_nullable => 1 }, - "port", - { data_type => "text", is_nullable => 1 }, - "action", - { data_type => "text", is_nullable => 1 }, - "subaction", - { data_type => "text", is_nullable => 1 }, - "status", - { data_type => "text", is_nullable => 1 }, - "username", - { data_type => "text", is_nullable => 1 }, - "userip", - { data_type => "inet", is_nullable => 1 }, - "log", - { data_type => "text", is_nullable => 1 }, - "debug", - { data_type => "boolean", is_nullable => 1 }, -); - -__PACKAGE__->set_primary_key("job"); - -=head1 METHODS - -=head2 summary - -An attempt to make a meaningful statement about the job. - -=cut - -sub summary { - my $job = shift; - return join ' ', - $job->action, - ($job->device || ''), - ($job->port || ''); -# ($job->subaction ? (q{'}. $job->subaction .q{'}) : ''); -} - -=head1 ADDITIONAL COLUMNS - -=head2 extra - -Alias for the C column. - -=cut - -sub extra { (shift)->subaction } - -=head2 entererd_stamp - -Formatted version of the C field, accurate to the minute. - -The format is somewhat like ISO 8601 or RFC3339 but without the middle C -between the date stamp and time stamp. That is: - - 2012-02-06 12:49 - -=cut - -sub entered_stamp { - (my $stamp = (shift)->entered) =~ s/\.\d+$//; - return $stamp; -} - -1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Job.pm b/Netdisco/lib/App/Netdisco/Daemon/Job.pm new file mode 100644 index 00000000..3c078fa8 --- /dev/null +++ b/Netdisco/lib/App/Netdisco/Daemon/Job.pm @@ -0,0 +1,54 @@ +package App::Netdisco::Daemon::Job; + +use Moo; +use namespace::clean; + +foreach my $slot (qw/ + job + entered + started + finished + device + port + action + subaction + status + username + userip + log + debug + /) { + + has $slot => ( + is => 'rw', + ); +} + +=head1 METHODS + +=head2 summary + +An attempt to make a meaningful statement about the job. + +=cut + +sub summary { + my $job = shift; + return join ' ', + $job->action, + ($job->device || ''), + ($job->port || ''); +# ($job->subaction ? (q{'}. $job->subaction .q{'}) : ''); +} + +=head1 ADDITIONAL COLUMNS + +=head2 extra + +Alias for the C column. + +=cut + +sub extra { (shift)->subaction } + +1; diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm index 01928171..437be666 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Common.pm @@ -1,7 +1,6 @@ package App::Netdisco::Daemon::Worker::Common; use Dancer qw/:moose :syntax :script/; -use Dancer::Plugin::DBIC 'schema'; use Try::Tiny; use App::Netdisco::Util::Daemon; @@ -20,17 +19,14 @@ sub worker_body { my $job = $self->{queue}->dequeue(1); next unless defined $job; - - # TODO stop using DBIC - $job = schema('daemon')->dclone( $job ); my $action = $job->action; try { $job->started(scalar localtime); prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s', - $wid, $job->id, $job->summary; + $wid, $job->job, $job->summary; info sprintf "pol (%s): starting %s job(%s) at %s", - $wid, $action, $job->id, $job->started; + $wid, $action, $job->job, $job->started; my ($status, $log) = $self->$action($job); $job->status($status); $job->log($log); @@ -50,9 +46,9 @@ sub close_job { my $now = scalar localtime; prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s', - $self->wid, $job->action, $job->id, $job->status; + $self->wid, $job->action, $job->job, $job->status; info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", - $self->wid, $job->action, $job->id, $job->status, $now; + $self->wid, $job->action, $job->job, $job->status, $now; try { if ($job->status eq 'defer') { diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm index 18ae8421..e95fb603 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Manager.pm @@ -53,7 +53,7 @@ sub worker_body { # mark job as running next unless jq_lock($job); info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $job->id; + $wid, $job->job; # copy job to local queue $self->{queue}->enqueue($job); diff --git a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm index a2a3213b..198872b8 100644 --- a/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm +++ b/Netdisco/lib/App/Netdisco/Daemon/Worker/Scheduler.pm @@ -14,7 +14,7 @@ sub worker_begin { my $self = shift; my $wid = $self->wid; - return debug "mgr ($wid): no need for scheduler... skip begin" + return debug "sch ($wid): no need for scheduler... skip begin" unless setting('schedule'); debug "entering Scheduler ($wid) worker_begin()"; @@ -38,7 +38,7 @@ sub worker_body { my $self = shift; my $wid = $self->wid; - return debug "mgr ($wid): no need for scheduler... quitting" + return debug "sch ($wid): no need for scheduler... quitting" unless setting('schedule'); while (1) { diff --git a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 861a4a5b..03a93c3a 100644 --- a/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -3,6 +3,7 @@ package App::Netdisco::JobQueue::PostgreSQL; use Dancer qw/:moose :syntax :script/; use Dancer::Plugin::DBIC 'schema'; +use App::Netdisco::Daemon::Job; use Net::Domain 'hostfqdn'; use Module::Load (); use Try::Tiny; @@ -42,8 +43,7 @@ sub jq_getsome { } while (my $job = $rs->next) { - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns }); + push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); } return @returned; } @@ -56,8 +56,7 @@ sub jq_locked { ->search({status => "queued-$fqdn"}); while (my $job = $rs->next) { - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns }); + push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns }); } return @returned; } @@ -73,34 +72,18 @@ sub jq_queued { } sub jq_log { - my @returned = (); - - my $rs = schema('netdisco')->resultset('Admin')->search({}, { + return schema('netdisco')->resultset('Admin')->search({}, { order_by => { -desc => [qw/entered device action/] }, rows => 50, - }); - - while (my $job = $rs->next) { - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns }); - } - return @returned; + })->with_times->hri->all; } sub jq_userlog { my $user = shift; - my @returned = (); - - my $rs = schema('netdisco')->resultset('Admin')->search({ + return schema('netdisco')->resultset('Admin')->search({ username => $user, finished => { '>' => \"(now() - interval '5 seconds')" }, - }); - - while (my $job = $rs->next) { - push @returned, schema('daemon')->resultset('Admin') - ->new_result({ $job->get_columns }); - } - return @returned; + })->with_times->hri->all; } sub jq_lock { @@ -112,7 +95,7 @@ sub jq_lock { try { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->find($job->id, {for => 'update'}) + ->find($job->job, {for => 'update'}) ->update({ status => "queued-$fqdn" }); # remove any duplicate jobs, needed because we have race conditions @@ -126,6 +109,9 @@ sub jq_lock { }, {for => 'update'})->delete(); }); $happy = true; + } + catch { + error $_; }; return $happy; @@ -139,10 +125,13 @@ sub jq_defer { # lock db row and update to show job is available schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->find($job->id, {for => 'update'}) + ->find($job->job, {for => 'update'}) ->update({ status => 'queued', started => undef }); }); $happy = true; + } + catch { + error $_; }; return $happy; @@ -156,7 +145,7 @@ sub jq_complete { try { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin') - ->find($job->id, {for => 'update'})->update({ + ->find($job->job, {for => 'update'})->update({ status => $job->status, log => $job->log, started => $job->started, @@ -164,6 +153,9 @@ sub jq_complete { }); }); $happy = true; + } + catch { + error $_; }; return $happy; @@ -189,6 +181,9 @@ sub jq_insert { ]); }); $happy = true; + } + catch { + error $_; }; return $happy;