remove all trace of SQLite - new lightweight Job object

This commit is contained in:
Oliver Gorwits
2014-08-10 16:48:58 +01:00
parent a13ed25f6a
commit 0c3675a8f4
9 changed files with 85 additions and 147 deletions

View File

@@ -35,8 +35,8 @@ BEGIN {
# for netdisco app config # for netdisco app config
use App::Netdisco; use App::Netdisco;
use App::Netdisco::Daemon::Job;
use Dancer qw/:moose :script/; use Dancer qw/:moose :script/;
use Dancer::Plugin::DBIC 'schema';
info "App::Netdisco version $App::Netdisco::VERSION loaded."; info "App::Netdisco version $App::Netdisco::VERSION loaded.";
@@ -73,9 +73,6 @@ $ENV{DBIC_TRACE} ||= $sqltrace;
# reconfigure logging to force console output # reconfigure logging to force console output
Dancer::Logger->init('console', $CONFIG); Dancer::Logger->init('console', $CONFIG);
# for the in-memory local job queue
schema('daemon')->deploy;
# get requested action # get requested action
my $action = shift @ARGV; my $action = shift @ARGV;
@@ -143,7 +140,7 @@ if (not $worker->can( $action )) {
} }
# what job are we asked to do? # what job are we asked to do?
my $job = schema('daemon')->resultset('Admin')->new_result({ my $job = App::Netdisco::Daemon::Job->new({
job => 0, job => 0,
action => $action, action => $action,
device => $device, device => $device,

View File

@@ -35,17 +35,6 @@ if (ref {} eq ref setting('database')) {
} }
# static configuration for the in-memory local job queue
setting('plugins')->{DBIC}->{daemon} = {
dsn => 'dbi:SQLite:dbname=:memory:',
options => {
AutoCommit => 1,
RaiseError => 1,
sqlite_use_immediate_transaction => 1,
},
schema_class => 'App::Netdisco::Daemon::DB',
};
# defaults for workers # defaults for workers
setting('workers')->{queue} ||= 'PostgreSQL'; setting('workers')->{queue} ||= 'PostgreSQL';
if (exists setting('workers')->{interactives} if (exists setting('workers')->{interactives}

View File

@@ -1,10 +0,0 @@
use utf8;
package App::Netdisco::Daemon::DB;
use strict;
use warnings;
use base 'DBIx::Class::Schema';
__PACKAGE__->load_namespaces;
1;

View File

@@ -1,83 +0,0 @@
use utf8;
package App::Netdisco::Daemon::DB::Result::Admin;
use strict;
use warnings;
use base 'DBIx::Class::Core';
__PACKAGE__->table("admin");
__PACKAGE__->add_columns(
"job",
{ data_type => "integer", is_nullable => 0 },
"entered",
{ data_type => "timestamp", is_nullable => 1 },
"started",
{ data_type => "timestamp", is_nullable => 1 },
"finished",
{ data_type => "timestamp", is_nullable => 1 },
"device",
{ data_type => "inet", is_nullable => 1 },
"port",
{ data_type => "text", is_nullable => 1 },
"action",
{ data_type => "text", is_nullable => 1 },
"subaction",
{ data_type => "text", is_nullable => 1 },
"status",
{ data_type => "text", is_nullable => 1 },
"username",
{ data_type => "text", is_nullable => 1 },
"userip",
{ data_type => "inet", is_nullable => 1 },
"log",
{ data_type => "text", is_nullable => 1 },
"debug",
{ data_type => "boolean", is_nullable => 1 },
);
__PACKAGE__->set_primary_key("job");
=head1 METHODS
=head2 summary
An attempt to make a meaningful statement about the job.
=cut
sub summary {
my $job = shift;
return join ' ',
$job->action,
($job->device || ''),
($job->port || '');
# ($job->subaction ? (q{'}. $job->subaction .q{'}) : '');
}
=head1 ADDITIONAL COLUMNS
=head2 extra
Alias for the C<subaction> column.
=cut
sub extra { (shift)->subaction }
=head2 entererd_stamp
Formatted version of the C<entered> field, accurate to the minute.
The format is somewhat like ISO 8601 or RFC3339 but without the middle C<T>
between the date stamp and time stamp. That is:
2012-02-06 12:49
=cut
sub entered_stamp {
(my $stamp = (shift)->entered) =~ s/\.\d+$//;
return $stamp;
}
1;

View File

@@ -0,0 +1,54 @@
package App::Netdisco::Daemon::Job;
use Moo;
use namespace::clean;
foreach my $slot (qw/
job
entered
started
finished
device
port
action
subaction
status
username
userip
log
debug
/) {
has $slot => (
is => 'rw',
);
}
=head1 METHODS
=head2 summary
An attempt to make a meaningful statement about the job.
=cut
sub summary {
my $job = shift;
return join ' ',
$job->action,
($job->device || ''),
($job->port || '');
# ($job->subaction ? (q{'}. $job->subaction .q{'}) : '');
}
=head1 ADDITIONAL COLUMNS
=head2 extra
Alias for the C<subaction> column.
=cut
sub extra { (shift)->subaction }
1;

View File

@@ -1,7 +1,6 @@
package App::Netdisco::Daemon::Worker::Common; package App::Netdisco::Daemon::Worker::Common;
use Dancer qw/:moose :syntax :script/; use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Try::Tiny; use Try::Tiny;
use App::Netdisco::Util::Daemon; use App::Netdisco::Util::Daemon;
@@ -20,17 +19,14 @@ sub worker_body {
my $job = $self->{queue}->dequeue(1); my $job = $self->{queue}->dequeue(1);
next unless defined $job; next unless defined $job;
# TODO stop using DBIC
$job = schema('daemon')->dclone( $job );
my $action = $job->action; my $action = $job->action;
try { try {
$job->started(scalar localtime); $job->started(scalar localtime);
prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s', prctl sprintf 'netdisco-daemon: worker #%s poller: working on #%s: %s',
$wid, $job->id, $job->summary; $wid, $job->job, $job->summary;
info sprintf "pol (%s): starting %s job(%s) at %s", info sprintf "pol (%s): starting %s job(%s) at %s",
$wid, $action, $job->id, $job->started; $wid, $action, $job->job, $job->started;
my ($status, $log) = $self->$action($job); my ($status, $log) = $self->$action($job);
$job->status($status); $job->status($status);
$job->log($log); $job->log($log);
@@ -50,9 +46,9 @@ sub close_job {
my $now = scalar localtime; my $now = scalar localtime;
prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s', prctl sprintf 'netdisco-daemon: worker #%s poller: wrapping up %s #%s: %s',
$self->wid, $job->action, $job->id, $job->status; $self->wid, $job->action, $job->job, $job->status;
info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s",
$self->wid, $job->action, $job->id, $job->status, $now; $self->wid, $job->action, $job->job, $job->status, $now;
try { try {
if ($job->status eq 'defer') { if ($job->status eq 'defer') {

View File

@@ -53,7 +53,7 @@ sub worker_body {
# mark job as running # mark job as running
next unless jq_lock($job); next unless jq_lock($job);
info sprintf "mgr (%s): job %s booked out for this processing node", info sprintf "mgr (%s): job %s booked out for this processing node",
$wid, $job->id; $wid, $job->job;
# copy job to local queue # copy job to local queue
$self->{queue}->enqueue($job); $self->{queue}->enqueue($job);

View File

@@ -14,7 +14,7 @@ sub worker_begin {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
return debug "mgr ($wid): no need for scheduler... skip begin" return debug "sch ($wid): no need for scheduler... skip begin"
unless setting('schedule'); unless setting('schedule');
debug "entering Scheduler ($wid) worker_begin()"; debug "entering Scheduler ($wid) worker_begin()";
@@ -38,7 +38,7 @@ sub worker_body {
my $self = shift; my $self = shift;
my $wid = $self->wid; my $wid = $self->wid;
return debug "mgr ($wid): no need for scheduler... quitting" return debug "sch ($wid): no need for scheduler... quitting"
unless setting('schedule'); unless setting('schedule');
while (1) { while (1) {

View File

@@ -3,6 +3,7 @@ package App::Netdisco::JobQueue::PostgreSQL;
use Dancer qw/:moose :syntax :script/; use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema'; use Dancer::Plugin::DBIC 'schema';
use App::Netdisco::Daemon::Job;
use Net::Domain 'hostfqdn'; use Net::Domain 'hostfqdn';
use Module::Load (); use Module::Load ();
use Try::Tiny; use Try::Tiny;
@@ -42,8 +43,7 @@ sub jq_getsome {
} }
while (my $job = $rs->next) { while (my $job = $rs->next) {
push @returned, schema('daemon')->resultset('Admin') push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns });
->new_result({ $job->get_columns });
} }
return @returned; return @returned;
} }
@@ -56,8 +56,7 @@ sub jq_locked {
->search({status => "queued-$fqdn"}); ->search({status => "queued-$fqdn"});
while (my $job = $rs->next) { while (my $job = $rs->next) {
push @returned, schema('daemon')->resultset('Admin') push @returned, App::Netdisco::Daemon::Job->new({ $job->get_columns });
->new_result({ $job->get_columns });
} }
return @returned; return @returned;
} }
@@ -73,34 +72,18 @@ sub jq_queued {
} }
sub jq_log { sub jq_log {
my @returned = (); return schema('netdisco')->resultset('Admin')->search({}, {
my $rs = schema('netdisco')->resultset('Admin')->search({}, {
order_by => { -desc => [qw/entered device action/] }, order_by => { -desc => [qw/entered device action/] },
rows => 50, rows => 50,
}); })->with_times->hri->all;
while (my $job = $rs->next) {
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns });
}
return @returned;
} }
sub jq_userlog { sub jq_userlog {
my $user = shift; my $user = shift;
my @returned = (); return schema('netdisco')->resultset('Admin')->search({
my $rs = schema('netdisco')->resultset('Admin')->search({
username => $user, username => $user,
finished => { '>' => \"(now() - interval '5 seconds')" }, finished => { '>' => \"(now() - interval '5 seconds')" },
}); })->with_times->hri->all;
while (my $job = $rs->next) {
push @returned, schema('daemon')->resultset('Admin')
->new_result({ $job->get_columns });
}
return @returned;
} }
sub jq_lock { sub jq_lock {
@@ -112,7 +95,7 @@ sub jq_lock {
try { try {
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'}) ->find($job->job, {for => 'update'})
->update({ status => "queued-$fqdn" }); ->update({ status => "queued-$fqdn" });
# remove any duplicate jobs, needed because we have race conditions # remove any duplicate jobs, needed because we have race conditions
@@ -126,6 +109,9 @@ sub jq_lock {
}, {for => 'update'})->delete(); }, {for => 'update'})->delete();
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;
@@ -139,10 +125,13 @@ sub jq_defer {
# lock db row and update to show job is available # lock db row and update to show job is available
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'}) ->find($job->job, {for => 'update'})
->update({ status => 'queued', started => undef }); ->update({ status => 'queued', started => undef });
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;
@@ -156,7 +145,7 @@ sub jq_complete {
try { try {
schema('netdisco')->txn_do(sub { schema('netdisco')->txn_do(sub {
schema('netdisco')->resultset('Admin') schema('netdisco')->resultset('Admin')
->find($job->id, {for => 'update'})->update({ ->find($job->job, {for => 'update'})->update({
status => $job->status, status => $job->status,
log => $job->log, log => $job->log,
started => $job->started, started => $job->started,
@@ -164,6 +153,9 @@ sub jq_complete {
}); });
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;
@@ -189,6 +181,9 @@ sub jq_insert {
]); ]);
}); });
$happy = true; $happy = true;
}
catch {
error $_;
}; };
return $happy; return $happy;