webif/webif/lib/queue.class

309 lines
5.3 KiB
Tcl

if {![exists -proc class]} { package require oo }
if {![exists -proc sqlite3.open]} { package require sqlite3 }
class queue {
id -1
file ""
action ""
args ""
start 0
status ""
log ""
runtime 0
retries 0
interrupts 0
submitted 0
last 0
}
proc {queue key} {o} {
set type "string"
catch { set type [getref $o] }
switch -- $type {
ts { return [file normalize [$o get file]] }
default {
if {[string first :// $o] != -1} {
return $o
}
return "file://$o"
}
}
}
# Queue status values:
# PENDING
# FAILED
# INTERRUPTED
# COMPLETE
# DEFER
# HOLD
proc {queue dbhandle} {args} {
if {"-close" in $args} {
if {[info exists ::queue::db]} {
catch {$::queue::db close}
unset ::queue::db
return 1
}
return 0
}
if {[info exists ::queue::db]} {
return $::queue::db
}
if {![file exists /mod/etc/queue.db]} {
set ::queue::db [sqlite3.open /mod/etc/queue.db]
$::queue::db query {
create table queue(
id integer primary key autoincrement,
file text,
action text,
args text,
start integer default 0,
status text default 'PENDING',
log text default '',
runtime integer,
retries integer default 0,
interrupts integer default 0,
submitted integer default 0,
last integer default 0
);
}
$::queue::db query {
create table config(
var text,
val text
);
}
$::queue::db query {
insert into config values('version', 3);
}
$::queue::db query {
create unique index file on queue(file, action);
}
} else {
set ::queue::db [sqlite3.open /mod/etc/queue.db]
}
return $::queue::db
}
proc {queue startup} {{days 7}} {
if {$days == 0} { set days 7 }
set db [queue dbhandle]
$db query {
update queue
set status = 'INTERRUPTED',
log = 'Job will be retried automatically.',
retries = retries + 1,
interrupts = interrupts + 1
where status = 'RUNNING'
}
$db query {
update queue
set status = 'FAILED',
log = 'Too many interrupts.'
where status = 'INTERRUPTED'
and interrupts >= 5
}
$db query {
update queue
set status = 'PENDING'
where status = 'DEFER'
}
$db query {
delete from queue
where status in ('COMPLETE', 'FAILED')
and submitted < %s
} [expr [clock seconds] - 86400 * $days]
}
proc {queue fetch} {file action} {
set db [queue dbhandle]
foreach row [$db query {
select * from queue
where file = '%s'
and action = '%s'
} $file $action] {
return [queue new $row]
}
return {}
}
proc {queue insert} {args file action} {
set db [queue dbhandle]
set status "PENDING"
if {"-hold" in $args} { set status "HOLD" }
set file [queue key $file]
$db query {
insert or ignore into queue(submitted, file, action, status)
values(%s, '%s', '%s', '%s')
} [clock seconds] $file $action $status
return [queue fetch $file $action]
}
proc {queue delete} {file {action "*"}} {
set db [queue dbhandle]
set q "
delete from queue
where file = '%s'
and status != 'RUNNING'
"
if {$action ne "*"} {
append q " and action = '%s'"
}
$db query $q [queue key $file] $action
}
proc {queue delete_by_id} {id} {
set db [queue dbhandle]
set q "
delete from queue
where id = '%s'
and status != 'RUNNING'
"
$db query $q $id
}
proc {queue resubmit} {id} {
set db [queue dbhandle]
set q "
update queue
set status = 'PENDING', interrupts = 0
where id = '%s'
and status in ('FAILED', 'HOLD', 'COMPLETE')
"
$db query $q $id
}
proc {queue hold} {id} {
set db [queue dbhandle]
set q "
update queue
set status = 'HOLD'
where id = '%s'
and status not in ('RUNNING', 'COMPLETE')
"
$db query $q $id
}
proc {queue status} {file} {
if {$file eq "0"} { return "" }
set db [queue dbhandle]
set ret [$db query {
select group_concat(action)
from queue
where file = '%s'
and status not in ('COMPLETE', 'FAILED', 'HOLD')
} [queue key $file]]
set q ""
if {[llength $ret] == 1} {
lassign [lindex $ret 0] x q
}
return $q
}
proc {queue check} {file {q "any"}} {
set queues [split [queue status $file] ,]
if {$q eq "any" && [llength $queues]} {
return 1
}
return $($q in $queues)
}
proc {queue all} {} {
set db [queue dbhandle]
set ret {}
foreach row [$db query {select * from queue order by id}] {
lappend ret [queue new $row]
}
return $ret
}
proc {queue pending} {} {
set db [queue dbhandle]
set ret {}
foreach row [$db query {
select * from queue
where status in ('PENDING', 'INTERRUPTED')
and start < %s
order by id desc
} [clock seconds]] {
lappend ret [queue new $row]
}
return $ret
}
proc {queue size} {} {
return [llength [queue runcandidates]]
}
proc {queue version} {} {
set db [queue dbhandle]
set version 1
catch {
foreach row [$db query {
select val from config
where var = 'version'
}] {
lassign $row x version
}
}
return $version
}
queue method update {_status {_log ""} {_retries 0} {_runtime 0}} {
set db [queue dbhandle]
$db query {
update queue
set status = '%s',
log = '%s',
retries = retries + %s,
runtime = %s,
last = %s
where id = %s
} $_status $_log $_retries $_runtime [clock seconds] $id
set status $_status
set log $_log
incr retries $_retries
set runtime $_runtime
}
queue method set {var val} {
set db [queue dbhandle]
$db query {
update queue
set %s = '%s'
where id = %s
} $var $val $id
set $var $val
}
queue method submit {{_start 0}} {
if {$_start} { $self set start $_start }
queue resubmit $id
}