#! /usr/bin/env tclkit
# Copyright 2005 Coen Siegerink.
# Tequila -=- Shared and persistent collections for Tcl

package provide tequila 2.02

package require Itcl
namespace import itcl::class itcl::code itcl::delete itcl::chain itcl::scope

# Debug trace: print the arguments on one line, cut to at most 80 characters,
# with every character outside the printable ASCII range shown as a dot.
proc D {args} {
  set text [string range $args 0 79]
  puts [regsub -all {[^ -~]} $text .]
}

### TEQUILA - all private code is in "tq" namespace, only "tequila" is public

namespace eval tequila {

  # tequila::notifier ?name?
  #
  # Create a tq::notifier object in the caller's context; the name defaults
  # to an automatically generated one.
  proc notifier {{name #auto}} {
    uplevel 1 tq::notifier $name
  }

  # tequila::rpc -server port cmd ?-cleanup cmd?
  # tequila::rpc host port cmd ?-cleanup cmd?
  #
  # Create an automatically named tq::rpc object in the caller's context,
  # handing all arguments on unchanged.
  proc rpc {args} {
    uplevel 1 tq::rpc #auto $args
  }

  # tequila::pool name ?rpcobj? ?-file name?
  # tequila::pool -server rpcobj ?-file name?
  #
  # Create a pool object in the caller's context.  The flavour is chosen
  # from the arguments:
  #   name is "-server"                    -> tq::serverpool (auto-named)
  #   first extra argument is not a switch -> tq::clientpool, that argument
  #                                           being the rpc object
  #   anything else                        -> plain local tq::pool
  proc pool {name args} {
    if {$name eq "-server"} {
      return [uplevel 1 tq::serverpool #auto -rpc $args]
    }
    if {[regexp {^[^-]} $args]} {
      return [uplevel 1 tq::clientpool $name -rpc $args]
    }
    uplevel 1 tq::pool $name $args
  }
}

### NOTIFIER - a subscription mechanism modeled after Tk's/GK's bindings
#
# This is almost identical to the notifiers in GroupKit.
# Event subscription object.  Scripts are bound to an event name or to a glob
# pattern of event names; "notify" runs every matching script at global level
# after %-substitution.
#
# The lookup tables "bindings" and "wildcards" are common, i.e. shared by all
# notifier objects, while "events" and "scripts" are per object.  A binding
# id found in a shared table is therefore only acted upon when this object
# also owns a script for it.

class tq::notifier {
  common seq 0        ;# counter used to build unique binding ids
  common bindings     ;# exact event name -> list of binding ids
  common wildcards    ;# glob event pattern -> list of binding ids

  variable events     ;# binding id -> event name or pattern (this object)
  variable scripts    ;# binding id -> script to run (this object)

  # Drop every binding still owned by this object, so that its ids do not
  # linger forever in the shared lookup tables once the object is gone.
  destructor {
    foreach binding [array names events] { delbind $binding }
  }

  method destroy {} { delete object $this }

  # Bind script to event and return the new binding id.  An event containing
  # one of the glob characters ? * [ is stored as a wildcard pattern.
  method bind {event script} {
    set binding bind[incr seq]
    set events($binding) $event
    set scripts($binding) $script
    if {[string match {*[?*[]*} $event]} {
      lappend wildcards($event) $binding
    } else {
      lappend bindings($event) $binding
    }
    return $binding
  }

  # Remove a binding previously returned by "bind".
  # Raises the error "no such binding" when this object does not own it.
  method delbind {binding} {
    if {[info exists events($binding)]} {
      set e $events($binding)
      unset events($binding) scripts($binding)
      del1bind bindings($e) $binding
      foreach pattern [array names wildcards] {
        del1bind wildcards($pattern) $binding
      }
      return
    }
    return -code error "no such binding"
  }

  # Remove binding from the list held in the caller's variable ref (an array
  # element of "bindings" or "wildcards"); the element itself is unset when
  # its list would become empty.
  private method del1bind {ref binding} {
    upvar $ref boundlist
    if {[info exists boundlist]} {
      set n [lsearch -exact $boundlist $binding]
      if {$n >= 0} {
        if {[llength $boundlist] == 1} {
          unset boundlist
        } else {
          set boundlist [lreplace $boundlist $n $n]
        }
      }
    }
  }

  # Run all scripts of this object bound to event, exact matches first and
  # wildcard matches after that.  subst is a "string map" list applied to
  # each script; %% (a literal percent sign) and %E (the event name) are
  # always appended to it.  Scripts run at global level.  Returns "".
  method notify {event {subst ""}} {
    lappend subst %% % %E $event
    if {[info exists bindings($event)]} {
      foreach binding $bindings($event) {
        if {[info exists scripts($binding)]} {
          uplevel #0 [string map $subst $scripts($binding)]
        }
      }
    }
    foreach pattern [array names wildcards] {
      # a script run earlier in this loop may have removed the last binding
      # of a pattern, in which case its array element no longer exists
      if {[info exists wildcards($pattern)] && [string match $pattern $event]} {
        foreach binding $wildcards($pattern) {
          if {[info exists scripts($binding)]} {
            uplevel #0 [string map $subst $scripts($binding)]
          }
        }
      }
    }
    return ""
  }
}

### RPC - remote procedure call objects, used for point-to-point communication
#
# The private tq::endpoint class is used for server-side listeners
# as well as for both ends of a connection.
# One end of a socket.  Messages travel as a header line "T2 <count>"
# followed by <count> characters of payload, the payload being the string
# form of the argument list given to "send".

class tq::endpoint {
  public variable cleanup ""  ;# command to run when a connection is closed
  public variable connect ""  ;# command to run on new connections (server)
  public variable command     ;# command to run when a request comes in

  variable sock               ;# channel used for this connection
  variable peer               ;# site at the other end, or "-server" if listener
  variable data ""            ;# pending input data
  variable need 0             ;# 0 when not in recv, count > 0 while in progress

  # sess is the channel, addr the peer address or "-server" for a listening
  # socket; args are configuration options.  Real connections are switched
  # to non-blocking mode and get a readable handler.
  constructor {sess addr args} {
    set sock $sess
    set peer $addr
    eval configure $args
    if {$addr ne "-server"} {
      fconfigure $sock -translation binary -buffering none -blocking 0 \
                       -encoding utf-8
      fileevent $sock readable [code $this received]
    }
  }

  # Run the cleanup command (with this object appended) for connections,
  # then close the channel.
  destructor {
    if {[llength $cleanup] && $peer ne "-server"} {
      eval [linsert $cleanup end $this]
    }
    close $sock
  }

  method destroy {} { delete object $this }

  # Send the argument list to the peer as one message.
  method send {args} {
    puts -nonewline $sock "T2 [string length $args]\n$args"
  }

  # Readable handler.  While a payload is pending, collect it and hand the
  # complete payload to the configured command, called as: command obj data.
  # Otherwise read the next header line; anything which is not a valid
  # header, including end of file, destroys this endpoint.
  method received {} {
    set n [expr {$need-[string length $data]}]
    if {$n > 0 && ![eof $sock]} {
      append data [read $sock $n]
      if {$need == [string length $data]} {
        eval [linsert $command end $this $data]
        set need 0
        set data ""
      }
    } else {
      set header [gets $sock]
      # On a non-blocking channel gets returns an empty string when only part
      # of a line has arrived so far.  That is not a protocol error: the
      # partial input stays buffered, so wait for the next readable event.
      if {[fblocked $sock] && ![eof $sock]} return
      if {![regexp {^T2 (\d{1,9})$} $header - need]} {
        destroy
      }
    }
  }

  # Return the local port number of the channel.
  method port {} {
    return [lindex [fconfigure $sock -sockname] 2]
  }
}

# Public rpc object: a listening socket when host is "-server", otherwise a
# client connection to host:port.  The remaining arguments are endpoint
# configuration options.

class tq::rpc {
  inherit tq::endpoint

  constructor {host port args} {
    if {$host eq "-server"} {
      set s [socket -server [code $this accept] $port]
    } else {
      set s [socket $host $port]
    }
    eval [linsert $args 0 chain $s $host]
  } { }

  # Callback of the listening socket: wrap the new connection in an endpoint
  # which inherits the listener's -command and -cleanup settings, then run
  # the connect command, if any, with the endpoint's name and the address.
  method accept {sess addr port} {
    set conn [tq::endpoint #auto $sess $addr \
                            -command $command -cleanup $cleanup]
    if {$connect ne ""} {
      eval [linsert $connect end ::tq::rpc::$conn $addr]
    }
  }
}

### POOL - structured data collections which can be shared and/or persistent
#
# Pool data is stored in a Metakit storage, as a "colls" view with one row,
# containing all collections as subviews.  The _S view is for "flat" keys.
# A couple of ideas to improve performance later:
#   add Tcl arrays for key -> row# once a collection is searched
#   add Tcl maps for attr -> col# lookup
#   switch to blocked views for better scalability

# POOLS ARE TRICKY BECAUSE THEY OVERRIDE THE SET/UNSET/INCR COMMANDS!
#
# Inside this class (and its subclasses) a bare "set", "unset" or "incr" is
# the pool method of that name; the Tcl built-ins are spelled ::set, ::unset
# and ::incr.
#
# Keys come in three forms:
#   key         a "flat" key, stored in the _S collection
#   coll.key    the row of collection coll with that key
#   coll!row    the row of collection coll at that position

class tq::pool {
  common seq 0            ;# sequence number for MK storages

  public variable file "" ;# associated database file

  variable db             ;# associated MK storage (need not be on file)
  variable nfy            ;# associated notifier object
  variable arrays         ;# attached array details
  variable saver ""       ;# id of the pending periodicSave timer, if any

  # Open the storage, on file when -file was given (with a save every 30
  # seconds), else in memory.  A "colls" view is created unless the storage
  # already has one, in which case a summary of its contents is printed.
  constructor {args} {
    eval configure $args
    if {$file ne ""} {
      ::set db [mk::file open pool[::incr seq] $file]
      periodicSave 30
    } else {
      ::set db [mk::file open pool[::incr seq]]
    }
    # don't lose other subviews, only define colls when it doesn't exist yet
    if {[lsearch -exact [mk::file views $db] colls] < 0} {
      mk::view layout $db.colls {{_S {key value}}}
      mk::view size $db.colls 1
    } else {
      puts "Data loaded from $file:"
      foreach x [mk::view info $db.colls] {
        ::set x [string map {:V ""} $x]
        ::set n [mk::view size $db.colls!0.$x]
        if {$x == "_S"} { ::set x "" }
        puts "  $x: $n keys"
      }
    }
    ::set nfy [namespace which [tequila::notifier]]
  }

  destructor {
    # the save timer would otherwise fire on an object which no longer exists
    if {$saver ne ""} { after cancel $saver }
    mk::file close $db
    $nfy destroy
  }

  method destroy {} { delete object $this }

  # Commit the storage now and again every secs seconds.
  private method periodicSave {secs} {
    ::set saver [after ${secs}000 [code $this periodicSave $secs]]
    mk::file commit $db
  }

  # True if coll can be used as a collection: the flat _S collection is
  # always there but is not listed by "collections".
  protected method hasColl {coll} {
    expr {$coll eq "_S" || [lsearch -exact [collections] $coll] >= 0}
  }

  # Return the complete storage contents as a string, see "import".
  method export {} {
    ::set fd [vfs::memchan]
    # same channel setup as in import: the saved image is binary data
    fconfigure $fd -translation binary
    mk::file save $db $fd
    seek $fd 0
    ::set data [read $fd]
    close $fd
    return $data
  }

  # Replace the storage contents by data, as produced by "export".
  method import {data} {
    ::set fd [vfs::memchan]
    fconfigure $fd -translation binary
    puts -nonewline $fd $data
    seek $fd 0
    mk::file load $db $fd
    close $fd
  }

  # Bind a script to a pool event, returns a binding id for "delbind".
  # Events generated by this class:
  #   coll.key.attr   an attribute was set (%O is "set")
  #   coll.key        a row was deleted (%O is "unset")
  #   coll..attr      an attribute was deleted (%O is "delattr")
  #   coll            a collection was deleted (%O is "delcoll")
  # The substitutions %O %C %R %K %A %V are filled in where they apply.
  method bind {event script} {
    $nfy bind $event $script
  }

  method delbind {binding} {
    $nfy delbind $binding
  }

  # Tie the caller's array "name" to the collection named after the tail of
  # that name, creating the collection if needed.  The array is reloaded
  # from the collection; after that, array writes and unsets are applied to
  # the pool and pool changes are applied to the array.
  # NOTE(review): arraybinder runs from a notification at global level and
  # looks the array up from there, so this presumably only works for global
  # or namespace arrays - confirm.
  method attach {name} {
    upvar $name avar
    ::set coll [namespace tail $name]
    if {[lsearch -exact [collections] $coll] < 0} {
      ::set layout [mk::view layout $db.colls]
      lappend layout [list $coll {key value}]
      mk::view layout $db.colls $layout
    }
    array unset avar
    mk::loop c $db.colls!0.$coll {
      array set avar [mk::get $c key value]
    }
    ::set binding [bind $coll.* [code $this arraybinder %C %O %K %V]]
    array set arrays [list $coll [list 1 $name $binding]]
    trace add variable avar {write unset} [code $this arraytracer $coll]
  }

  # Undo "attach"; name must be spelled exactly as it was when attaching.
  # Nothing happens when no array is attached to that collection.
  method detach {name} {
    # resolve the array in the caller's scope, the same way attach does,
    # so that the trace is removed from the variable it was added to
    upvar 1 $name avar
    ::set coll [namespace tail $name]
    if {[info exists arrays($coll)]} {
      foreach {enabled nm binding} $arrays($coll) break
      if {$name ne $nm} { error "cannot detach $name, was attached as $nm" }
      delbind $binding
      trace remove variable avar {write unset} [code $this arraytracer $coll]
      array unset arrays $coll
    }
  }

  # Notification handler for attached arrays: copy a pool change into the
  # array.  The enabled flag is cleared meanwhile, so that arraytracer does
  # not send the same change back into the pool.
  method arraybinder {coll op key value} {
    if {[info exists arrays($coll)]} {
      switch $op {
        set - unset {
          lset arrays($coll) 0 0
          upvar 1 [lindex $arrays($coll) 1] a
          switch $op {
            set   { array set a [list $key $value] }
            unset { array unset a $key }
          }
          lset arrays($coll) 0 1
        }
      }
    }
  }

  # Variable trace for attached arrays: copy an array change into the pool,
  # using the set and unset methods of this object.
  method arraytracer {coll aray elem op} {
    if {[lindex $arrays($coll) 0]} {
      switch $op {
        write { upvar 1 $aray a; set $coll.$elem $a($elem) }
        unset { if {$elem ne ""} { unset $coll.$elem } }
      }
    }
  }

  # Return the list of keys in a collection.
  method keys {{coll _S}} {
    values $coll key
  }

  # Return the values of one attribute for all rows of a collection.
  method values {{coll _S} {attr value}} {
    ::set r {}
    mk::loop c $db.colls!0.$coll {
      lappend r [mk::get $c $attr]
    }
    return $r
  }

  # Return the names of all collections, not including the flat _S one.
  method collections {} {
    string map {:V ""} [lrange [mk::view info $db.colls] 1 end]
  }

  # Return the attribute names of a collection.
  method attributes {{coll _S}} {
    mk::view info $db.colls!0.$coll
  }

  # Return the number of rows in a collection, 0 if there is no such
  # collection.
  method size {{coll _S}} {
    # "collections" never lists _S, so _S has to be accepted explicitly,
    # else the size of the flat collection would always be reported as 0
    if {![hasColl $coll]} { return 0 }
    mk::view size $db.colls!0.$coll
  }

  # Fetch from the row identified by key.  Without further arguments the
  # "value" attribute is returned, else args are passed on to mk::get.
  # Returns an empty string when the key does not exist and raises an error
  # when more than one row has that key.
  method get {key args} {
    if {![regexp {^([^.!]+)([.!])([^.!]+)$} $key - coll sep row]} {
      ::set coll _S
      ::set sep .
      ::set row $key
    }
    if {$sep eq "!"} {
      ::set key [mk::get $db.colls!0.$coll!$row key]
    } else {
      ::set key $row
      ::set row [mk::select $db.colls!0.$coll -count 2 key $key]
      switch [llength $row] {
        0 { return }
        1 { }
        2 { error "key is not unique" }
      }
    }
    if {[llength $args] == 0} { ::set args value }
    eval mk::get $db.colls!0.$coll!$row $args
  }

  # set key value
  # set key attr value ?attr value ...?
  # Subclasses override this to route changes through the server; the work
  # itself is done by doSet.
  method set {args} {
    eval doSet $args
  }

  # Return the row numbers in coll for which each given attribute matches
  # its glob pattern, ignoring case.
  method match {coll args} {
    lappend cmd mk::select $db.colls!0.$coll
    foreach {k v} $args {
      lappend cmd -globnc $k $v
    }
    eval $cmd
  }

  # Store attribute values in the row identified by key, creating the
  # collection, the row and the attributes as needed, then send one
  # coll.key.attr notification per attribute.  A single argument after the
  # key is taken as the value of the "value" attribute.
  method doSet {key args} {
    if {![regexp {^([^.!]+)([.!])([^.!]+)$} $key - coll sep row]} {
      ::set coll _S
      ::set sep .
      ::set row $key
    }
    switch [llength $args] {
      0 { error "missing attribute and value arguments" }
      1 { ::set args [linsert $args 0 value] }
    }
    # _S always exists, only other collections may have to be added
    if {![hasColl $coll]} {
      ::set layout [mk::view layout $db.colls]
      ::set props {}
      if {$sep ne "!"} { lappend props key }
      foreach {k v} $args { lappend props $k }
      lappend layout [list $coll $props]
      mk::view layout $db.colls $layout
    }
    if {$sep eq "!"} {
      ::set key [mk::get $db.colls!0.$coll!$row key]
    } else {
      ::set key $row
      ::set row [mk::select $db.colls!0.$coll -count 2 key $key]
      switch [llength $row] {
        0 { ::set row [mk::view size $db.colls!0.$coll]
            mk::set $db.colls!0.$coll!$row key $key }
        1 { }
        2 { error "key is not unique" }
      }
    }
    # extend the layout of the collection with attributes not yet present
    ::set attrs [attributes $coll]
    foreach {k v} $args {
      if {[lsearch -exact $attrs $k] < 0} { lappend newattrs $k }
    }
    if {[info exists newattrs]} {
      ::set layout {}
      foreach x [mk::view layout $db.colls] {
        if {[lindex $x 0] eq $coll} {
          ::set x [list $coll [concat [lindex $x 1] $newattrs]]
        }
        lappend layout $x
      }
      mk::view layout $db.colls $layout
    }
    eval mk::set $db.colls!0.$coll!$row $args
    ::set substs [list %O set %C $coll %R $row %K [list $key]]
    foreach {k v} $args {
      $nfy notify $coll.$key.$k [linsert $substs end %A $k %V [list $v]]
    }
  }

  # Set attr of key to newval, but only if it currently equals oldval.
  # Returns 1 if the change was made, else 0.
  method change {key attr oldval newval} {
    ::set f [expr {[get $key $attr] eq $oldval}]
    if {$f} { set $key $attr $newval }
    return $f
  }

  # Add one to an attribute, an empty value counting as 0, and return the
  # new value.
  method incr {key {attr value}} {
    ::set x [get $key $attr]
    if {$x eq ""} { ::set x 0 }
    set $key $attr [::incr x]
    return $x
  }

  # Delete the row identified by key.  Subclasses override this to route
  # the change through the server; the work itself is done by doUnset.
  method unset {key} {
    doUnset $key
  }

  # Delete the row identified by key and send a coll.key notification.
  # Returns the position of the deleted row; nothing is done and nothing is
  # returned when the collection or the key does not exist.
  method doUnset {key} {
    # keys are parsed exactly as in get and doSet, so that whatever could
    # be set can also be unset
    if {![regexp {^([^.!]+)([.!])([^.!]+)$} $key - coll sep row]} {
      ::set coll _S
      ::set sep .
      ::set row $key
    }
    if {![hasColl $coll]} return  ;# ignore silently if not present
    if {$sep eq "!"} {
      ::set key [mk::get $db.colls!0.$coll!$row key]
    } else {
      ::set key $row
      ::set row [mk::select $db.colls!0.$coll -count 1 key $key]
      if {[llength $row] != 1} return  ;# ignore silently if not present
    }
    mk::row delete $db.colls!0.$coll!$row
    $nfy notify $coll.$key [list %O unset %C $coll %R $row %K [list $key]]
    return $row
  }

  # Delete an attribute from a collection.  Subclasses override this to
  # route the change through the server; the work is done by doDelattr.
  method delattr {coll attr} {
    doDelattr $coll $attr
  }

  # Take attr out of the layout of coll, clear its values in all rows and
  # send a coll..attr notification.  Ignored when coll has no such attribute.
  method doDelattr {coll attr} {
    if {[lsearch -exact [attributes $coll] $attr] < 0} return  ;# ignore
    ::set newlayout {}
    foreach x [mk::view layout $db.colls] {
      if {[lindex $x 0] eq $coll} {
        ::set x [lindex $x 1]
        # exact match, as in the check above: attr is a name, not a pattern
        ::set n [lsearch -exact $x $attr]
        if {$n < 0} { error "cannot happen: $attr not found in $coll?" }
        ::set x [list $coll [lreplace $x $n $n]]
      }
      lappend newlayout $x
    }
    mk::view layout $db.colls $newlayout
    # MK cannot get rid of a "restructured-away" property if the storage
    # is not on file, because it needs a commit to clean up (doh!), so
    # the next best thing to do is to clear out all values in the column
    # (note how the attribute *does* move to the end of the list!)
    mk::loop c $db.colls!0.$coll { mk::set $c $attr "" }
    #if {$file ne ""} { mk::file commit $db }
    $nfy notify $coll..$attr [list %O delattr %C $coll %A $attr]
  }

  # Delete a collection.  Subclasses override this to route the change
  # through the server; the work is done by doDelcoll.
  method delcoll {coll} {
    doDelcoll $coll
  }

  # Take coll out of the layout, delete all its rows and send a notification
  # for event coll.  Ignored when there is no such collection, which is
  # always so for the flat _S collection.
  method doDelcoll {coll} {
    if {[lsearch -exact [collections] $coll] < 0} return  ;# ignore
    ::set newlayout {}
    foreach x [mk::view layout $db.colls] {
      if {[lindex $x 0] ne $coll} {
        lappend newlayout $x
      }
    }
    mk::view layout $db.colls $newlayout
    # same problem as delattr, let's at least delete all rows for now
    # (note how the collection *does* move to the end of the list!)
    mk::view size $db.colls!0.$coll 0
    #if {$file ne ""} { mk::file commit $db }
    $nfy notify $coll [list %O delcoll %C $coll]
  }

  # Print a table for the flat collection and for each other collection on
  # fd, showing at most maxrows rows and maxwidth characters per column.
  method dump {{maxrows 20} {maxwidth 50} {fd stdout}} {
    lappend r [dump1 "Pool <[namespace tail $this]>" _S $maxrows $maxwidth]
    foreach x [collections] {
      lappend r [dump1 $x $x $maxrows $maxwidth]
    }
    puts $fd [join $r \n\n]
  }

  # Return the table for one collection as text: a header line, the column
  # names, a ruler, and one line per row with all characters outside the
  # printable ASCII range shown as dots.
  private method dump1 {header coll maxrows maxwidth} {
    ::set view $db.colls!0.$coll
    ::set syms [attributes $coll]
    # each column is as wide as its name or its widest shown value, up to
    # maxwidth
    foreach x $syms {
      ::set max [string length $x]
      mk::loop c $view {
        if {[mk::cursor position c] >= $maxrows} break
        ::set w [mk::get $c -size $x]
        if {$w > $max} { ::set max $w }
        if {$w >= $maxwidth} break
      }
      if {$max >= $maxwidth} { ::set max $maxwidth }
      append fmt " %-" $max . $max s
      append bar " " [string repeat - $max]
    }
    ::set fmt [list $fmt]
    lappend r "$header, [mk::view size $view] entries:"
    lappend r [eval format $fmt $syms]
    lappend r $bar
    mk::loop c $view {
      if {[mk::cursor position c] >= $maxrows} break
      ::set vals [regsub -all {[^ -~]} [eval mk::get $c $syms] {.}]
      lappend r [eval format $fmt $vals]
    }
    join $r \n
  }
}

### SERVERPOOL - Track changes coming in from RPC and broadcast them

# POOLS ARE TRICKY BECAUSE THEY OVERRIDE THE SET/UNSET/INCR COMMANDS!
# Server side of a shared pool.  Every change is applied locally and sent on
# to the connected clients.  Clients are listed in the "tequila" collection,
# keyed by client id, which is the number at the end of the name of the
# endpoint object serving that client.
#
# As in tq::pool, a bare "set" or "unset" in here is the pool method; the
# Tcl built-ins are spelled ::set and ::unset.

class tq::serverpool {
  inherit tq::pool

  public variable rpc     ;# listener

  variable from           ;# set during sreceive processing

  # Hook this pool up to the rpc listener and start with an empty client
  # list.  The options are applied in the first code block, i.e. before the
  # base class constructor runs, so that -file is known when tq::pool opens
  # its storage.
  constructor {args} {
    eval configure $args
  } {
    $rpc configure -connect [code $this sconnected] \
                   -command [code $this sreceive] \
                   -cleanup [code $this sdone]
    if {[lsearch -exact [collections] tequila] < 0} {
      ::set layout [mk::view layout $db.colls]
      lappend layout {tequila {key address started}}
      mk::view layout $db.colls $layout
    }
    mk::view size $db.colls!0.tequila 0
  }

  # New connection: register the client, which also tells the other clients
  # about it, then send the newcomer its id and the complete pool contents.
  method sconnected {conn addr} {
    D sconnected $conn $addr
    ::set cid [clientid $conn]
    ::set from $conn
    set tequila.$cid address $addr started [clock seconds]
    ::unset from
    $conn send initdata $cid [export]
  }

  # Incoming request: data is evaluated as a command in this object's
  # context, with "from" set to the connection it came from.
  # NOTE(review): this evaluates whatever the peer sends, so any client which
  # can connect can run arbitrary Tcl code in the server process.  That is
  # how the protocol works (see tq::clientpool run), which means the port
  # must only be reachable by trusted peers.
  method sreceive {conn data} {
    D sreceive $conn $data
    ::set from $conn
    eval $data
    ::unset from
  }

  # Connection closed: unregister the client, which also tells the other
  # clients, and schedule a commit if the pool is on file.
  method sdone {conn} {
    D sdone $conn
    ::set from $conn
    unset tequila.[clientid $conn]
    ::unset from
    if {$file ne ""} { after idle mk::file commit $db }
  }

  # Return the client id of a connection: the digits at the end of its name.
  proc clientid {conn} {
    regsub {^.*[^\d](\d+)$} $conn {\1}
  }

  # Return the name of the endpoint object serving client cid.
  proc endpoint {cid} {
    return ::tq::rpc::endpoint$cid
  }

  # Send a command to the client whose request is being processed.
  method reply {args} {
    eval $from send $args
  }

  # Send a command to all registered clients when dest is "all"; for any
  # other value of dest the client whose request is being processed is left
  # out.  Outside a request there is no such client, so no one is left out.
  method to {dest args} {
    ::set cid ""
    if {[info exists from]} { ::set cid [clientid $from] }
    foreach x [keys tequila] {
      # the keys are client ids, so they have to be compared with the id of
      # the sender, not with the name of its endpoint object
      if {$dest eq "all" || $x ne $cid} {
        eval [linsert $args 0 [endpoint $x] send]
      }
    }
  }

  method set {args} {
    eval to all doSet $args
    # set locally last, so this also works for the "tequila" collection
    eval doSet $args
  }

  method unset {key} {
    doUnset $key
    to all doUnset $key
  }

  method delattr {coll attr} {
    doDelattr $coll $attr
    to all doDelattr $coll $attr
  }

  method delcoll {coll} {
    doDelcoll $coll
    to all doDelcoll $coll
  }
}

### CLIENTPOOL - Pass state changes to server, track incoming RPC changes

# POOLS ARE TRICKY BECAUSE THEY OVERRIDE THE SET/UNSET/INCR COMMANDS!
# Client side of a shared pool.  Changes are not applied locally but sent to
# the server, which sends them back to all clients as doSet, doUnset,
# doDelattr and doDelcoll commands; those are then evaluated here.
#
# As in tq::pool, a bare "set", "unset" or "incr" in here is the pool method;
# the Tcl built-ins are spelled ::set, ::unset and ::incr.

class tq::clientpool {
  inherit tq::pool

  public variable rpc       ;# connection to server
  public variable clientid  ;# unique client number (server lifetime)

  variable result           ;# receives the outcome of a "run" request

  # The options are applied in the first code block, i.e. before the base
  # class constructor runs.
  constructor {args} {
    eval configure $args
  } {
    $rpc configure -command [code $this creceive] -cleanup [code $this cdone]
  }

  # Incoming message: evaluate it as a command in this object's context.
  # NOTE(review): this runs whatever the server sends - the server has to be
  # trusted.
  method creceive {conn data} {
    eval $data
  }

  # Connection lost: send a "disconnected" notification.
  method cdone {conn} {
    $nfy notify disconnected {%O pool}
  }

  # Send a command to the server.
  method send {args} {
    eval $rpc send $args
  }

  # Evaluate a command in the server pool and return its result.  The
  # server replies with a command which stores the result in the "result"
  # variable; the event loop is entered until that has happened.
  method run {args} {
    ::set v [scope result]
    send eval reply ::set [list $v] "\[$args\]"
    vwait $v
    ::set $v
  }

  # First message from the server: remember the client id, load the pool
  # contents and send a "connected" notification.
  method initdata {cid data} {
    ::set clientid $cid
    import $data
    $nfy notify connected {%O pool}
  }

  method set {args} { eval send set $args }

  # Done in the server, so that test and update are one step for all
  # clients; returns 1 if the change was made, else 0.
  method change {key attr oldval newval} {
    run change $key $attr $oldval $newval
  }

  # Done in the server, returns the new value.  Takes the same optional
  # attribute name as tq::pool incr.
  method incr {key {attr value}} { run incr $key $attr }

  method unset {key} { send unset $key }

  method delattr {coll attr} { send delattr $coll $attr }

  method delcoll {coll} { send delcoll $coll }
}

# debug output is switched off by this empty redefinition of D,
# remove or comment out the next line to see the trace messages
proc D {args} { }

# demo: run with -s as only argument to start a server, with -c to start a
# client which connects to a server on this machine
switch -- $argv {
  -s {
    D [clock format [clock seconds]]
    D server pid [pid]
    catch { source hpeek.tcl; hpeek::start 18395 }
    tequila::pool -server [tequila::rpc -server 18396] -file try.db
    catch { vwait forever }
  }
  -c {
    D client pid [pid]
    catch { source hpeek.tcl; hpeek::start 18395 }
    tequila::pool mypool [tequila::rpc localhost 18396]
    proc connected {} {
      puts "clientid: [mypool cget -clientid]"
    }
    mypool bind connected connected
    set n [expr {int(rand()*10)}]
    set m [expr {int(rand()*1000)}]
    after 1000 mypool set foo$n bar$m
    after 2000 mypool dump
    after 3000 { puts "server pid [mypool run pid]" }
    after 4000 {
      puts [mypool change foo$n value bar$m ok]
      puts [mypool get foo$n]
      puts [mypool change foo$n value bar$m error]
      puts [mypool get foo$n]
      puts [mypool incr seq]
      puts [mypool incr seq]
      puts [mypool incr seq]
    }
    #after 5000 exit
    catch { vwait forever }
  }
}