diff --git a/webif/lib/queue.class b/webif/lib/queue.class index b9811cf..59715a0 100644 --- a/webif/lib/queue.class +++ b/webif/lib/queue.class @@ -53,77 +53,127 @@ proc {queue dbhandle} {args} { return $::queue::db } - if {![file exists /mod/etc/queue.db]} { - set ::queue::db [sqlite3.open /mod/etc/queue.db] - $::queue::db query { - create table queue( - id integer primary key autoincrement, - file text, - action text, - args text, - start integer default 0, - status text default 'PENDING', - log text default '', - runtime integer, - retries integer default 0, - interrupts integer default 0, - submitted integer default 0, - last integer default 0 - ); + try { + set db [sqlite3.open /mod/etc/queue.db] + } on error {msg} { + stderr puts $msg + return 0 + } + if {[catch {$db query { select count(*) from queue, config; }}]} { + catch {$db {drop table queue}} + catch {$db {drop table config}} + try { + $db query { + create table queue( + id integer primary key autoincrement, + file text, + action text, + args text, + start integer default 0, + status text default 'PENDING', + log text default '', + runtime integer, + retries integer default 0, + interrupts integer default 0, + submitted integer default 0, + last integer default 0 + ); + } + $db query { + create table config( + var text, + val text + ); + } + $db query { + insert into config values('version', 3); + } + $db query { + create unique index file on queue(file, action); + } + } on error {msg opts} { + stderr puts $msg + catch {$db close} + return 0 } - $::queue::db query { - create table config( - var text, - val text - ); - } - $::queue::db query { - insert into config values('version', 3); - } - $::queue::db query { - create unique index file on queue(file, action); - } - } else { - set ::queue::db [sqlite3.open /mod/etc/queue.db] } - return $::queue::db + return [set ::queue::db $db] +} + +proc {queue dbquery} {query args} { + set db [queue dbhandle] + try { + return [$db query $query {*}$args] + } on error {msg opts} { + return {} + } +} + +proc {queue dbaction} {query args} { + set db [queue dbhandle] + try { + $db query $query {*}$args + return true + } on error {msg opts} { + return false + } +} + +proc {queue dbqueryl} {query_list {txn_mode ""}} { + set db [queue dbhandle] + try { + if {$txn_mode ne ""} { + if {$txn_mode ni { immediate exclusive }} { + set txn_mode "deferred" + } + $db query "begin %s transaction" $txn_mode + } + foreach q $query_list { + $db query {*}$q + } + if {$txn_mode ne ""} { + $db query "commit transaction" + } + return true + } on error {msg opts} { + } + if {$txn_mode ne ""} { + catch { $db query "rollback transaction" } + } + return false } proc {queue startup} {{days 7}} { if {$days == 0} { set days 7 } - set db [queue dbhandle] - $db query { + return [queue dbqueryl { { { update queue set status = 'INTERRUPTED', log = 'Job will be retried automatically.', retries = retries + 1, interrupts = interrupts + 1 where status = 'RUNNING' - } - $db query { + } } { { update queue set status = 'FAILED', log = 'Too many interrupts.' where status = 'INTERRUPTED' and interrupts >= 5 - } - $db query { + } } { { update queue set status = 'PENDING' where status = 'DEFER' - } - $db query { + } } { { delete from queue where status in ('COMPLETE', 'FAILED') and submitted < %s } [expr [clock seconds] - 86400 * $days] + } } ] } proc {queue fetch} {file action} { - set db [queue dbhandle] - foreach row [$db query { + foreach row [queue dbquery { select * from queue where file = '%s' and action = '%s' @@ -134,23 +184,23 @@ proc {queue fetch} {file action} { } proc {queue insert} {args file action} { - set db [queue dbhandle] set status "PENDING" if {"-hold" in $args} { set status "HOLD" } set file [queue key $file] - $db query { + if {[queue dbaction { insert or ignore into queue(submitted, file, action, status) values(%s, '%s', '%s', '%s') - } [clock seconds] $file $action $status + } [clock seconds] $file $action $status]} { - return [queue fetch $file $action] + return [queue fetch $file $action] + } + return 0 } proc {queue delete} {file {action "*"}} { - set db [queue dbhandle] set q " delete from queue @@ -161,11 +211,10 @@ proc {queue delete} {file {action "*"}} { append q " and action = '%s'" } - $db query $q [queue key $file] $action + return [queue dbaction $q [queue key $file] $action] } proc {queue delete_by_id} {id} { - set db [queue dbhandle] set q " delete from queue @@ -173,11 +222,10 @@ proc {queue delete_by_id} {id} { and status != 'RUNNING' " - $db query $q $id + return [queue dbaction $q $id] } proc {queue resubmit} {id} { - set db [queue dbhandle] set q " update queue @@ -186,11 +234,10 @@ proc {queue resubmit} {id} { and status in ('FAILED', 'HOLD', 'COMPLETE') " - $db query $q $id + return [queue dbaction $q $id] } proc {queue hold} {id} { - set db [queue dbhandle] set q " update queue @@ -199,15 +246,13 @@ proc {queue hold} {id} { and status not in ('RUNNING', 'COMPLETE') " - $db query $q $id + return [queue dbaction $q $id] } proc {queue status} {file} { if {$file eq "0"} { return "" } - set db [queue dbhandle] - - set ret [$db query { + set ret [queue dbquery { select group_concat(action) from queue where file = '%s' @@ -230,10 +275,9 @@ proc {queue check} {file {q "any"}} { } proc {queue all} {} { - set db [queue dbhandle] set ret {} - foreach row [$db query {select * from queue order by id}] { + foreach row [queue dbquery {select * from queue order by id}] { lappend ret [queue new $row] } return $ret @@ -243,7 +287,7 @@ proc {queue pending} {} { set db [queue dbhandle] set ret {} - foreach row [$db query { + foreach row [queue dbquery { select * from queue where status in ('PENDING', 'INTERRUPTED') and start < %s @@ -259,23 +303,20 @@ proc {queue size} {} { } proc {queue version} {} { - set db [queue dbhandle] set version 1 - catch { - foreach row [$db query { + foreach row [queue dbquery { select val from config where var = 'version' }] { - lassign $row x version - } + lassign $row x version } return $version } queue method update {_status {_log ""} {_retries 0} {_runtime 0}} { - set db [queue dbhandle] - $db query { + + if {[queue dbaction { update queue set status = '%s', log = '%s', @@ -283,22 +324,27 @@ queue method update {_status {_log ""} {_retries 0} {_runtime 0}} { runtime = %s, last = %s where id = %s - } $_status $_log $_retries $_runtime [clock seconds] $id + } $_status $_log $_retries $_runtime [clock seconds] $id]} { - set status $_status - set log $_log - incr retries $_retries - set runtime $_runtime + set status $_status + set log $_log + incr retries $_retries + set runtime $_runtime + return true + } + return false } queue method set {var val} { - set db [queue dbhandle] - $db query { + + if {[queue dbaction { update queue set %s = '%s' where id = %s - } $var $val $id - set $var $val + } $var $val $id]} { + set $var $val + } + return $var } queue method submit {{_start 0}} {