webif/webif/lib/queue.class

355 lines
6.2 KiB
Tcl

if {![exists -command class]} { package require oo }
if {![exists -command sqlite3.open]} { package require sqlite3 }
class queue {
id -1
file ""
action ""
args ""
start 0
status ""
log ""
runtime 0
retries 0
interrupts 0
submitted 0
last 0
}
proc {queue key} {o} {
set type "string"
catch { set type [getref $o] }
switch -- $type {
ts { return [file normalize [$o get file]] }
default {
if {[string first :// $o] != -1} {
return $o
}
return "file://$o"
}
}
}
# Queue status values:
# PENDING
# FAILED
# INTERRUPTED
# COMPLETE
# DEFER
# HOLD
proc {queue dbhandle} {args} {
if {"-close" in $args} {
if {[info exists ::queue::db]} {
catch {$::queue::db close}
unset ::queue::db
return 1
}
return 0
}
if {[info exists ::queue::db]} {
return $::queue::db
}
try {
set db [sqlite3.open /mod/etc/queue.db]
} on error {msg} {
stderr puts $msg
return 0
}
if {[catch {$db query { select count(*) from queue, config; }}]} {
catch {$db {drop table queue}}
catch {$db {drop table config}}
try {
$db query {
create table queue(
id integer primary key autoincrement,
file text,
action text,
args text,
start integer default 0,
status text default 'PENDING',
log text default '',
runtime integer,
retries integer default 0,
interrupts integer default 0,
submitted integer default 0,
last integer default 0
);
}
$db query {
create table config(
var text,
val text
);
}
$db query {
insert into config values('version', 3);
}
$db query {
create unique index file on queue(file, action);
}
} on error {msg opts} {
stderr puts $msg
catch {$db close}
return 0
}
}
return [set ::queue::db $db]
}
proc {queue dbquery} {query args} {
set db [queue dbhandle]
try {
return [$db query $query {*}$args]
} on error {msg opts} {
return {}
}
}
proc {queue dbaction} {query args} {
set db [queue dbhandle]
try {
$db query $query {*}$args
return true
} on error {msg opts} {
return false
}
}
proc {queue dbqueryl} {query_list {txn_mode ""}} {
set db [queue dbhandle]
try {
if {$txn_mode ne ""} {
if {$txn_mode ni { immediate exclusive }} {
set txn_mode "deferred"
}
$db query "begin %s transaction" $txn_mode
}
foreach q $query_list {
$db query {*}$q
}
if {$txn_mode ne ""} {
$db query "commit transaction"
}
return true
} on error {msg opts} {
}
if {$txn_mode ne ""} {
catch { $db query "rollback transaction" }
}
return false
}
proc {queue startup} {{days 7}} {
if {$days == 0} { set days 7 }
return [queue dbqueryl [list { {
update queue
set status = 'INTERRUPTED',
log = 'Job will be retried automatically.',
retries = retries + 1,
interrupts = interrupts + 1
where status = 'RUNNING'
} } { {
update queue
set status = 'FAILED',
log = 'Too many interrupts.'
where status = 'INTERRUPTED'
and interrupts >= 5
} } { {
update queue
set status = 'PENDING'
where status = 'DEFER'
} } [list {
delete from queue
where status in ('COMPLETE', 'FAILED')
and submitted < %s
} [expr {[clock seconds] - 86400 * $days}]
] ] ]
}
proc {queue fetch} {file action} {
foreach row [queue dbquery {
select * from queue
where file = '%s'
and action = '%s'
} $file $action] {
return [queue new $row]
}
return {}
}
proc {queue insert} {args file action} {
set status "PENDING"
if {"-hold" in $args} { set status "HOLD" }
set file [queue key $file]
if {[queue dbaction {
insert or ignore into queue(submitted, file, action, status)
values(%s, '%s', '%s', '%s')
} [clock seconds] $file $action $status]} {
return [queue fetch $file $action]
}
return 0
}
proc {queue delete} {file {action "*"}} {
set q "
delete from queue
where file = '%s'
and status != 'RUNNING'
"
if {$action ne "*"} {
append q " and action = '%s'"
}
return [queue dbaction $q [queue key $file] $action]
}
proc {queue delete_by_id} {id} {
set q "
delete from queue
where id = '%s'
and status != 'RUNNING'
"
return [queue dbaction $q $id]
}
proc {queue resubmit} {id} {
set q "
update queue
set status = 'PENDING', interrupts = 0
where id = '%s'
and status in ('FAILED', 'HOLD', 'COMPLETE')
"
return [queue dbaction $q $id]
}
proc {queue hold} {id} {
set q "
update queue
set status = 'HOLD'
where id = '%s'
and status not in ('RUNNING', 'COMPLETE')
"
return [queue dbaction $q $id]
}
proc {queue status} {file} {
if {$file eq "0"} { return "" }
set ret [queue dbquery {
select group_concat(action)
from queue
where file = '%s'
and status not in ('COMPLETE', 'FAILED', 'HOLD')
} [queue key $file]]
set q ""
if {[llength $ret] == 1} {
lassign [lindex $ret 0] x q
}
return $q
}
proc {queue check} {file {q "any"}} {
set queues [split [queue status $file] ,]
if {$q eq "any" && [llength $queues]} {
return 1
}
return $($q in $queues)
}
proc {queue all} {} {
set ret {}
foreach row [queue dbquery {select * from queue order by id}] {
lappend ret [queue new $row]
}
return $ret
}
proc {queue pending} {} {
set db [queue dbhandle]
set ret {}
foreach row [queue dbquery {
select * from queue
where status in ('PENDING', 'INTERRUPTED')
and start < %s
order by id desc
} [clock seconds]] {
lappend ret [queue new $row]
}
return $ret
}
proc {queue size} {} {
return [llength [queue runcandidates]]
}
proc {queue version} {} {
set version 1
foreach row [queue dbquery {
select val from config
where var = 'version'
}] {
lassign $row x version
}
return $version
}
queue method update {_status {_log ""} {_retries 0} {_runtime 0}} {
if {[queue dbaction {
update queue
set status = '%s',
log = '%s',
retries = retries + %s,
runtime = %s,
last = %s
where id = %s
} $_status $_log $_retries $_runtime [clock seconds] $id]} {
set status $_status
set log $_log
incr retries $_retries
set runtime $_runtime
return true
}
return false
}
queue method set {var val} {
if {[queue dbaction {
update queue
set %s = '%s'
where id = %s
} $var $val $id]} {
set $var $val
}
return $var
}
queue method submit {{_start 0}} {
if {$_start} { $self set start $_start }
queue resubmit $id
}