Error protect database access; re-create corrupt queue DB.

This commit is contained in:
df 2021-02-09 15:10:05 +00:00 committed by HummyPkg
parent 15cba14cc3
commit 7834dae075
1 changed files with 122 additions and 76 deletions

View File

@ -53,77 +53,127 @@ proc {queue dbhandle} {args} {
return $::queue::db return $::queue::db
} }
if {![file exists /mod/etc/queue.db]} { try {
set ::queue::db [sqlite3.open /mod/etc/queue.db] set db [sqlite3.open /mod/etc/queue.db]
$::queue::db query { } on error {msg} {
create table queue( stderr puts $msg
id integer primary key autoincrement, return 0
file text, }
action text, if {[catch {$db query { select count(*) from queue, config; }}]} {
args text, catch {$db {drop table queue}}
start integer default 0, catch {$db {drop table config}}
status text default 'PENDING', try {
log text default '', $db query {
runtime integer, create table queue(
retries integer default 0, id integer primary key autoincrement,
interrupts integer default 0, file text,
submitted integer default 0, action text,
last integer default 0 args text,
); start integer default 0,
status text default 'PENDING',
log text default '',
runtime integer,
retries integer default 0,
interrupts integer default 0,
submitted integer default 0,
last integer default 0
);
}
$db query {
create table config(
var text,
val text
);
}
$db query {
insert into config values('version', 3);
}
$db query {
create unique index file on queue(file, action);
}
} on error {msg opts} {
stderr puts $msg
catch {$db close}
return 0
} }
$::queue::db query {
create table config(
var text,
val text
);
}
$::queue::db query {
insert into config values('version', 3);
}
$::queue::db query {
create unique index file on queue(file, action);
}
} else {
set ::queue::db [sqlite3.open /mod/etc/queue.db]
} }
return $::queue::db return [set ::queue::db $db]
}
proc {queue dbquery} {query args} {
set db [queue dbhandle]
try {
return [$db query $query {*}$args]
} on error {msg opts} {
return {}
}
}
proc {queue dbaction} {query args} {
set db [queue dbhandle]
try {
$db query $query {*}$args
return true
} on error {msg opts} {
return false
}
}
proc {queue dbqueryl} {query_list {txn_mode ""}} {
set db [queue dbhandle]
try {
if {$txn_mode ne ""} {
if {$txn_mode ni { immediate exclusive }} {
set txn_mode "deferred"
}
$db query "begin %s transaction" $txn_mode
}
foreach q $query_list {
$db query {*}$q
}
if {$txn_mode ne ""} {
$db query "commit transaction"
}
return true
} on error {msg opts} {
}
if {$txn_mode ne ""} {
catch { $db query "rollback transaction" }
}
return false
} }
proc {queue startup} {{days 7}} { proc {queue startup} {{days 7}} {
if {$days == 0} { set days 7 } if {$days == 0} { set days 7 }
set db [queue dbhandle] return [queue dbqueryl { { {
$db query {
update queue update queue
set status = 'INTERRUPTED', set status = 'INTERRUPTED',
log = 'Job will be retried automatically.', log = 'Job will be retried automatically.',
retries = retries + 1, retries = retries + 1,
interrupts = interrupts + 1 interrupts = interrupts + 1
where status = 'RUNNING' where status = 'RUNNING'
} } } { {
$db query {
update queue update queue
set status = 'FAILED', set status = 'FAILED',
log = 'Too many interrupts.' log = 'Too many interrupts.'
where status = 'INTERRUPTED' where status = 'INTERRUPTED'
and interrupts >= 5 and interrupts >= 5
} } } { {
$db query {
update queue update queue
set status = 'PENDING' set status = 'PENDING'
where status = 'DEFER' where status = 'DEFER'
} } } { {
$db query {
delete from queue delete from queue
where status in ('COMPLETE', 'FAILED') where status in ('COMPLETE', 'FAILED')
and submitted < %s and submitted < %s
} [expr [clock seconds] - 86400 * $days] } [expr [clock seconds] - 86400 * $days]
} } ]
} }
proc {queue fetch} {file action} { proc {queue fetch} {file action} {
set db [queue dbhandle]
foreach row [$db query { foreach row [queue dbquery {
select * from queue select * from queue
where file = '%s' where file = '%s'
and action = '%s' and action = '%s'
@ -134,23 +184,23 @@ proc {queue fetch} {file action} {
} }
proc {queue insert} {args file action} { proc {queue insert} {args file action} {
set db [queue dbhandle]
set status "PENDING" set status "PENDING"
if {"-hold" in $args} { set status "HOLD" } if {"-hold" in $args} { set status "HOLD" }
set file [queue key $file] set file [queue key $file]
$db query { if {[queue dbaction {
insert or ignore into queue(submitted, file, action, status) insert or ignore into queue(submitted, file, action, status)
values(%s, '%s', '%s', '%s') values(%s, '%s', '%s', '%s')
} [clock seconds] $file $action $status } [clock seconds] $file $action $status]} {
return [queue fetch $file $action] return [queue fetch $file $action]
}
return 0
} }
proc {queue delete} {file {action "*"}} { proc {queue delete} {file {action "*"}} {
set db [queue dbhandle]
set q " set q "
delete from queue delete from queue
@ -161,11 +211,10 @@ proc {queue delete} {file {action "*"}} {
append q " and action = '%s'" append q " and action = '%s'"
} }
$db query $q [queue key $file] $action return [queue dbaction $q [queue key $file] $action]
} }
proc {queue delete_by_id} {id} { proc {queue delete_by_id} {id} {
set db [queue dbhandle]
set q " set q "
delete from queue delete from queue
@ -173,11 +222,10 @@ proc {queue delete_by_id} {id} {
and status != 'RUNNING' and status != 'RUNNING'
" "
$db query $q $id return [queue dbaction $q $id]
} }
proc {queue resubmit} {id} { proc {queue resubmit} {id} {
set db [queue dbhandle]
set q " set q "
update queue update queue
@ -186,11 +234,10 @@ proc {queue resubmit} {id} {
and status in ('FAILED', 'HOLD', 'COMPLETE') and status in ('FAILED', 'HOLD', 'COMPLETE')
" "
$db query $q $id return [queue dbaction $q $id]
} }
proc {queue hold} {id} { proc {queue hold} {id} {
set db [queue dbhandle]
set q " set q "
update queue update queue
@ -199,15 +246,13 @@ proc {queue hold} {id} {
and status not in ('RUNNING', 'COMPLETE') and status not in ('RUNNING', 'COMPLETE')
" "
$db query $q $id return [queue dbaction $q $id]
} }
proc {queue status} {file} { proc {queue status} {file} {
if {$file eq "0"} { return "" } if {$file eq "0"} { return "" }
set db [queue dbhandle] set ret [queue dbquery {
set ret [$db query {
select group_concat(action) select group_concat(action)
from queue from queue
where file = '%s' where file = '%s'
@ -230,10 +275,9 @@ proc {queue check} {file {q "any"}} {
} }
proc {queue all} {} { proc {queue all} {} {
set db [queue dbhandle]
set ret {} set ret {}
foreach row [$db query {select * from queue order by id}] { foreach row [queue dbquery {select * from queue order by id}] {
lappend ret [queue new $row] lappend ret [queue new $row]
} }
return $ret return $ret
@ -243,7 +287,7 @@ proc {queue pending} {} {
set db [queue dbhandle] set db [queue dbhandle]
set ret {} set ret {}
foreach row [$db query { foreach row [queue dbquery {
select * from queue select * from queue
where status in ('PENDING', 'INTERRUPTED') where status in ('PENDING', 'INTERRUPTED')
and start < %s and start < %s
@ -259,23 +303,20 @@ proc {queue size} {} {
} }
proc {queue version} {} { proc {queue version} {} {
set db [queue dbhandle]
set version 1 set version 1
catch { foreach row [queue dbquery {
foreach row [$db query {
select val from config select val from config
where var = 'version' where var = 'version'
}] { }] {
lassign $row x version lassign $row x version
}
} }
return $version return $version
} }
queue method update {_status {_log ""} {_retries 0} {_runtime 0}} { queue method update {_status {_log ""} {_retries 0} {_runtime 0}} {
set db [queue dbhandle]
$db query { if {[queue dbaction {
update queue update queue
set status = '%s', set status = '%s',
log = '%s', log = '%s',
@ -283,22 +324,27 @@ queue method update {_status {_log ""} {_retries 0} {_runtime 0}} {
runtime = %s, runtime = %s,
last = %s last = %s
where id = %s where id = %s
} $_status $_log $_retries $_runtime [clock seconds] $id } $_status $_log $_retries $_runtime [clock seconds] $id]} {
set status $_status set status $_status
set log $_log set log $_log
incr retries $_retries incr retries $_retries
set runtime $_runtime set runtime $_runtime
return true
}
return false
} }
queue method set {var val} { queue method set {var val} {
set db [queue dbhandle]
$db query { if {[queue dbaction {
update queue update queue
set %s = '%s' set %s = '%s'
where id = %s where id = %s
} $var $val $id } $var $val $id]} {
set $var $val set $var $val
}
return $var
} }
queue method submit {{_start 0}} { queue method submit {{_start 0}} {