if {![exists -proc class]} { package require oo }
if {![exists -proc sqlite3.open]} { package require sqlite3 }

# In-memory representation of one row of the queue table.
# The field names mirror the table columns created in "queue dbhandle".
class queue {
    id -1
    file ""
    action ""
    args ""
    start 0
    status ""
    log ""
    runtime 0
    retries 0
    interrupts 0
    submitted 0
}

# Queue status values used in this file:
#   PENDING      (column default for new rows)
#   RUNNING      (reset to INTERRUPTED by "queue startup")
#   FAILED
#   INTERRUPTED
#   COMPLETE
#   DEFER        (reset to PENDING by "queue startup")
#   HOLD

# Return the shared sqlite3 handle for /mod/etc/queue.db.
#
# The handle is cached in ::queue::db. On first use the database is opened;
# if the file does not exist yet the schema (queue and config tables, the
# version marker and the unique file/action index) is created as well.
#
# When -close is present in the arguments the cached handle is closed and
# forgotten instead. In that mode the result is 1 if a handle was open and
# 0 if there was nothing to close.
proc {queue dbhandle} {args} {
    if {"-close" in $args} {
        if {[info exists ::queue::db]} {
            # Best effort: a failing close must not prevent forgetting the handle.
            catch {$::queue::db close}
            unset ::queue::db
            return 1
        }
        return 0
    }

    if {[info exists ::queue::db]} {
        return $::queue::db
    }

    if {![file exists /mod/etc/queue.db]} {
        set ::queue::db [sqlite3.open /mod/etc/queue.db]
        $::queue::db query {
            create table queue(
                id integer primary key autoincrement,
                file text,
                action text,
                args text,
                start integer default 0,
                status text default 'PENDING',
                log text default '',
                runtime integer,
                retries integer default 0,
                interrupts integer default 0,
                submitted integer
            );
        }
        $::queue::db query {
            create table config(
                var text,
                val text
            );
        }
        $::queue::db query {
            insert into config values('version', 2);
        }
        $::queue::db query {
            create unique index file on queue(file, action);
        }
    } else {
        set ::queue::db [sqlite3.open /mod/etc/queue.db]
    }

    return $::queue::db
}

# Housekeeping pass over the queue table.
#
#   days - age in days after which COMPLETE and FAILED rows are purged,
#          measured against the submitted column. Defaults to 7, and a
#          value of 0 is also treated as 7.
#
# In order: RUNNING rows become INTERRUPTED (retries and interrupts are both
# incremented), INTERRUPTED rows with five or more interrupts become FAILED,
# DEFER rows become PENDING, and old COMPLETE/FAILED rows are deleted.
proc {queue startup} {{days 7}} {
    if {$days == 0} { set days 7 }

    set db [queue dbhandle]

    $db query {
        update queue
        set status = 'INTERRUPTED',
            log = 'Job will be retried automatically.',
            retries = retries + 1,
            interrupts = interrupts + 1
        where status = 'RUNNING'
    }

    $db query {
        update queue
        set status = 'FAILED',
            log = 'Too many interrupts.'
        where status = 'INTERRUPTED'
        and interrupts >= 5
    }

    $db query {
        update queue
        set status = 'PENDING'
        where status = 'DEFER'
    }

    # The expression is braced so that days is substituted only once, by expr.
    $db query {
        delete from queue
        where status in ('COMPLETE', 'FAILED')
        and submitted < %s
    } [expr {[clock seconds] - 86400 * $days}]
}

# Look up the queue entry for a file/action pair.
#
#   file   - path of the file; it is normalised before the lookup.
#   action - action name.
#
# Returns a queue object built from the first matching row, or an empty
# value if there is no such row.
proc {queue fetch} {file action} {
    set db [queue dbhandle]

    foreach row [$db query {
        select * from queue
        where file = '%s' and action = '%s'
    } [file normalize $file] $action] {
        return [queue new $row]
    }
    return {}
}

# Queue an action for a ts object (anything answering "get file").
#
# The insert is "or ignore", so an existing file/action row is left as it is.
# Returns the result of "queue fetch" for that file and action.
proc {queue insert} {ts action} {
    set db [queue dbhandle]

    $db query {
        insert or ignore into queue(submitted, file, action)
        values(%s, '%s', '%s')
    } [clock seconds] [file normalize [$ts get file]] $action

    return [queue fetch [$ts get file] $action]
}

# Delete the queue rows for a ts object that are not currently RUNNING.
#
#   ts     - object answering "get file".
#   action - restrict the delete to this action; the default "*" removes
#            the rows for every action.
proc {queue delete} {ts {action "*"}} {
    set db [queue dbhandle]

    set q "
        delete from queue
        where file = '%s'
        and status != 'RUNNING'
    "
    # Build the argument list alongside the statement so that the number of
    # values always matches the number of %s specifiers.
    set params [list [file normalize [$ts get file]]]
    if {$action ne "*"} {
        append q " and action = '%s'"
        lappend params $action
    }
    $db query $q {*}$params
}

# Delete the row with the given id unless it is RUNNING.
proc {queue delete_by_id} {id} {
    set db [queue dbhandle]

    set q "
        delete from queue
        where id = '%s'
        and status != 'RUNNING'
    "
    $db query $q $id
}

# Put a FAILED or HOLD row back to PENDING and clear its interrupt count.
# Rows in any other state are left untouched.
proc {queue resubmit} {id} {
    set db [queue dbhandle]

    set q "
        update queue
        set status = 'PENDING', interrupts = 0
        where id = '%s'
        and status in ('FAILED', 'HOLD')
    "
    $db query $q $id
}

# Mark the row with the given id as HOLD unless it is RUNNING.
proc {queue hold} {id} {
    set db [queue dbhandle]

    set q "
        update queue
        set status = 'HOLD'
        where id = '%s'
        and status != 'RUNNING'
    "
    $db query $q $id
}

# Return the comma separated list of actions queued for a ts object whose
# status is not COMPLETE, FAILED or HOLD.
#
# Returns an empty string when ts is the literal "0" or when the query does
# not produce exactly one row.
proc {queue status} {ts} {
    if {$ts eq "0"} { return "" }

    set db [queue dbhandle]

    set ret [$db query {
        select group_concat(action)
        from queue
        where file = '%s'
        and status not in ('COMPLETE', 'FAILED', 'HOLD')
    } [file normalize [$ts get file]]]

    set q ""
    if {[llength $ret] == 1} {
        # Each row is a column-name/value pair list; keep only the value.
        lassign [lindex $ret 0] x q
    }
    return $q
}

# Test whether a ts object has active queue entries.
#
#   q - "any" returns 1 if at least one action is listed by "queue status";
#       any other value is looked up as an action name in that list.
#
# NOTE(review): with the default "all" this tests for an action literally
# named all - confirm against callers that this is the intended default.
proc {queue check} {ts {q "all"}} {
    set queues [split [queue status $ts] ,]

    if {$q eq "any" && [llength $queues]} {
        return 1
    }
    return $($q in $queues)
}

# Return every row of the queue table as a list of queue objects, ordered
# by id.
proc {queue all} {} {
    set db [queue dbhandle]

    set ret {}
    foreach row [$db query {select * from queue order by id}] {
        lappend ret [queue new $row]
    }
    return $ret
}

# Return the PENDING and INTERRUPTED rows whose start time is in the past,
# as a list of queue objects ordered by descending id.
proc {queue pending} {} {
    set db [queue dbhandle]

    set ret {}
    foreach row [$db query {
        select * from queue
        where status in ('PENDING', 'INTERRUPTED')
        and start < %s
        order by id desc
    } [clock seconds]] {
        lappend ret [queue new $row]
    }
    return $ret
}

# Return the number of entries reported by "queue runcandidates".
# NOTE(review): "queue runcandidates" is not defined in this section of the
# file - presumably it lives elsewhere; verify.
proc {queue size} {} {
    return [llength [queue runcandidates]]
}

# Return the schema version stored in the config table.
# Falls back to 1 when the lookup fails or no version row exists.
proc {queue version} {} {
    set db [queue dbhandle]

    set version 1
    # Errors from the query are swallowed deliberately; the default of 1
    # is returned in that case.
    catch {
        foreach row [$db query {
            select val from config where var = 'version'
        }] {
            lassign $row x version
        }
    }
    return $version
}

# Record a new state for this entry, in the database and on the object.
#
#   _status  - new status value.
#   _log     - new log text (default empty).
#   _retries - amount ADDED to the retry counter (default 0).
#   _runtime - new runtime value (default 0).
queue method update {_status {_log ""} {_retries 0} {_runtime 0}} {
    set db [queue dbhandle]

    $db query {
        update queue
        set status = '%s',
            log = '%s',
            retries = retries + %s,
            runtime = %s
        where id = %s
    } $_status $_log $_retries $_runtime $id

    # Keep the object in step with the row that was just written.
    set status $_status
    set log $_log
    incr retries $_retries
    set runtime $_runtime
}

# Set a single column of this entry, in the database and on the object.
#
#   var - column / field name.
#   val - new value.
#
# NOTE(review): var is interpolated into the statement as a column name, so
# callers should only pass known field names - confirm no caller forwards
# user input here.
queue method set {var val} {
    set db [queue dbhandle]

    $db query {
        update queue
        set %s = '%s'
        where id = %s
    } $var $val $id

    set $var $val
}