#! /usr/bin/perl use strict ; use warnings ; package Dmon ; use 5.008005 ; use File::Path ; use LWP::UserAgent ; use Digest::MD5 ; use Carp qw(confess) ; use DBI ; use DBD::SQLite 1.17 ; use JSON::PP ; use Net::DNS ; use Net::Ping ; use IO::Pipe ; use IO::Select ; use IO::Socket::INET ; use Proc::Daemon 0.14 ; use Time::HiRes ; use Time::Local ; use Net::hostent qw(gethost) ; use Socket qw(inet_ntoa) ; use Config ; use constant { PROG => 'dmon' , CDIR => '/etc/dmon' , PATH => '/usr/sbin/dmon' , REPO => 'https://svn.science.uu.nl/repos/project.dmon/' , WORK => 'works' , UPGR => 'upgrade' , SECR => 'cgi-secret' , CGID => 'cgi-data' , PLVL => '%REVISION%' , SPIN_UP => 1 , SPIN_DO => 0 , T_EVNTS => 'events' , REPITEM => 'reporting' , TIME => 'TIME' , IVAL => 'IVAL' , MAIL => '/usr/sbin/sendmail' , LMOD => 1 , SRC_DEF => 'config' , SRC_SRC => 'source' , NO_ALRT => 'no-alert' } ; our $VERSION = '0.05' ; sub VERSION { $VERSION ; } ; sub get_revision { return PLVL unless PLVL =~ /REVISION/ ; my $res ; if ( -f 'REVISION' ) { $res = `cat REVISION` ; chomp $res ; } else { my $tmp = `svn info @{[REPO]} | grep Revision:` ; $res = ( $tmp and $tmp =~ /^Revision:\s+(\d+)/ ) ? $1 : 'l' ; } $res ; } our $REVISION = get_revision ; our $Version = "$VERSION-p$REVISION" ; sub Version { sprintf "%s-%s", PROG, $Version ; } ################################################################### package OBB ; use constant PROG => Dmon::PROG ; our ( @EXPORT, @EXPORT_OK, @ISA ) ; BEGIN { require Exporter ; @EXPORT = qw(logs logq logt logv logd) ; @EXPORT_OK = qw() ; @ISA = qw(Exporter) } # use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name ...) ) ; use Carp ; use constant VLVLS => qw(Silent Quiet Terse Verbose Debug Trace) ; our %VLVLS = () ; { my $cnt = 1 ; $VLVLS { $_ } = $cnt ++ for ( VLVLS ) ; } { my $cnt = 1 ; for ( VLVLS ) { $VLVLS { $cnt } = $cnt ; $cnt ++ ; } } sub _VLVL { $VLVLS { shift @_ } ; } our $Verbosity = $VLVLS { Terse } ; # get Verbosity ; or set Verbosity and return previous Verbosity sub Verbosity { my $self = shift ; my $res = $Verbosity ; if ( @_ ) { my $tmp = shift @_ ; my $lvl = $VLVLS { $tmp } or die "bad level $tmp" ; $Verbosity = $lvl ; } $res ; } sub logx { # sprintf can't have a @list as first argument my ( $fmt, @args ) = @_ ; my $msg = sprintf $fmt, @args ; chomp $msg ; printf "%s %s[%s] %s\n", scalar ( localtime ) , PROG, $$, $msg ; } for my $lvl ( VLVLS ) { my $num = $VLVLS { $lvl } or die "bad num for lvl [$lvl]" ; my $sub = sprintf "sub $lvl { \$Verbosity >= $num ; }" ; eval $sub ; die $@ if $@ ; my $ini = lc substr $lvl, 0, 1 ; $sub = "sub log$ini { logx ( \@_ ) if OBB -> $lvl ; }" ; unless ( $lvl eq 'Trace' ) { eval $sub ; die $@ if $@ ; } } sub mk_getset { my $self = shift ; my $sub = <<'SUB' ; sub %s::%s { my $self = shift ; $self -> {%s} = shift if @_ ; $self -> {%s} ; } SUB my @bads = grep ! /^[A-Za-z_]\w*$/, @_ ; die "mk_getset: bad name [@bads]\n" if @bads ; eval sprintf $sub, $self, $_, $_, $_ for @_ ; } sub mk_get { my $self = shift ; my $sub = <<'SUB' ; sub %s::%s { my $self = shift ; Carp::confess "mk_get %s : can't set" if @_ ; $self -> {%s} ; } SUB my @bads = grep ! /^[A-Za-z_]\w*$/, @_ ; die "mk_getset: bad name [@bads]\n" if @bads ; eval sprintf $sub, $self, $_, $_, $_ for @_ ; } sub New { my $self = shift ; bless {}, $self ; } sub Defs { () ; } sub Init { my $self = shift ; my %opts = ( $self -> Defs, @_ ) ; my @opts = grep { $self -> can ( $_ ) } keys %opts ; @$self { @opts } = @opts { @opts } ; $self ; } sub Make { my $self = shift ; $self -> New -> Init ( @_ ) ; } sub Die { my $self = shift ; my $fmt = shift ; my $msg = sprintf "[error] $fmt", @_ ; Util::syslog ( $msg ) ; die $msg ; } sub Warn { my $self = shift ; my $fmt = shift ; warn sprintf "[warng] $fmt\n", @_ ; } sub Xit { my $self = shift ; my $fmt = shift ; confess ( sprintf $fmt, @_ ) ; } sub A_is { confess ( "! #args == $_[0]" ) unless $_[1] == $_[0] ; } sub A_ge { confess ( "! #args >= $_[0]" ) unless $_[1] >= $_[0] ; } sub A_in { A_is 3, scalar @_ ; my ( $lo, $hi, $sz ) = @_ ; confess ( "! #args in [$lo,$hi]" ) unless $lo <= $sz and $sz <= $hi ; } sub A_defd { for ( my $i = 0 ; $i < scalar @_ ; $i ++ ) { confess ( "! defined arg $i" ) unless defined $_[$i] ; } } sub _addr { my $x = shift ; ( defined $x and ref $x ) ? sprintf ( "%s", $x ) : ref $x ; } sub _blessed { my $r = shift ; my $res = ( defined $r and ref $r and UNIVERSAL::isa ( $r, 'UNIVERSAL' ) ) ; $res ; } sub _unbless ; sub _unbless { my $obj = shift ; my $lvl = shift ; my $hav = shift ; my @pth = @_ ; my $ind = ' ' x $lvl ; my $res ; if ( ! ref $obj ) { if ( ! defined $obj ) { $res = undef ; } elsif ( $obj eq '' ) { $res = '' ; } else { $res = "$obj" ; } } elsif ( $hav -> { $obj } ) { $res = sprintf "%s %s", ref $obj, $hav -> { $obj } ; } else { $hav -> { $obj } = join '/', @pth ; my $addr = _addr ( $obj ) ; if ( $addr =~ /HASH\(/ ) { $res = {} ; for my $key ( sort keys %$obj ) { my $val = $obj -> { $key } ; $res -> { $key } = _unbless $val, $lvl + 1, $hav, @pth, $key ; } } elsif ( $addr =~ /ARRAY\(/ ) { my $i = 0 ; $res = [ map { _unbless $_, $lvl + 1, $hav, @pth, $i ++ } @$obj ] ; } elsif ( $addr =~ /GLOB\(/ ) { $res = $addr ; } else { die sprintf "can't unbless %s = %s", $addr, ref $obj ; } } $res ; } sub Unbless { _unbless ( $_[1], 0 ) ; } # if $dst == undef : print to \*STDOUT # elif ref dst == IO::FILE : print to $dst # else $dst : return Util::as_text sub Dmp { my $obj = shift ; my $tag = shift ; my $suf = shift || '' ; my $fh = shift || \*STDOUT ; my $txt = JSON::PP -> new -> utf8 -> pretty -> canonical -> allow_nonref ( 1 ) -> encode ( OBB -> Unbless ( $obj ) ) ; $txt .= "\n" unless substr $txt, -1 eq "\n" ; print $fh '' . ( $tag ? "$tag :\n" : '' ) . $txt . $suf ; } sub TT { # printf "TT[%s]\n", join '|', @_ ; my $fmt = shift || 'TT' ; my $txt = sprintf $fmt, map { ( ref $_ and $_ -> can ( 'diag' ) ) ? $_ -> diag : Util::diag ( $_ ) } @_ ; my ( $sec, $mic ) = Time::HiRes::gettimeofday ; my $res = sprintf "time %.3f %s\n", $sec + $mic / 1000000 - $^T, $txt ; print $res if OBB -> Trace ; $res ; } ################################################################### package Util ; our ( @EXPORT, @EXPORT_OK, @ISA ) ; BEGIN { require Exporter ; @EXPORT = qw(syslog) ; @EXPORT_OK = qw() ; @ISA = qw(Exporter) } OBB -> import ; use Exporter ; use constant PROG => Dmon::PROG ; use constant MONS => [ qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec) ] ; our $IP_PAT = '^\d{1,3}(\.\d{1,3}){3}$' ; our $LOGGER = '' ; our $CONF ; our $STOP = 'dirty' ; our $EXEC = 0 ; our %MONS ; { my $cnt = 0 ; $MONS { $_ } = $cnt ++ for @{+MONS} ; } sub MON { $MONS { $_[0] } ; } our $MODE = 0 ; sub MODE { $MODE = shift if @_ ; $MODE ; } ; sub INITx { my $self = shift ; my %opts = @_ ; $CONF = $opts {conf} ; for my $root ( '', '/usr' ) { for my $bin ( qw(bin sbin) ) { my $prg = "$root/$bin/logger" ; if ( -f $prg ) { $LOGGER = $prg ; last ; } } last if $LOGGER ; } my @dirs = qw(logdir rundir vardir) ; for my $dir ( @dirs ) { my $xxxdir = $CONF -> $dir ; if ( $xxxdir ) { my $dotdot = Util::dirname ( $xxxdir ) ; if ( -d $dotdot ) { -d $xxxdir or mkdir $xxxdir, 0755 or OBB -> Die ( "can't mkdir $xxxdir" ) ; } else { OBB -> Die ( "can't find dir $dotdot" ) ; } } } } our $_addlog = 0 ; our $LOGF_hndl ; sub addlog { my $msg = shift ; return unless $CONF and ref ( $CONF ) eq 'Conf' ; return unless $CONF -> logdir ; # for Pmaker ; my $LOGF = $CONF -> log_file ; unless ( $LOGF_hndl and defined $LOGF_hndl -> fileno ) { unless ( $LOGF_hndl = new IO::File ">>$LOGF" ) { syslog ( "can't append [$LOGF] ($!)" ) unless $_addlog ++ ; } else { $LOGF_hndl -> autoflush ( 1 ) ; } } if ( $LOGF_hndl ) { $LOGF_hndl -> printf ( "%s %s[%s] %s\n", scalar ( localtime ) , PROG, $$, $msg ) ; $_addlog = 0 ; } } sub _syslog { my $mesg = shift ; my $PROG = PROG ; if ( $LOGGER and open SYSL, "|$LOGGER -p user.err -t ${PROG}[$$]" ) { printf SYSL "$mesg\n" ; close SYSL ; } } sub syslog { my $fmt = shift ; my $msg = sprintf $fmt, @_ ; chomp $msg ; _syslog $msg if $MODE ; logt ( $msg . ( $MODE ? " [syslog'ed]" : '' ) ) ; } sub uniq { my %x = () ; for ( @_ ) { $x { $_ } ++ ; } sort keys %x ; } sub slice { my $h = shift ; my $r = { map { my $v = $h -> $_ ; my $x = ( ( ref $v and $v -> can ( 'exp' ) ) ? $v -> exp : $v ) ; ( $_ => $x ) } @_ } ; $r ; } sub diag { my $x = shift ; $x =~ s/\n/\\n/g if defined $x ; unless ( defined $x ) { '[undef]' ; } elsif ( ref $x and $x -> can ( 'diag' ) ) { sprintf '[%s]', $x -> diag ; } elsif ( $x eq '' ) { '[empty]' ; } else { my $l = length ( $x ) ; my $c = $l < 1000 ; sprintf '[%s]', $c ? $x : substr ( $x, 0, 48 ) . "...($l)" ; } } # $time = timegm($sec,$min,$hour,$mday,$mon,$year); # Apr 14 00:00:00 2015 GMT sub secs4date { my $date = shift ; chomp $date ; if ( $date =~ /^(\w{3})\s+(\d{1,2})\s+(\d{2}:\d{2}:\d{2})\s+(\d{4})/ ) { my $mon = MON ucfirst lc $1 ; my ( $day, $hms, $yea ) = ( $2, $3, $4 ) ; my @smh = reverse split /:/, $hms ; Time::Local::timegm ( @smh, $day, $mon, $yea ) ; } else { OBB::TT ( 'secs4date: bad date %s', $date ) ; undef ; } } sub get_host_ips { my $hnam = shift ; my $res = undef ; if ( $hnam =~ /$IP_PAT/ ) { $res = [ $hnam ] ; } elsif ( my $info = Net::hostent::gethost $hnam ) { $res = [ map { Socket::inet_ntoa $_ ; } @{ $info -> addr_list } ] ; } $res ; } # cache localhost_ips ; re-resolve if not defined our $localhost_ips = get_host_ips 'localhost' ; sub localhost_ips { $localhost_ips || get_host_ips 'localhost' ; } sub mk_allowed { my $list = shift ; my $port = shift ; my $res = {} ; for my $hnam ( uniq @$list, 'localhost' ) { my $ips = Util::get_host_ips ( $hnam ) ; if ( defined $ips ) { for my $ip ( uniq @$ips ) { $res -> { $ip } = { hnam => $hnam, ltim => time } ; printf "allow %s %s\n", $port, ( $hnam =~ /$IP_PAT/ ? $ip : "$hnam ($ip)" ) if OBB -> Terse ; } } else { print "can't resolve $hnam\n" ; } } $res ; } our $bin_hostname = `hostname` ; chomp $bin_hostname ; sub bin_hostname { my $self = shift ; $bin_hostname ; } our $canonical_hostname ; sub hostname { my $self = shift ; $canonical_hostname ; } sub hostname_make { OBB::A_is ( 3, scalar @_ ) ; my $self = shift ; my $name = shift ; my $cdom = shift ; # configured domain or undef my $resolv = Net::DNS::Resolver -> new ; my $res = $name ; unless ( $res =~ /\./ ) { my $dom = $cdom || ( $resolv -> searchlist ) [ 0 ] ; $res .= ".$dom" if $dom ; } if ( my $search = $resolv -> search ( $res ) ) { my @nams = map { $_ -> cname } grep { $_ -> type eq 'CNAME' } $search -> answer ; $res = $nams [ 0 ] if @nams ; } OBB::TT ( 'hostname_make(%s,%s) => %s', $name, $cdom, $res ) ; printf "hostname_make ( %s , %s ) => %s\n" , $name, $cdom || 'no_dom', $res || 'no_res' if OBB -> Verbose ; $res ; } sub hostname_set { OBB::A_is ( 3, scalar @_ ) ; my $self = shift ; my $name = shift ; my $cdom = shift ; # configured domain or undef $canonical_hostname = $self -> hostname_make ( $name, $cdom ) ; } hostname_set ( 'Util', $bin_hostname, undef ) ; # our $canonical_hostname = _canonical_hostname $bin_hostname, undef ; our $DISTRIB ; if ( -f '/usr/bin/lsb_release' ) { $DISTRIB = lc `/usr/bin/lsb_release -is` ; chomp $DISTRIB ; } $DISTRIB ||= 'no_distrib' ; sub as_pvar { my $r = JSON::PP::decode_json ( $_[0] ) ; $r ; } sub as_text { my $r = JSON::PP::encode_json ( $_[0] ) ; chomp $r ; $r ; } sub pretty { my $v = shift ; my $txt = JSON::PP -> new -> utf8 -> pretty -> canonical -> allow_nonref ( 1 ) -> encode ( OBB -> Unbless ( $v ) ) ; chomp $txt ; $txt ; } sub basename { my $x = shift || $0 ; substr $x, 1 + rindex $x, '/' ; } sub dirname { my $x = shift || $0 ; my $idx = rindex $x, '/' ; ( $idx == -1 ) ? '.' : substr $x, 0, $idx ; } our %s4u = ( 's' => 1 ) ; $s4u { m } = 60 * $s4u { s } ; $s4u { h } = 60 * $s4u { m } ; $s4u { d } = 24 * $s4u { h } ; $s4u { w } = 7 * $s4u { d } ; sub s4uv { my $v = shift ; my $u = shift ; $v = 1 unless defined $v and length $v ; $u = 's' unless defined $u and length $u ; die "500: no s4u {$u}" unless exists $s4u { $u } ; $v * $s4u { $u } ; } # return undef on 'bad spec' # my $msg = '( [+-] NUM [smhdw] ) ...' ; # $err = "bad spec ($spec) ; should be like '$msg'" ; sub secs4spec { my $spc = shift ; my $num = '[-+]?\d+(\.\d)?' ; my $one = "($num)?([smhdw]?)" ; my $all = "^($one)+\$" ; my $res ; if ( $spc =~ /$all/ ) { my $tmp = $spc ; $res = 0 ; while ( length $tmp ) { die "500: '$tmp' ~! /^$one/" unless $tmp =~ /^$one/ ; my $num = $1 ; my $unit = $3 ; $tmp = $' ; $res += s4uv $num, $unit ; } } $res ; } sub ddate ($) { my $x = shift ; my @x = reverse ( ( localtime $x ) [ 0 .. 5 ] ) ; $x [ 0 ] += 1900 ; $x [ 1 ] ++ ; @x = map { sprintf '%02d', $_ ; } @x ; sprintf "%s|%s", join ( '-', @x[(0..2)] ), join ( ':', @x[(3..5)] ) ; } # ( $sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime sub date ($) { my $x = shift ; my $thn = [ reverse ( ( localtime $x ) [ 0 .. 5 ] ) ] ; my $now = [ reverse ( ( localtime time ) [ 0 .. 5 ] ) ] ; for my $r ( $thn, $now ) { $r -> [ 0 ] = sprintf "%04d", $r -> [ 0 ] + 1900 ; $r -> [ 1 ] = MONS -> [ $r -> [ 1 ] ] ; $r -> [ $_ ] = sprintf "%02d", $r -> [ $_ ] for ( 2 .. 5 ) ; } while ( @$thn and $thn -> [ 0 ] eq $now -> [ 0 ] ) { shift @$thn ; shift @$now ; } if ( @$thn == 0 ) { 'now' ; } elsif ( @$thn >= 3 ) { my $hms = join ':', @$thn [ $#$thn - 2 .. $#$thn ] ; pop @$thn ; pop @$thn ; pop @$thn ; my $ymd = join ' ', @$thn if defined $thn and @$thn ; join ' ', grep { defined $_ } ( $ymd, $hms ) ; } else { join ':', @$thn ; } } sub sig_name { my $sig = shift ; ( split ' ', $Config::Config{'sig_name'} ) [ $sig ] || "sig#$sig" ; } sub rotate { my $conf = shift ; my $log = $conf -> log_file ; addlog "rotate $log" ; my $cnt = ( split ' ', $conf -> rotate ) [ 0 ] ; unlink "$log.$cnt" ; # ignore status for ( my $i = $cnt - 1 ; $i > 0 ; $i -- ) { my $src = sprintf "%s.%s", $log, $i ; my $dst = sprintf "%s.%s", $log, $i + 1 ; rename $src, $dst or addlog "can't rename $src, $dst" if -f $src ; } my $dst = "$log.1" ; my $msg = "can't reopen " ; rename $log, $dst or addlog "can't rename $log, $dst" if -f $log ; close STDOUT ; open STDOUT, '>>', $log or _syslog ( "$msg STDOUT" ) ; close STDERR ; open STDERR, '>>', $log or _syslog ( "$msg STDERR" ) ; } sub get_url { my $url = shift ; my $HEAD = shift ; OBB -> Xit ( "get_url [$url] HEAD [$HEAD]" ) ; addlog "get_url [$url]" ; my $UA = LWP::UserAgent -> new ( timeout => 5 ) ; my $resp = $HEAD ? $UA -> head ( $url ) : $UA -> get ( $url ) ; my $code = $resp -> code ; my $ok = $resp -> is_success ; my $lm = $resp -> header ( 'Last-Modified' ) ; my $body = $resp -> content if $ok and not $HEAD ; addlog "got_url ok [$ok] code [$code] lm [$lm]" ; ( $ok, $body, $code, $lm ) ; } sub md5_text { my $txt = shift ; my $md5 = Digest::MD5 -> new -> add ( $txt ) -> hexdigest ; } sub md5_file { my $file = shift ; open FILE, '<', $file or return undef ; my $hex = md5_text join '', ; close FILE ; $hex ; } sub text_file { my $file = shift ; open FILE, '<', $file or return undef ; my $res = join '', ; close FILE ; $res ; } # returns $rtt or undef sub bin_ping { my $hnam = shift ; my $png = "ping -w 1 -c 1 -q $hnam" ; my ( $res, $rtt ) ; my $cmd = "$png 2> /dev/null | grep '^rtt'|" ; OBB::TT 'bin_ping start %s', $hnam ; if ( open PNG, $cmd ) { my $line = ; close PNG ; chomp $line ; OBB::TT 'line %s', $line ; if ( $line and $line =~ m!= (\d+(\.\d+)?)/! ) { $res = 1 ; $rtt = $1 ; } } OBB::TT 'bin_ping done res %s %s %s', $res, $rtt, $hnam ; defined $res ? $rtt : undef ; } # returns $rtt or undef sub ping { my $hnam = shift ; my $max = shift || 1 ; return bin_ping $hnam if $< ; OBB::TT 'ping start %s max %s', $hnam, $max ; my $png = Net::Ping -> new ( 'icmp', 1 ) ; $png -> hires ( 1 ) ; my $cnt = 0 ; my $min ; while ( $cnt < $max ) { my ( $res, $rtt, $ip ) = $png -> ping ( $hnam ) ; OBB::TT 'ping %s done res %s %s %s', $cnt, $res, $rtt, $hnam ; if ( $res ) { $min = $rtt unless defined $min and $rtt > $min ; Time::HiRes::usleep 10000 ; } else { sleep 1 ; } $cnt ++ ; } $png -> close () ; $min ; } sub uptime { ( split ' ', `cat /proc/uptime` ) [ 0 ] ; } ################################################################### package Host ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(hnam show ips) ) ; # may return undef [or die] if hnam not in DNS sub Init { my $self = shift ; my %opts = ( hnam => Util -> hostname , @_ ) ; $self -> OBB::Init ( %opts ) ; my $hnam = $opts { hnam } ; my $ips = $opts { ips } || Util::get_host_ips ( $hnam ) ; if ( defined $ips ) { $self -> ips ( $ips ) ; } else { # OBB -> Die ( "no ips for [$hnam]" ) ; return undef ; } $self ; } sub ping { my $self = shift ; Util::ping $self -> hnam, @_ ; } sub diag { my $self = shift ; Util::diag $self -> hnam ; } ################################################################### # state from reporting # t_state set by server # wrkr from work-file package Host::Client ; use base 'Host' ; __PACKAGE__ -> mk_getset ( qw(state t_state wrkr) ) ; sub Init { my $self = shift ; $self -> Host::Init ( @_ ) ; $self ; } ################################################################### package Host::Pmaker ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(xargs xfits) ) ; ################################################################### # stub for Server, Client, Pmaker ; App's must init 'hnam' package App ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(conf work threads service Items) ) ; sub Init { my $self = shift ; my %opts = ( threads => undef, SERVICE => 1, @_ ) ; my $conf = $self -> conf ( $opts { conf } ) ; my $threads = $opts { threads } ; OBB::TT 'App Init %s', $self ; die "App : no hostname" unless $self -> hnam ; my $work = $self -> work ( $self -> mk_work ) ; $self -> Items ( Items -> Make ) -> add_items ( $work -> items ) ; $self -> threads ( $threads ) ; if ( $opts{SERVICE} and $self -> threads ) { my $PORT = $self -> PORT ; $self -> service ( $threads -> Add ( Dmon::Thread::Service -> Make ( port => $PORT ), $self ) ) ; $self -> service -> mk_allowed ( $self -> allowed, $PORT ) ; } OBB::TT 'App Init done %s', $self ; $self ; } sub _sender_to_me { my $self = shift ; my $hnam = shift ; $hnam = $hnam -> hnam if ref $hnam ; my $port = shift ; my $msgs = shift ; my $res = Dmon::Thread::Send -> Make ( hnam => $hnam , port => $port , mbox => $msgs ) ; $self -> threads -> Add ( $res, $self ) if $res ; $res ; } sub new_work { my $self = shift ; my $lm = shift ; unless ( $lm ) { OBB::TT "new_work : no lm ; fetch work" ; my $work = $self -> mk_work ( Dmon::LMOD ) ; $lm = $work -> stamp if $work ; OBB::TT "new_work : no lm ; no work" unless $work ; } OBB::TT ( "new_work : self %s lm %s", $self -> work -> stamp, $lm ) ; $lm and $self -> work -> stamp < $lm ; } sub _work_file { OBB::A_is ( 3, scalar @_ ) ; my $wdir = shift || 'no_dir' ; my $prog = shift || 'no_prog' ; my $hnam = shift || 'no_host' ; sprintf "%s/%s/%s.txt", $wdir, $prog, $hnam ; } sub work_file { my $self = shift ; _work_file $self -> conf -> workdir, $self -> PROG, $self -> hnam ; } sub _work_cmd { OBB::A_is ( 3, scalar @_ ) ; my $prog = shift || 'no_prog' ; my $hnam = shift || 'no_host' ; my $lmod = shift ; sprintf "%s %s %s", ( $lmod ? 'WORK_LM' : 'WORK' ), $prog, $hnam ; } sub work_cmd { my ( $self, $lmod ) = @_ ; _work_cmd $self -> PROG, $self -> hnam, $lmod ; } sub mk_work { my $self = shift ; $self -> Xit ( 'called stub App::mk_work' ) ; } sub command { my $self = shift ; $self -> Xit ( 'called stub App::command' ) ; } sub DEF_IVAL { my $self = shift ; $self -> conf -> ival_make_state ; } ################################################################### package Server ; use base qw(Host App) ; __PACKAGE__ -> mk_getset ( qw(state clients users alerts levels by_ip Tzero CLIENT) ) ; sub Defs { ( Host -> Defs , App -> Defs , clients => {} , users => {} , alerts => [] , by_ip => {} , Tzero => time ) ; } use constant { PROG => 'dmon-server' } ; sub Init { my $self = shift ; my %opts = @_ ; my $conf = $opts{conf} ; for my $dir ( map { $conf -> $_ } qw(workdir upgrdir) ) { $self -> Die ( "no dir $dir" ) unless -d $dir ; } for my $fil ( map { $conf -> $_ } qw(upgr_file) ) { $self -> Die ( "no file $fil" ) unless -f $fil ; } $self -> Host::Init ( @_ ) ; $self -> App::Init ( @_ ) ; $self -> levels ( Fitl -> mk_levels ( $self -> work -> levels ) ) ; $self -> set_clients ; $self -> set_users ; $self -> set_alerts ; $self ; } sub PORT { my $self = shift ; $self -> conf -> port_server ; } sub allowed { my $self = shift ; $self -> work ? $self -> work -> wrkrs : [] ; } sub mk_work { my ( $self, $lmod ) = @_ ; Work::Server -> make_from_file ( $self -> work_file, $lmod ) ; } sub set_clients { my $self = shift ; my $hosts = $self -> work -> hosts ; for my $host ( @$hosts ) { my $client = Host::Client -> Make ( %$host ) ; $self -> clients -> { $client -> hnam } = $client ; if ( defined $client and defined $client -> ips ) { for my $ip ( @{ $client -> ips } ) { $self -> by_ip -> { $ip } = $client ; } } } } sub set_users { my $self = shift ; my $works = $self -> work -> users ; for my $work ( @$works ) { my $user = User -> Make ( %$work ) ; $self -> users -> { $user -> name } = $user ; } # OBB::Dmp ( $self -> users ) ; exit ; } sub set_alerts { my $self = shift ; my $works = $self -> work -> alerts ; for my $work ( @$works ) { my $users = $work -> {users} ; my $alert = Alert -> Make ( users => [ map { $self -> users -> { $_ } } @$users ] , levl1 => { map { ( $_ => 1 ) } @{ $work -> {levl1} } } , levl2 => { map { ( $_ => 1 ) } @{ $work -> {levl2} } } , hosts => { map { ( $_ => 1 ) } @{ $work -> {hosts} } } , items => { map { ( $_ => 1 ) } @{ $work -> {items} } } ) ; push @{ $self -> alerts }, $alert ; } } # my $cli = $self -> clients -> { $hnm } ; # $cli -> state ( $rep ) ; # $cli -> t_state ( time ) ; sub check_reporting { my $self = shift ; my $ival = $self -> conf -> ival_send_report ; my $init = time < $self -> Tzero + $ival ; OBB::TT ' check_reporting init %s ...', $init || 0 ; my $clnts = $self -> clients ; return scalar grep { defined $_ and defined $_ -> t_state and $_ -> t_state } values %$clnts if $init ; my $table = Dmon::T_EVNTS ; my $res = 0 ; my $evlst = $self -> CLIENT -> hist -> select_hash ( $table , cols => 'e2.*' , from => "$table e1 left join $table e2 using ( TIME, hnam )" , where => sprintf ( "e1.item = '%s'", Dmon::REPITEM ) , group_by => 'hnam' , having => 'TIME = MAX ( TIME )' , order_by => 'TIME' ) ; my %evnts = map { my $event = Event -> Make ( %$_ ) ; ( $event -> hnam => $event ) ; } @$evlst ; for my $hnam ( sort keys %$clnts ) { my $cli = $clnts -> { $hnam } ; my $tim = $cli -> t_state ; next unless defined $tim ; my $ival = $self -> conf -> ival_send_report ; # one miss is allowed my $age = int ( abs ( time - $tim ) / $ival - 1.5 ) ; my $lmax = @{ $self -> levels } - 1 ; my $lnew = ( $age < 0 ? 0 : ( $age > $lmax ? $lmax : $age ) ) ; my $last = $evnts { $hnam } ; my $lold = $last ? $last -> lnew : 0 ; if ( $lnew != $lold ) { OBB::TT ( 'hnam %s age %s lold %s lnew %s' , $hnam, $age, $lold, $lnew ) ; my $evnt = Event -> Make ( hnam => $hnam , item => Dmon::REPITEM , lold => $lold , lnew => $lnew , mesg => "age [$age]" ) ; $evnt -> save ( $self -> CLIENT -> hist ) ; $evnt -> send_mail ( $self ) ; } $res ++ unless $lnew ; } $res ; } sub send_client { OBB::A_is ( 2, scalar @_ ) ; my $self = shift ; my $hnam = shift ; my $PORT = $self -> conf -> port_client ; my $clnt = $self -> clients -> { $hnam } ; my $res ; OBB::TT 'to client PORT %s', $PORT ; if ( $clnt ) { $res = $self -> _sender_to_me ( $clnt, $PORT ) ; } else { $res = undef ; } $res ; } sub cnt_clients { my $self = shift ; scalar keys %{ $self -> clients } ; } sub show_client { OBB::A_is ( 2, scalar @_ ) ; my ( $self, $hnam ) = @_ ; my $host = $self -> clients -> { $hnam } ; ( $host ? ( sprintf '%s [%s]', $host -> show, $host -> hnam ) : ( sprintf '%s [%s]', $hnam, 'not yet found' ) ) ; } # Server allow_command sub allow_command { my $self = shift ; my $cmd = shift ; my $thrd = shift ; my $peer = $thrd -> inp -> peerhost ; my $res = 1 ; # command UPGRADE is only allowed from localhost if ( $cmd eq 'UPGRADE' ) { my $ips = Util::localhost_ips () ; $res = $ips ? grep { $peer eq $_ } @$ips : 0 ; } elsif ( $cmd eq 'NO_ALRT' ) { my $pmkr = $self -> work -> pmaker ; my $host = $self -> clients -> { $pmkr } ; my $ips = $host ? $host -> ips : [] ; $res = $ips ? grep { $peer eq $_ } @$ips : 0 ; } $res ; } # Server command ; must return undef for unknown command sub command { my $self = shift ; my $cmd = shift ; my $arg = shift ; my $thrd = shift ; my $peer = $thrd -> inp -> peerhost ; my $res = undef ; if ( ! $self -> allow_command ( $cmd, $thrd ) ) { $res = "command not allowed [$cmd] from $peer" ; } elsif ( $cmd eq 'WORK' ) { $res = $self -> do_work ( $arg, $thrd ) ; } elsif ( $cmd eq 'WORK_LM' ) { $res = $self -> do_work ( $arg, $thrd, 1 ) ; } elsif ( $cmd eq 'PROG' ) { $res = $self -> do_prog ( $arg, $thrd ) ; } elsif ( $cmd eq 'EVENTS' ) { my $val = JSON::PP::decode_json ( $arg || '[]' ) ; $res = $self -> do_events ( $val, $peer ) ; } elsif ( $cmd eq 'CLIENT' ) { $res = $self -> do_client ( $arg, $thrd ) ; } elsif ( $cmd eq 'CLIENTS' ) { $res = $self -> do_clients ( $arg ) ; } elsif ( $cmd eq 'ALLPING' ) { $res = $self -> do_allping ( $arg, $thrd ) ; } elsif ( $cmd eq 'ALLSEND' ) { $res = $self -> do_allsend ( $arg, $thrd ) ; } elsif ( $cmd eq 'ALLMETA' ) { $res = $self -> do_allmeta ( $arg, $thrd ) ; } elsif ( $cmd eq 'UPGRADE' ) { $res = $self -> do_upgrade ( $arg, $thrd ) ; } elsif ( $cmd eq 'NO_ALRT' ) { $res = $self -> do_no_alrt ( $arg, $thrd ) ; } $res ; } sub do_client { my $self = shift ; my $arg = shift ; my $thrd = shift ; my ( $hnam, $cmd ) = split ' ', $arg, 2 ; my $sndr = $self -> send_client ( $hnam ) ; return "can't connect to client ($hnam)" unless $sndr ; $thrd -> wait4 ( $sndr ) ; $sndr -> send ( $cmd ) ; # result is ignored } # Server _do_all sub _do_all { OBB::A_in 4, 5, scalar @_ ; my $self = shift ; my $arg = shift ; my $thrd = shift ; my $cmd = shift ; my $skip = shift || 0 ; # skip server my $hsts = $self -> work -> wrkrs ; my $res = [] ; my $CMD = $cmd ; $CMD .= " $arg" if defined $arg ; $thrd -> proxy ( 0 ) ; for my $hnam ( @$hsts ) { next if $skip and $hnam eq Util -> hostname ; OBB::TT "send $cmd to %s", $hnam ; if ( my $sndr = $self -> send_client ( $hnam ) ) { $thrd -> wait4 ( $sndr ) ; $sndr -> send ( $CMD ) ; } else { push @$res, "can't connect to $hnam" ; } } push @$res, "ALL$cmd : no hosts reachable" unless $thrd -> waitc ; OBB::TT "ALL$cmd is waiting for %s hosts", $thrd -> waitc ; join '', map "$_\n", @$res ; } # default : skip server ; sub do_allping { _do_all @_, 'PING', 0 ; } sub do_allsend { _do_all @_, 'SEND', 1 ; } sub do_allmeta { _do_all @_, 'META', 0 ; } # Server do_report sub do_report { my $self = shift ; my $rprt = shift ; my $from = shift ; my $resp = 'ok' ; my $clnt = $self -> by_ip -> { $from } ; my $work = undef ; if ( $clnt ) { my $hnam = $clnt -> hnam ; my $clnts = $self -> clients ; my @bads = grep { ! $self -> clients -> { $_ } } keys %$rprt ; if ( @bads ) { $resp = "no client for [@bads] from $hnam [$from]" ; } elsif ( scalar keys %$rprt == 0 ) { $resp = "no keys in report from $hnam [$from]" ; } else { my @fors = () ; for my $hnm ( sort keys %$rprt ) { my $rep = $rprt -> { $hnm } ; my $cli = $self -> clients -> { $hnm } ; $cli -> state ( $rep ) ; $cli -> t_state ( time ) ; push @fors, $hnm ; } $resp = "ok report from $hnam" ; $resp .= " [$from]" . ( @fors ? " for @fors" : '' ) unless @fors == 1 and $fors[0] eq $hnam ; my $path = App::_work_file ( $self -> conf -> workdir, Client -> PROG, $hnam ) ; OBB::TT "work_file %s", $path ; $work = ( stat $path ) [ 9 ] || 0 ; } } else { $resp = "no client for $from" ; } Util::as_text { resp => $resp, work => $work } ; } # Server do_events sub do_events { my $self = shift ; my $evnts = shift ; my $from = shift ; my $resp = 'ok' ; my $clnt = $self -> by_ip -> { $from } ; unless ( ref ( $evnts ) =~ /ARRAY/ ) { $resp = "not an ARRAY ($evnts)" ; } elsif ( $clnt ) { my $hnam = $clnt -> hnam ; my $clnts = $self -> clients ; my @bads = grep { ! $self -> clients -> { $_ } } map { $_ -> { hnam } || 'no_hnam' } @$evnts ; if ( @bads ) { $resp = "no client for [@bads] in events from $hnam [$from]" ; } elsif ( @$evnts == 0 ) { $resp = "no events from $hnam [$from]" ; } elsif ( ! ( $self -> CLIENT and $self -> CLIENT -> hist ) ) { $resp = "can't find a history" ; } else { my @fors = () ; for my $hash ( @$evnts ) { my $evnt = Event -> Make ( %$hash ) ; $evnt -> save ( $self -> CLIENT -> hist ) ; push @fors, $evnt -> hnam ; $evnt -> send_mail ( $self ) ; } $resp = "ok events from $hnam [$from]" . ( @fors ? " for @fors" : '' ) ; } } else { $resp = "no client for $from" ; } Util::as_text { resp => $resp } ; } sub events { my $self = shift ; my $arg = shift || 0 ; my $hnam = shift ; my $inam = shift ; my $res = [] ; my $hist = $self -> CLIENT -> hist if $self -> CLIENT ; my %sel = ( order_by => 'TIME DESC' ) ; my @qwe = () ; push @qwe, "hnam = '$hnam'" if $hnam ; push @qwe, "item = '$inam'" if $inam ; my $secs ; if ( $arg =~ /^\d+$/ ) { $sel { limit } = $arg if $arg > 0 ; } elsif ( defined ( $secs = Util::secs4spec ( $arg ) ) ) { push @qwe, sprintf "TIME >= %s", time - $secs ; } $sel { where } = join ' AND ', @qwe if @qwe ; if ( $hist ) { $res = $hist -> select_hash ( Dmon::T_EVNTS, %sel ) ; } $res ; } sub event0 { my $self = shift ; my $hist = $self -> CLIENT -> hist if $self -> CLIENT ; $hist ? $hist -> min ( Dmon::T_EVNTS, Dmon::TIME ) : undef ; } sub do_clients { my $self = shift ; my $arg = shift ; my @args = split ' ', $arg ; my $spec = pop @args ; my $hnam = shift @args ; my $inam = shift @args ; my $cdmp = $self -> clients ; my $resp = 'ok clients' ; if ( $hnam ) { if ( not exists $self -> clients -> { $hnam } ) { return Util::as_text { resp => "no host $hnam" } ; } else { $cdmp = { $hnam => $self -> clients -> { $hnam } } ; $resp = "ok clients $hnam" ; } } Util::as_text { resp => $resp , cdmp => OBB -> Unbless ( $cdmp ) , events => $self -> events ( $spec, $hnam, $inam ) , event0 => $self -> event0 , noalrt => $self -> conf -> no_alert } ; } sub _path { join ( '/', @_ ) . '.txt' ; } sub do_work { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $lmod = shift || 0 ; my $wdir = $self -> conf -> workdir ; my $resp ; my ( $prog, $hnam ) = split ' ', $arg, 2 ; my $path = App::_work_file ( $wdir, $prog, $hnam ) ; unless ( $prog ) { $resp = 'no prog' ; } elsif ( ! $hnam ) { $resp = 'no hnam' ; } elsif ( ! -f $path ) { $resp = "no work [$path]" ; } elsif ( ! open PATH, '<', $path ) { $resp = "can't open work [$path]" ; } return Util::as_text { resp => $resp } if $resp ; my $json ; unless ( $lmod ) { $json = Util::as_pvar join '', ; close PATH ; } my $lm = ( stat $path ) [ 9 ] || 0 ; Util::as_text { resp => 'ok work', data => $json, lm => $lm } ; } # Server do_upgrade sub do_upgrade { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $conf = $self -> conf ; my $hsts = $self -> work -> wrkrs ; my $res = [] ; my %only = map { ( $_ => 1 ) } split ' ', $arg ; my $dmon = $conf -> upgr_file ; my $text = Util::text_file ( $dmon ) ; return "can't get text from $dmon" unless defined $text ; my $json = Util::as_text { text => $text } ; $thrd -> proxy ( 0 ) ; for my $hnam ( @$hsts ) { # skip the server next if $hnam eq Util -> hostname ; next if $arg and ! $only { $hnam } ; OBB::TT "send upgrade to %s", $hnam ; if ( my $sndr = $self -> send_client ( $hnam ) ) { $thrd -> wait4 ( $sndr ) ; $sndr -> send ( "UPGRADE $json" ) ; } else { push @$res, "can't connect to $hnam" ; } } push @$res, 'no hosts reachable' unless $thrd -> waitc ; OBB::TT 'waiting for %s hosts', $thrd -> waitc ; join '', map "$_\n", @$res ; } sub do_prog { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $resp ; my $path = $self -> conf -> upgr_file ; if ( ! open PATH, '<', $path ) { $resp = "can't open prog [$path]" ; } return Util::as_text { resp => $resp } if $resp ; my $text = join '', ; close PATH ; my $lm = ( stat $path ) [ 9 ] || 0 ; Util::as_text { resp => "ok prog $path", data => $text, lm => $lm } ; } sub do_no_alrt { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $res = $self -> conf -> no_alert ; my $resp = "ok no_alert [$arg]" ; if ( $arg =~ /^\w+$/ ) { if ( $arg eq '0' ) { $res .= ' -> ' . $self -> conf -> no_alert ( $arg ) ; } elsif ( $self -> users -> { $arg } ) { $res .= ' -> ' . $self -> conf -> no_alert ( $arg ) ; } else { $resp = "no valid arg [$arg]" ; } } Util::as_text { resp => $resp, res => $res } ; } # Server sub STATE { my $self = shift ; sprintf "-- Server state : keeping state for %d clients" , $self -> cnt_clients ; } ################################################################### package Client ; use base qw(Host App) ; __PACKAGE__ -> mk_getset ( qw(server items state levels rexec hist next_send SERVER) ) ; use constant { PROG => 'dmon-client' , STOPPED => 'STOPPED' } ; sub Init { my $self = shift ; OBB::TT 'Client Init %s', $self ; $self -> Host::Init ( @_ ) ; $self -> App::Init ( @_ ) ; $self -> set_server ; $self -> levels ( Fitl -> mk_levels ( $self -> work -> levels ) ) ; my $state = State::Client -> Make ( work => $self -> work , IVAL => $self -> DEF_IVAL , levels => $self -> levels , Items => $self -> Items ) ; $self -> state ( $state ) ; $self -> next_send ( time ) ; OBB::TT 'Client ref self %s', ref $self ; $self -> hist ( $self -> make_db ) unless ref $self eq 'Pmaker' ; OBB::TT 'Client Init done %s', $self ; $self ; } sub mk_update { my $self = shift ; my $hnam = shift ; $self -> state -> mk_update ( $hnam, $self ) ; } sub make_db { my $self = shift ; my $file = $self -> conf -> dbs_file ; my $hist = TS -> Make ( file => $file ) ; my $dbh = $hist -> connect ; return undef unless $hist -> dbh ; OBB::TT 'db connected %s file %s', !! $hist -> dbh, $file ; my $state = $self -> state ; for my $hnam ( $self -> state -> hnams ) { my $tnam = $hist -> tnam ( $hnam ) ; $hist -> add_tab ( $tnam ) unless $hist -> has_tab ( $tnam ) ; my @items = @{ $state -> host_items ( $hnam ) } ; my @names = ( map { $_ -> name } @items ) ; for my $nam ( @names ) { $hist -> add_col ( $tnam, $nam, 'real' ) unless $hist -> has_col ( $tnam, $nam ) ; } my $IVAL = Dmon::IVAL ; unless ( $hist -> has_col ( $tnam, $IVAL ) ) { $hist -> add_col ( $tnam, $IVAL, 'real', 'NOT NULL DEFAULT 1' ) ; $hist -> dbh -> do ( "UPDATE $tnam SET IVAL = 1" ) ; } my %want = ( $IVAL => 1, map { ( $_ => 1 ) } @names ) ; my @drop = () ; for my $col ( sort $hist -> user_cols ( $tnam ) ) { push @drop, $col unless $want { $col } ; OBB::TT "col [$col] want %s", $want { $col } ; } if ( @drop ) { OBB::TT "drop [@drop] from $tnam" ; $hist -> drop_cols ( $tnam, @drop ) ; } } my $tnam = Dmon::T_EVNTS ; my $sql = Event -> SQL_CREATE_TAB ( $tnam ) ; $hist -> dbh -> do ( $sql ) unless $hist -> has_tab ( $tnam ) ; for my $col ( Event -> SQL_ADD_COLS ) { $hist -> add_col ( $tnam, $col, 'real' ) unless $hist -> has_col ( $tnam, $col ) ; } OBB::TT 'make_db done' ; $hist ; } sub save_hist { my $self = shift ; my $hnam = shift ; my $hash = shift ; my $hist = $self -> hist ; if ( $hist ) { my $tnam = $hist -> tnam ( $hnam ) ; $hist -> save_hash ( $tnam, $hash ) ; } } sub last_hist { my $self = shift ; my $hnam = shift ; my $hash = shift ; my $hist = $self -> hist ; my $res = {} ; if ( $hist ) { my $tnam = $hist -> tnam ( $hnam ) ; $res = $hist -> select1_hash ( $tnam , where => "TIME = ( select MAX(TIME) FROM $tnam )" ) ; } $res ; } sub show_levels { my $self = shift ; join ',', map { $_ -> show } @{ $self -> levels } ; } sub PORT { my $self = shift ; $self -> conf -> port_client ; } sub allowed { my $self = shift ; [ $self -> conf -> server ] ; } sub mk_work { my ( $self, $lmod ) = @_ ; if ( $self -> hnam eq $self -> conf -> server ) { Work::Client -> make_from_file ( $self -> work_file, $lmod ) ; } else { Work::Client -> make_from_serv ( $self, $lmod ) ; } } sub send_server_sync { my $self = shift ; my $mesg = shift ; my $pack = shift ; my $res = undef ; my $SOCK = IO::Socket::INET -> new ( PeerAddr => $self -> conf -> server , PeerPort => $self -> conf -> port_server , Proto => 'tcp' ) ; if ( $SOCK ) { printf $SOCK "$mesg\n" ; $SOCK -> shutdown ( 1 ) ; # done writing my @lines = <$SOCK> ; $SOCK -> shutdown ( 2 ) ; # done using $res = $pack -> Make ( lines => [ @lines ] ) if @lines ; } $res ; } # returns undef on "can't connect" sub get_work_mesg { my ( $self, $lmod ) = @_ ; my $cmd = $self -> work_cmd ( $lmod ) ; $self -> send_server_sync ( $cmd, 'Dmon::Mesg::Work' ) ; } sub set_server { my $self = shift ; my $serv = Host -> Make ( hnam => $self -> conf -> server ) ; OBB::TT 'server %s', $serv || 'no server' ; $self -> server ( $serv ) ; } sub send_server { my $self = shift ; my $mesg = shift ; my $msgs = shift ; my $PORT = $self -> conf -> port_server ; my $res ; $self -> Xit ( 'no server?' ) unless $self -> server ; OBB::TT 'to server PORT %s', $PORT ; if ( $self -> server ) { $res = $self -> _sender_to_me ( $self -> server, $PORT, $msgs ) ; $res -> send ( $mesg ) if $res and defined $mesg ; } $res ; } sub send_server_reboot { my $self = shift ; my $hnam = shift ; my $mbox = shift ; my $last = $self -> last_hist ( $hnam ) ; my $curr = Util::uptime ; my $prev = $last -> { uptime } if $last ; if ( defined $curr and defined $prev and $curr < $prev ) { OBB::TT 'send reboot-event to server %s', $hnam ; $self -> send_server ( sprintf ( 'EVENTS %s' , Util::as_text [ { hnam => $hnam, item => 'uptime' , lold => 0 , lnew => 0 , mesg => 'reboot' } ] ) , $mbox ) ; } else { OBB::TT 'no reboot-event to server %s', $hnam ; } } # Client allow_command sub allow_command { my $self = shift ; my $cmd = shift ; my $thrd = shift ; my $peer = $thrd -> inp -> peerhost ; my $res = 1 ; # commands not allowed from localhost my @COMMANDS = qw(UPGRADE) ; if ( grep { $cmd eq $_ } @COMMANDS ) { my $ips = Util::localhost_ips () ; $res = $ips ? not grep { $peer eq $_ } @$ips : 0 ; } $res ; } sub zap_hist { OBB::A_is ( 2, scalar @_ ) ; my $self = shift ; my $hnam = shift ; my $hist = $self -> hist ; my $tnam = TS -> tnam ( $hnam ) ; my $res ; unless ( $hist ) { $res = 'no hist' ; } elsif ( ! $hist -> has_tab ( $tnam ) ) { $res = "no table $tnam" ; } else { $res = $hist -> zap ( $tnam, 3600, 60 * 60 * 24 * 32 ) ; $res = $hist -> zap ( $tnam, 900, 60 * 60 * 24 * 8 ) ; $res = $hist -> zap ( $tnam, 300, 60 * 60 * 24 * 2 ) ; } Util::as_text { resp => $res, data => {} } ; } sub zap_old { OBB::A_is ( 3, scalar @_ ) ; my $self = shift ; my $name = shift ; # hnam or tnam my $keep = shift ; my $hist = $self -> hist ; my $tnam = TS -> tnam ( $name ) ; my $res ; unless ( $hist ) { $res = 'no hist' ; } elsif ( ! $hist -> has_tab ( $tnam ) ) { $res = "no table $tnam" ; } else { $res = $hist -> zap_old ( $tnam, $keep ) ; } Util::as_text { resp => $res, data => {} } ; } # Client command ; must return undef for unknown command sub command { my $self = shift ; my $cmd = shift ; my $arg = shift ; my $thrd = shift ; my $peer = $thrd -> inp -> peerhost ; my $res ; if ( ! $self -> allow_command ( $cmd, $thrd ) ) { $res = "command not allowed [$cmd] from $peer" ; } elsif ( $cmd eq 'STOP' ) { my $STPR = $self -> conf -> read_stp || 'no secret' ; $res = ( ( $arg eq $STPR ) ? Client::STOPPED : 'BAD SECRET' ) ; } elsif ( $cmd eq 'SERVER' ) { $res = $self -> do_server ( $arg, $thrd ) ; } elsif ( $cmd eq 'UPGRADE' ) { $res = $self -> do_upgrade ( $arg, $thrd ) ; } elsif ( $cmd eq 'SEND' ) { $res = $self -> do_send ( $arg, $thrd ) ; } elsif ( $cmd eq 'ZAP' ) { $res = $self -> do_zap ( $arg, $thrd ) ; } elsif ( $cmd eq 'META' ) { $res = $self -> do_meta ( $arg, $thrd ) ; } elsif ( $cmd eq 'HIST' ) { $res = $self -> do_hist ( $arg, $thrd ) ; } $res ; } # Client do_report sub do_report { my $self = shift ; Util::as_text { resp => 'ok', state => $self -> state -> as_json } ; } # Client do_send sub do_send { my $self = shift ; sprintf "next_send %s %s" , Util -> hostname , scalar localtime $self -> next_send ( time + int rand 15 ) ; } # arg == [ hnam ] ; default $self -> hnam sub do_zap { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $hnam = shift || $self -> hnam ; $self -> zap_hist ( $hnam ) ; } sub do_meta { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $hist = $self -> hist ; my $meta = {} ; my $resp ; if ( $hist ) { for my $hnam ( $self -> state -> hnams ) { my $tnam = $hist -> tnam ( $hnam ) ; OBB::TT 'do_meta %s', $hnam ; $meta -> { $hnam } = { count => $hist -> count ( $tnam ) , first => $hist -> min ( $tnam, Dmon::TIME ) , last => $hist -> max ( $tnam, Dmon::TIME ) } ; } $resp = sprintf 'ok meta' ; } else { $resp = sprintf "no history [%s]", $self -> hnam ; } Util::as_text { resp => $resp , data => $meta } ; } # Client do_hist # arg == [ [ival] [pnts] [host/table-name] [col ...] ] # default ival : H # default pnts : 100 == number of data-points # default host : LOCAL == $self -> hnam # default cols : hist.* # returns data => { cols => [], rows => [] } sub do_hist { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $hist = $self -> hist ; my @arg = split ' ', $arg ; my $ival = shift @arg || 'H' ; $ival = uc substr $ival, 0, 1 ; my $pnts = shift @arg || 100 ; my $hnam = shift @arg || 'LOCAL' ; my %ival = () ; my %flat = () ; $ival { H } = 60 * 60 ; $ival { D } = 24 * $ival { H } ; $ival { W } = 7 * $ival { D } ; $ival { M } = 4 * $ival { W } + 3 * $ival { D } ; $ival { Y } = 52 * $ival { W } + 2 * $ival { D } ; $flat { H } = 60 ; $flat { D } = 10 * $flat { H } ; $flat { W } = 1 * $ival { H } ; $flat { M } = 4 * $ival { H } ; $flat { Y } = 2 * $ival { D } ; unless ( $hist ) { Util::as_text { resp => 'no hist', data => {} } ; } elsif ( ! $ival { $ival } ) { Util::as_text { resp => "bad ival [$ival]", data => {} } ; } elsif ( $pnts !~ /^\d+$/ ) { Util::as_text { resp => "pnts NaN [$pnts]", data => {} } ; } else { my $tnam = $hist -> tnam ( $hnam eq 'LOCAL' ? $self -> hnam : $hnam ) ; my $flat = $flat { $ival } ; my $colt = 'cast ( AVG ( TIME ) as int ) as TIME' ; my $grp = "( TIME / $flat )" ; my @cols = ( @arg ? @arg : $hist -> user_cols ( $tnam ) ) ; my $tmin = time - $ival { $ival } ; my $col0 = '' ; $col0 = $cols [ 0 ] if @cols == 1 ; OBB::TT "tnam %s grp %s @cols", $tnam, $grp ; my $rows = $hist -> select ( $tnam , cols => [ $colt , map { "SUM ( IVAL * $_ ) / SUM ( IVAL ) as $_" } @cols ] , where => "TIME >= $tmin" . ( $col0 ? " AND $col0 IS NOT NULL" : '' ) , group_by => $grp , having => '( COUNT(*) > 0 AND SUM ( IVAL ) > 0 )' ) ; my $rcnt = scalar @$rows ; my $err = $hist -> Err ; my $resp = $err ? "no good: $err" : "ok rows $rcnt" ; Util::as_text { resp => $resp , data => { cols => [ Dmon::TIME, @cols ], rows => $rows } } ; } } sub do_server { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $sndr = $self -> send_server ; return "can't connect to server" unless $sndr ; $thrd -> wait4 ( $sndr ) ; $sndr -> send ( $arg ) ; } # Client do_upgrade sub _upgrade { my $self = shift ; my $arg = shift ; my $thrd = shift ; my $conf = $self -> conf ; my $pth = $conf -> bin_file ; my $tmp = "$pth.tmp" ; my $old = "$pth.old" ; return "can't write $tmp ($!)" unless open TMP, '>', $tmp ; my $prt = print TMP Util::as_pvar ( $arg ) -> {text} ; return "can't print $tmp ($!)" unless $prt ; my $cls = close TMP ; return "bad close writing $tmp ($!)" unless $cls ; return "can't check 'perl -c'" unless open TMP, "perl -c < $tmp 2>&1|" ; my $pwc = join '', ; $cls = close TMP ; return "bad errs from 'perl -c' [$pwc]" unless $pwc eq "- syntax OK\n" ; return "bad close from 'perl -c' ($!) xit ($?)" unless $cls ; return "can't get version $tmp" unless open TMP, "perl $tmp --version 2>&1|" ; my $prv = Dmon -> Version ; my $ver = ; chomp $ver ; $cls = close TMP ; return "bad close from 'perl --version' ($!) xit ($?)" unless $cls ; return "identical version $ver" if $ver eq $prv ; unlink $old ; # ignore result ; return "can't chmod 0755 $tmp" unless chmod 0755, $tmp ; return "can't rename $pth $old" unless rename $pth, $old ; return "can't rename $tmp $pth" unless rename $tmp, $pth ; "ok $prv -> $ver" ; } # Client do_upgrade sub do_upgrade { my $self = shift ; my $res = $self -> _upgrade ( @_ ) ; my $hnam = Util -> hostname ; my $ok = $res =~ /^ok/ ; $self -> rexec ( time + 10 + int rand 90 ) if $ok ; sprintf "upgrade $hnam ... %s" , ( $ok ? $res : "fail\n $res" ) ; } # Client sub STATE { my $self = shift ; ( ( sprintf "-- hostname %s\n", $self -> hnam ) . ( sprintf "-- server %s\n", $self -> server -> hnam ) . ( sprintf "-- work %s\n", scalar localtime $self -> work -> stamp ) . ( sprintf "-- Client state :\n%s", $self -> state -> pretty ) ) ; } ################################################################### package User ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name mail) ) ; sub xsrt { my $self = shift ; $self -> name ; } sub work { my $self = shift ; { name => $self -> name, mail => $self -> mail } ; } ################################################################### package Alert ; use base qw(OBB) ; __PACKAGE__ -> mk_getset ( qw(users levl1 levl2 hosts items) ) ; sub Defs { ( users => {} , levl1 => {} , levl2 => {} , hosts => {} , items => {} ) ; } sub _in_level { my ( $self, $x, $lvl ) = @_ ; my $level = $self -> $lvl ; scalar keys %$level == 0 or $level -> { $x } || 0 ; } sub in_1 { _in_level @_, 'levl1' ; } sub in_2 { _in_level @_, 'levl2' ; } sub users4 { my $self = shift ; my $evnt = shift ; my $set = Set -> Make ; my $old = $evnt -> lold ; $old = -1 unless defined $old ; my $new = $evnt -> lnew ; $new = -1 unless defined $new ; my $frm1 = $self -> in_1 ( $old ) ; my $frm2 = $self -> in_2 ( $old ) ; my $to1 = $self -> in_1 ( $new ) ; my $to2 = $self -> in_2 ( $new ) ; OBB::TT "old %s new %s frm1 %s frm2 %s to1 %s to2 %s" , $old, $new, $frm1, $frm2, $to1, $to2 ; return $set unless ( $frm1 and $to2 ) or ( $frm2 and $to1 ) ; my $hosts = $self -> hosts ; my $host = $evnt -> hnam ; return $set if scalar keys %$hosts and ! $hosts -> { $host } ; my $items = $self -> items ; my $item = $evnt -> item ; return $set if scalar keys %$items and ! $items -> { $item } ; my $users = $self -> users ; OBB::TT "users %s", scalar @$users ; $set -> add ( $_ ) for @$users ; $set ; } ################################################################### package Event ; use base qw(OBB) ; __PACKAGE__ -> mk_getset ( qw(TIME hnam item lold lnew vold vnew mesg host) ) ; sub SQL_ADD_COLS { qw(vold vnew) ; } sub SQL_CREATE_TAB { my $self = shift ; my $tnam = shift ; my $res = < save_hash ( Dmon::T_EVNTS, OBB -> Unbless ( $self ), time, 1 ) ; } sub users4 { my $self = shift ; my $alerts = shift ; my $set = Set -> Make ; for my $alert ( @$alerts ) { $set -> union ( $alert -> users4 ( $self ) ) ; } $set -> list ; } sub show_level { my $lvls = shift ; my $lvl = shift ; my $levl = $lvls -> [ $lvl ] if defined $lvl ; ( defined $levl ? $levl -> show : sprintf "bad level (%s)", $lvl || 'undef' ) ; } sub send_mail { my $self = shift ; my $server = shift ; my $alerts = $server -> alerts ; my $levels = $server -> levels ; my @users = $self -> users4 ( $alerts ) ; my $lold = show_level ( $levels, $self -> lold ) ; my $lnew = show_level ( $levels, $self -> lnew ) ; my $show = $server -> show_client ( $self -> hnam ) ; my $inam = $self -> item ; my $vold = $self -> vold ; $vold = 'undef' unless defined $vold ; my $vnew = $self -> vnew ; $vnew = 'undef' unless defined $vnew ; my $mesg = < $lnew value : $vold => $vnew MESG for my $user ( @users ) { my $name = $user -> name ; my $mail = $user -> mail ; Util::logt ( "user %s diag %s", $name, $self -> diag ) ; my $MAIL = Dmon::MAIL ; next unless -x $MAIL ; next if $server -> conf -> no_alert ; if ( open MAIL, '|-', $MAIL, '-t' ) { print MAIL "To: $mail\n" ; printf MAIL $mesg, ucfirst $name ; close MAIL ; } } } sub diag { my $self = shift ; sprintf 'hnam %s item %s lold %s lnew %s vold %s vnew %s' , Util::diag ( $self -> hnam ) , Util::diag ( $self -> item ) , Util::diag ( $self -> lold ) , Util::diag ( $self -> lnew ) , Util::diag ( $self -> vold ) , Util::diag ( $self -> vnew ) ; } ################################################################### package State ; use base qw(OBB) ; __PACKAGE__ -> mk_getset ( qw(IVAL levels Items) ) ; sub Defs { ( levels => [] ) ; } use constant SEND_SERVER => [ qw(val vtim ival perr fitl) ] ; sub PICK { my $itm = pop ; Util::slice ( $itm, @{+SEND_SERVER} ) ; } sub RECV { my $self = shift ; my $item = shift ; # plain hash ; state from CLIENTS $item -> { fitl } = ( ( defined $item -> { fitl } ) ? $self -> levels -> [ $item -> { fitl } ] : $Fitl::UNDF ) ; %$item ; } ################################################################### package State::Client ; use base qw(State) ; __PACKAGE__ -> mk_getset ( qw(hosts) ) ; sub Defs { ( hosts => {} ) ; } # called by Client with # work => { hnam => { var => { ival fitr [args] }, ... } } ; # IVAL => $self -> DEF_IVAL # levels => $self -> levels (as objects) # Items => $self -> Items sub Init { my $self = shift ; my %opts = ( @_ ) ; $self -> State::Init ( %opts ) ; my $work = $opts{work} ; my $hnams = $work -> hosts ; my $Items = $self -> Items ; for my $hnam ( sort keys %$hnams ) { my $itms = $hnams -> { $hnam } ; $self -> hosts -> { $hnam } = [ map { # $_ is an item-name # keys $hash : fitr ival [args] my $hash = $itms -> { $_ } ; $hash -> {ival} ||= $self -> IVAL ; Item -> Make ( $Items -> props ( $_ ) , hnam => $hnam , lvls => $self -> levels , %$hash ) } sort keys %$itms ] ; } $self ; } # State::Client sub hnams { my $self = shift ; sort keys %{ $self -> hosts } ; } sub host_items { my $self = shift ; my $hnam = shift ; $self -> hosts -> { $hnam } ; } # this goes to the server sub as_json { my $self = shift ; my $hsts = $self -> hosts ; my $res = {} ; for my $hnam ( sort keys %$hsts ) { my $itms = $self -> hosts -> { $hnam } ; $res -> { $hnam } = {} ; for my $itm ( @$itms ) { $res -> { $hnam } { $itm -> name } = $self -> PICK ( $itm ) ; } } $res ; } sub as_text { my $self = shift ; Util::as_text ( $self -> as_json ) ; } sub pretty { my $self = shift ; Util::pretty ( $self ) ; } # this goes into the database sub hist_hash { my $self = shift ; my $hnam = shift ; my $hsts = $self -> hosts ; my $itms = $self -> hosts -> { $hnam } ; my $hash = {} ; for my $itm ( @$itms ) { $hash -> { $itm -> name } = $itm -> val ; } $hash ; } # State::Client # return the number of items actually probed sub mk_update { my $self = shift ; my $hnam = shift ; # host to probe my $clnt = shift ; # $main::CLIENT my $itms = $self -> hosts -> { $hnam } ; my %want = () ; my %prev = () ; my $cnt = 0 ; for my $item ( @$itms ) { my $prb = $item -> probe ; my $nam = $item -> name ; my $tim = $item -> vtim ; my $ivl = $item -> ival ; my $skp = $tim && $ivl && ( time < $tim + $ivl ) ; unless ( Probes -> can ( $prb ) ) { $item -> perr ( "Probes can't $prb" ) ; OBB::TT "perr $hnam $nam Probes can't probe $prb" ; } elsif ( not $skp ) { $want { $prb } { $nam } = $item ; $prev { $nam } = [ $item, $item -> fitl, $item -> val ] if defined $tim ; $item -> perr ( '' ) ; OBB::TT "want for $hnam $prb $nam $ivl" ; $cnt ++ ; } } for my $prb ( sort keys %want ) { OBB::TT "Probing $prb ..." ; # pass the client for probe meta ; item hist_* Probes -> $prb ( $want { $prb }, $clnt ) ; } my $events = [] ; for my $tup ( values %prev ) { my $item = shift @$tup ; my $fold = shift @$tup ; my $vold = shift @$tup ; my $fnew = $item -> fitl ; my $lold = defined $fold ? $fold -> levl : undef ; my $lnew = defined $fnew ? $fnew -> levl : undef ; my $vnew = $item -> val ; my $same = ( ( defined $lold and defined $lnew ) ? ( $lold == $lnew ) : ! ( defined $lold or defined $lnew ) ) ; unless ( $same ) { push @$events, Event -> Make ( hnam => $hnam , item => $item -> name , lold => $lold , lnew => $lnew , vold => $vold , vnew => $vnew ) ; OBB::TT "Events %s old %s new %s same %s vold %s vnew %s " , $item -> name, $lold, $lnew, $same, $vold, $vnew ; } } OBB::TT "Events %s", scalar @$events ; ( $cnt, $events ) ; } ################################################################### package Pmaker ; use base qw(Client) ; __PACKAGE__ -> mk_getset ( qw(clients events event0 alerts noalrt) ) ; use constant { PROG => 'dmon-pmaker' } ; sub Init { my $self = shift ; OBB::TT 'Pmaker Init %s', $self ; $self -> Client::Init ( @_, SERVICE => 0 ) ; $self -> clients ( {} ) ; $self -> events ( [] ) ; $self -> alerts ( [] ) ; OBB::TT 'Pmaker Init levels %s', $self -> show_levels ; OBB::TT 'Pmaker Init done %s', $self ; $self ; } sub PORT { my $self = shift ; $self -> conf -> port_pmaker ; } sub mk_work { my ( $self, $lmod ) = @_ ; if ( $self -> hnam eq $self -> conf -> server ) { Work::Pmaker -> make_from_file ( $self -> work_file, $lmod ) ; } else { Work::Pmaker -> make_from_serv ( $self, $lmod ) ; } } # $dump == CLIENTS -> cdmp sub get_clients { my $self = shift ; my $dump = shift ; for my $hnam ( keys %$dump ) { my $data = $dump -> { $hnam } ; my $client = Host::Client -> Make ( %$data ) ; $self -> clients -> { $hnam } = $client ; my $state = State::Pmaker -> Make ( host => $client , IVAL => $self -> DEF_IVAL , levels => $self -> levels , Items => $self -> Items ) ; $client -> state ( $state ) ; } } sub get_events { my $self = shift ; my $dump = shift ; $self -> events ( [ map { Event -> Make ( %$_ ) } @$dump ] ) ; } sub show_hnam { my $self = shift ; my $hnam = shift ; my $host = $self -> clients -> { $hnam } ; $host ? $host -> show : "no host $hnam" ; } ################################################################### package State::Pmaker ; use base qw(State) ; __PACKAGE__ -> mk_getset ( qw(host items) ) ; sub Defs { ( items => {} ) ; } # called by Pmaker with # host => { ..., state => { var => { val => ??, ... } } } ; # IVAL => $self -> DEF_IVAL # levels => $self -> levels # Items => $self -> Items sub Init { my $self = shift ; my %opts = ( @_ ) ; $self -> State::Init ( %opts ) ; my $Items = $self -> Items ; my $host = $self -> host ; my $state = $host -> state ; my $items = [ # $_ is an item_name map { Item -> Make ( $Items -> props ( $_ ) , $self -> RECV ( $state -> { $_ } ) , hnam => $host -> hnam ) } sort keys %$state ] ; $self -> items ( $items ) ; $self ; } sub by_name { my $self = shift ; my $name = shift ; ( grep { $_ -> name eq $name } @{ $self -> items } ) [ 0 ] ; } ################################################################### package Work ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(items error stamp) ) ; sub Defs { ( items => [], stamp => 0 ) ; } sub Init { my $self = shift ; $self ; } sub make_from_file { my $self = shift ; my $file = shift ; my $lmod = shift ; my $res = $self -> Make ; my $lm = ( stat $file ) [ 9 ] ; unless ( defined $lm ) { $res -> stamp ( 0 ) ; $res -> error ( "no stat $file ($!)" ) ; } elsif ( $lmod ) { $res -> stamp ( $lm ) ; } elsif ( open FILE, '<', $file ) { my $pvar = Util::as_pvar ( join '', ) ; close FILE ; $res -> Init ( %$pvar, stamp => $lm ) ; } else { $res -> error ( "no open $file ($!)" ) ; OBB::TT "can't make_from_file %s", $res -> error ; } $res ; } # returns undef on "can't connect" sub make_from_serv { my $self = shift ; my $clnt = shift ; my $lmod = shift ; my $mesg = $clnt -> get_work_mesg ( $lmod ) ; my $res = $self -> Make ; if ( $mesg ) { # first Init ; because is sets defaults unless ( $lmod ) { my $work = $mesg -> work || {} ; $res -> Init ( %$work ) ; } $res -> error ( $mesg -> resp ) if $mesg -> err ; $res -> stamp ( $mesg -> lm || 0 ) ; } else { $res = $self -> IDLE ; OBB::TT "can't make_from_serv %s", $res -> error ; } $res ; } sub IDLE { undef ; } sub _hnam { my $x = shift ; $x -> {hnam} ; } sub _show { my $x = shift ; $x -> {show} ; } sub _wrkr { my $x = shift ; $x -> {wrkr} ; } ################################################################### package Work::Server ; use base 'Work' ; __PACKAGE__ -> mk_getset ( qw(hosts users alerts levels pmaker) ) ; sub Init { my $self = shift ; $self -> hosts ( [] ) ; $self -> levels ( '' ) ; $self -> OBB::Init ( @_ ) ; $self ; } sub IDLE { my $self = shift ; $self -> Make ; } # Work::Server sub hnams { my $self = shift ; [ map { Work::_hnam $_ } @{ $self -> hosts } ] ; } sub wrkrs { my $self = shift ; [ map { Work::_hnam $_ } grep { ! $_ -> {wrkr} } @{ $self -> hosts } ] ; } ################################################################### package Work::Client ; use base 'Work' ; __PACKAGE__ -> mk_getset ( qw(hosts levels lm) ) ; sub Init { my $self = shift ; $self -> hosts ( {} ) ; $self -> levels ( '' ) ; $self -> OBB::Init ( @_ ) ; $self ; } sub IDLE { my $self = shift ; $self -> Make ; } ################################################################### package Work::Pmaker ; use base 'Work::Client' ; __PACKAGE__ -> mk_getset ( qw(alerts xargs xfits) ) ; sub have_host { my $self = shift ; my $hnam = shift ; $self -> hosts -> { $hnam } ; } sub xargs_item { my $self = shift ; my $item = shift ; my $xargs = $self -> xargs ; my $res = {} ; for my $h ( sort keys %$xargs ) { if ( exists $xargs -> { $h } -> { $item } ) { $res -> { $h } = join ' ', @{ $xargs -> { $h } -> { $item } } ; } } $res ; } sub xfits_item { my $self = shift ; my $item = shift ; my $xfits = $self -> xfits ; my $res = {} ; for my $h ( grep $_ ne 'DEFAULT', sort keys %$xfits ) { if ( exists $xfits -> { $h } -> { $item } ) { $res -> { $h } = $xfits -> { $h } -> { $item } ; } } $res ; } sub defaults { my $self = shift ; $self -> xfits -> { DEFAULT } ; } sub dftr { my $self = shift ; my $name = shift ; $self -> defaults -> { $name } ; } sub hftr { my $self = shift ; my $hnam = shift ; my $name = shift ; ( exists $self -> xfits -> { $hnam } ? $self -> xfits -> { $hnam } -> { $name } : $self -> defaults -> { $name } ) ; } ################################################################### package Conf ; use base 'OBB' ; use constant FILES => ( 'dmon.conf', Dmon::CDIR . '/conf' ) ; use constant { LOGFILE => 'dmon.log' , PIDFILE => 'dmon.pid' , LCKFILE => 'dmon.lck' , STPFILE => 'dmon.stp' , DBSFILE => 'data.lite' } ; our @FILES = FILES ; our %DEF_CONF = ( PORT => 22007 , save => Dmon::CDIR . '/save' , html => Dmon::CDIR . '/html' , ival_make_state => 60 , ival_send_report => 300 , ival_check_work => 600 , ival_keep_events => '4w' , bindir => '/usr/sbin' , logdir => '/var/log/dmon' , vardir => '/var/dmon' , rundir => '/var/run/dmon' , lckdir => '/var/lock/subsys' , loglvl => 'Terse' , rotate => '8 1d' , server => undef , domain => undef , hostname => undef , page_sec => '/cgi-bin/gen-dmon-page' , plot_url => '/plotter.php' , httpdgid => undef ) ; __PACKAGE__ -> mk_getset ( 'file', keys %DEF_CONF ) ; sub Init { my $self = shift ; my %opts = ( @_ ) ; $self -> OBB::Init ( %DEF_CONF, %opts ) ; my $optc = $opts { file } ; my $file = _use_file ( $optc || @FILES ) ; $self -> file ( $file ) ; $self -> Die ( $optc ? "can't find dmon config file [$optc]" : sprintf "no dmon config found [%s]", join ',', @FILES ) unless defined $file ; $self -> _get_conf ( $file ) ; $self -> _check ; $self ; } sub _use_file { for my $cand ( @_ ) { return $cand if defined $cand and -f $cand ; } undef ; } sub _get_conf { my $self = shift ; my $path = shift ; open CONF, '<', $path or OBB -> Die ( "can't open $path ($!)" ) ; my @lines = ; close CONF ; for my $line ( @lines ) { chomp $line ; next if $line =~ /^#/ ; next if $line =~ /^\s*$/ ; my ( $key, $val ) = split ' ', $line, 2 ; $self -> Die ( "missing value for key $key\n" ) unless defined $val ; $self -> Die ( "bad key '$key'\n" ) unless $key eq 'include' or exists $DEF_CONF { $key } ; $self -> Die ( "Conf can't '$key'\n" ) unless $self -> can ( $key ) ; $self -> $key ( $val ) ; } } sub include { my $self = shift ; my $path = shift ; $self -> _get_conf ( $path ) ; } sub _check { my $self = shift ; my $hnam = $self -> hostname ; my $serv = $self -> server ; my $cdom = $self -> domain ; my $bnam = Util -> bin_hostname ; OBB::TT ( 'Util -> bin_hostname() %s', $bnam ) ; if ( defined $hnam ) { Util -> hostname_set ( $hnam, $cdom ) ; } elsif ( $cdom and $bnam and $bnam !~ /\./ ) { my $was = Util -> hostname || 'no_hostname' ; my $now = Util -> hostname_set ( $bnam, $cdom ) ; print "Conf: set hostname $was => $now\n" if OBB -> Verbose ; } $self -> Die ( "Conf: no 'server'\n" ) unless defined $serv ; my $cnam = Util -> hostname_make ( $serv, $cdom ) ; unless ( $serv eq $cnam ) { print "Conf: set server $serv => $cnam\n" if OBB -> Verbose ; $serv = $self -> server ( $cnam ) ; } my $llvl = $self -> loglvl ; $self -> Die ( "Conf: bad loglvl [$llvl]\n" ) unless $OBB::VLVLS{$llvl} ; { my $rotl = $self -> rotate ; my ( $num, $spc ) = split ' ', $rotl ; $self -> Die ( "Conf: bad rotate [$rotl]\n" ) unless defined $spc ; $self -> Die ( "Conf: bad number in rotate [$rotl]\n" ) unless $num =~ /^\d+$/ ; $self -> Die ( "Conf: bad spec in rotate [$rotl]\n" ) unless defined Util::secs4spec ( $spc ) ; } for my $opt ( qw(ival_send_report ival_check_work ival_keep_events) ) { my $spec = $self -> $opt ; my $secs = Util::secs4spec ( $spec ) ; $self -> Die ( "Conf: bad spec in $opt [$spec]\n" ) unless defined $secs ; $self -> $opt ( $secs ) ; } } sub workdir { sprintf '%s/%s', $_[0] -> vardir, Dmon::WORK ; } sub upgrdir { sprintf '%s/%s', $_[0] -> vardir, Dmon::UPGR ; } sub secrfil { sprintf '%s/%s', $_[0] -> vardir, Dmon::SECR ; } sub cgidata { sprintf '%s/%s', $_[0] -> vardir, Dmon::CGID ; } sub log_file { sprintf '%s/%s', $_[0] -> logdir, LOGFILE ; } sub pid_file { sprintf '%s/%s', $_[0] -> rundir, PIDFILE ; } sub lck_file { sprintf '%s/%s', $_[0] -> rundir, LCKFILE ; } sub stp_file { sprintf '%s/%s', $_[0] -> rundir, STPFILE ; } sub dbs_file { sprintf '%s/%s', $_[0] -> vardir, DBSFILE ; } sub dbs_pmkr { sprintf '%s/%s', $_[0] -> cgidata, DBSFILE ; } sub log_pmkr { sprintf '%s/%s', $_[0] -> cgidata, LOGFILE ; } # config option 'bindir' is ignored by 'root' sub bin_file { ( $< ? ( sprintf '%s/%s', $_[0] -> bindir, Dmon::PROG ) : Dmon::PATH ) ; } sub upgr_file { sprintf '%s/%s', $_[0] -> upgrdir, Dmon::PROG ; } sub port_server { $_[0] -> PORT ; } sub port_client { $_[0] -> PORT + 1 ; } sub port_pmaker { 0 ; } # protocol # no arg : no_alert status # arg == 0 : no_alert reset # arg =~ /^\w+$/ : no_alert set by $1 sub no_alert { my $self = shift ; my $file = sprintf '%s/%s', $self -> vardir, Dmon::NO_ALRT ; if ( @_ ) { my $arg = shift ; if ( $arg ) { open FILE, '>', $file ; print FILE $arg ; close FILE ; } else { unlink $file ; } } -f $file ? `cat $file` : 0 ; } sub _make_secr { my $res = '' ; my @abc = ( 'a' .. 'z' ) ; for my $i ( 1 .. 12 ) { $res .= $abc [ int rand ( 26 ) ] ; } $res ; } sub make_stp { my $self = shift ; my $file = $self -> stp_file ; if ( open STOP, ">$file" ) { printf STOP "%s\n", _make_secr ; close STOP ; chmod 0600, $file or logt ( "can't chmod $file" ) ; } else { logt ( "can't write stop file [$file] ($!) ; nevermind" ) ; } } sub read_stp { my $self = shift ; my $file = $self -> stp_file ; my $res = undef ; if ( open STOP, $file ) { chomp ( $res = ) ; close STOP ; } else { logt ( "can't read stop file [$file] ($!) ; nevermind" ) ; } $res ; } sub rm_stp { my $self = shift ; unlink $self -> stp_file ; } sub own_stp { my $self = shift ; ( stat $self -> stp_file ) [ 4 ] ; } sub sys_lock { sprintf "%s/%s", $_[0] -> lckdir, Dmon::PROG ; } sub make_sys_lock { my $res = 0 ; if ( open SYSLOCK, ">>", $_[0] -> sys_lock ) { close SYSLOCK ; $res = 1 ; } $res ; } sub rm_sys_lock { unlink $_[0] -> sys_lock ; } sub rot_cnt { my $self = shift ; ( split ' ', $self -> rotate ) [ 0 ] ; } sub rot_spc { my $self = shift ; ( split ' ', $self -> rotate ) [ 1 ] ; } sub rot_ivl { my $self = shift ; Util::secs4spec ( $self -> rot_spc ) ; } sub zap_ivl { my $self = shift ; 60 * 60 ; } ################################################################### package Set ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name mems) ) ; sub Defs { ( name => 'set', mems => {} ) ; } sub compare { my $aa = ( ref $a and $a -> can ( 'xsrt' ) ) ? $a -> xsrt : $a ; my $bb = ( ref $b and $b -> can ( 'xsrt' ) ) ? $b -> xsrt : $b ; $aa cmp $bb ; } sub size { my $self = shift ; scalar keys %{ $self -> mems } ; } sub xsrt { my $self = shift ; $self -> name ; } sub list { my $self = shift ; sort compare values %{ $self -> mems } ; } sub is_empty { my $self = shift ; ! $self -> list ; } sub _utag { my $x = shift ; ( ref ( $x ) and $x -> can ( 'utag' ) ) ? $x -> utag : $x ; } sub add { my $self = shift ; $self -> {mems} { _utag $_ } = $_ for @_ ; $self ; } sub del { my $self = shift ; delete $self -> {mems} { _utag $_ } for @_ ; $self ; } sub union { my $this = shift ; my $that = shift ; $this -> add ( $that -> flat -> list ) ; $this ; } sub minus { my $this = shift ; my $that = shift ; $this -> del ( $that -> flat -> list ) ; $this ; } sub any { my $self = shift ; ( $self -> list ) [ 0 ] ; } sub flat { my $self = shift ; my @incl = @_ ; my $res = Set -> Make ( name => 'flat_' . $self -> name ) ; if ( grep $self == $_, @incl ) { OBB -> Xit ( 'bad recursion: %s' , join ' -> ', map { $_ -> name } @incl, $self ) ; } for my $elem ( $self -> list ) { if ( ref ( $elem ) =~ /Set/ ) { $res -> add ( $elem -> flat ( @incl, $self ) -> list ) ; } else { $res -> add ( $elem ) ; } } $res ; } ################################################################### package Kind ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name dscr suff stor show) ) ; sub utag { my $self = shift ; $self -> name ; } sub xsrt { my $self = shift ; $self -> name ; } ################################################################### package Kinds ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(list hash) ) ; my $_KINDS = < list ( [] ) ; my $hash = $self -> hash ( {} ) ; for my $line ( grep /./, split "\n", $_KINDS ) { my ( $name, @rec ) = map { s/_/ /g ; s/^\s+// ; s/\s+$// ; $_ ; } split ' ', $line ; my $kind = Kind -> Make ( name => $name ) ; $kind -> $_ ( shift @rec ) for qw(dscr suff stor show) ; __PACKAGE__ -> mk_getset ( $name ) ; $self -> $name ( $kind ) ; push @$list, $name ; $hash -> { $name } = $kind ; } $self ; } sub suff { my $self = shift ; my $kind = shift ; $self -> can ( $kind ) ? $self -> $kind -> suff : '' ; } our $KINDS = __PACKAGE__ -> Make ; ################################################################### package Item ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name kind probe args hnam val vtim ival perr lvls fitr fitl src) ) ; sub Defs { ( src => Dmon::SRC_DEF ) ; } ; sub Init { my $self = shift ; my %opts = ( @_ ) ; $self -> OBB::Init ( %opts ) ; $self ; } sub utag { my $self = shift ; $self -> name ; } sub xsrt { my $self = shift ; $self -> name ; } sub fit { my $self = shift ; my $fitl = $self -> fitl ; defined ( $fitl ) ? $fitl -> name : 'no fitl' ; } sub levl { my $self = shift ; my $fitl = $self -> fitl ; $fitl ? $fitl -> levl : defined $self -> val ? 0 : undef ; } sub _suf { my $self = shift ; my $kind = $self -> kind ; $Kinds::KINDS -> suff ( $kind ) ; } sub Repr { my $self = shift ; my $suff = $self -> _suf ; if ( @_ ) { my $res = shift ; $suff ? "$res $suff" : "$res" ; } else { my $res = $self -> name ; $suff ? "$res [$suff]" : $res ; } } # $item -> repr shows $item -> val ; # $item -> repr ( $val ) shows $val sub repr { my $self = shift ; my $val = @_ ? shift : $self -> val ; my $name = $self -> name ; my $kind = $self -> kind ; my $res = $val ; my $sty ; unless ( defined $val ) { $res = 'undef' ; $sty = 'brb' ; } elsif ( $kind eq 'spin' ) { $res = $val ? 'up' : 'down' ; $sty = 'brb' if $res eq 'down' ; } elsif ( $kind eq 'perc' ) { $res = sprintf "%.2f", $res ; } elsif ( $kind eq 'gbs' ) { $res = sprintf "%.2f", $res / 1024 / 1024 ; } elsif ( $kind eq 'days' ) { $res = sprintf "%.2f", $res / 3600 / 24 ; } elsif ( $kind eq 'msec' ) { $res = sprintf "%.3f", 1000 * $res ; } elsif ( $kind eq 'int' ) { $res = sprintf "%d", $res ; } elsif ( $kind eq 'date' ) { $res = Util::date ( $res ) ; } $sty ? main::SPAN ( $res, $sty ) : $res ; } sub set_val { my $self = shift ; my $val = shift ; $self -> val ( $val ) ; $self -> vtim ( time ) ; $self -> fitl ( $self -> set_fit ) ; $self -> add_perr ( 'err_undefined' ) unless defined $val ; } sub add_perr { my $self = shift ; my $msg = shift ; my $perr = $self -> perr ; $perr .= "\n" if defined $perr and length $perr ; $self -> perr ( $perr . $msg ) ; } # sub Test::UP # { return 0 unless @_ ; # my @down = () ; # my $last = $_ [ $#_ ] ; # } # sub Test::DOWN { [ @_ ] ; } # returns eval's err iff $test == 1 sub _set_fit { my $val = shift ; my $cons = shift ; my $fitr = shift ; my $lvl0 = shift ; my $test = shift ; my $res ; my $err ; # only for the expr-check OBB::TT ( ' set fit val %s fitr %s', $val, $fitr ) ; unless ( $fitr ) { $res = $lvl0 ; } elsif ( ! defined $val ) { $res = undef ; } else { $cons -> { X } = $val ; my $undf = join "; \n", map { "undef *Dmon::Test::$_" } keys %$cons ; my $cstr = sprintf "use constant\n { %s }", join ', ', map { sprintf "%s => %s", $_, $cons -> { $_ } } keys %$cons ; my $eval = < lvls ; my $fitr = $self -> fitr ; my $cons = { map { ( $_ -> name => $_ -> levl ) } @$lvls } ; my ( $lvl, $err ) = _set_fit $self -> val, $cons, $fitr, 0 ; defined $lvl ? $lvls -> [ $lvl ] : $Fitl::UNDF ; } sub diag { my $self = shift ; sprintf "%s for %s" , Util::diag ( $self -> name ) , Util::diag ( $self -> hnam ) ; } ################################################################### package Items ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(hash) ) ; use constant MAKE_WORK => [ qw(client_cnt clients_ok) ] ; our $_conf = < { trun => [ 'Run Time Remaining' , '.1.3.6.1.4.1.318.1.1.1.2.2.3.0' ] , tbat => [ 'Time on Battery' , '.1.3.6.1.4.1.318.1.1.1.2.1.2.0' ] , inpv => [ 'input voltage' , '.1.3.6.1.4.1.318.1.1.1.3.2.1.0' ] , lout => [ 'Output Load [%]' , '.1.3.6.1.4.1.318.1.1.1.4.2.3.0' ] , load => [ 'Load Current (Amps)' , '.1.3.6.1.4.1.318.1.1.1.4.2.4.0' ] , stat => [ 'Battery Status' , '.1.3.6.1.4.1.318.1.1.1.2.1.1.0' ] , temp => [ 'Battery Temperature' , '.1.3.6.1.4.1.318.1.1.1.2.2.2.0' ] , repl => [ 'Replace Battery Indicator' , '.1.3.6.1.4.1.318.1.1.1.2.2.4.0' ] , capa => [ 'Battery Capacity [%]' , '.1.3.6.1.4.1.318.1.1.1.2.2.1.0' ] } ; # trun APC: Run Time Remaining .1.3.6.1.4.1.318.1.1.1.2.2.3.0 # tbat APC: Time On Battery .1.3.6.1.4.1.318.1.1.1.2.1.2.0 # inpv APC: input voltage .1.3.6.1.4.1.318.1.1.1.3.2.1.0 # lout APC: Output Load (%) .1.3.6.1.4.1.318.1.1.1.4.2.3.0 # load APC: Load Current (Amps) .1.3.6.1.4.1.318.1.1.1.4.2.4.0 # stat APC: Battery Status .1.3.6.1.4.1.318.1.1.1.2.1.1.0 # temp APC: Battery Temperature .1.3.6.1.4.1.318.1.1.1.2.2.2.0 # repl APC: Replace Battery Indicator .1.3.6.1.4.1.318.1.1.1.2.2.4.0 sub Init { my $self = shift ; $self -> hash ( {} ) ; for my $line ( split /\n+/, $_conf ) { chomp $line ; next if $line =~ /^#/ ; next if $line =~ /^\s*$/ ; $self -> _add ( split ' ', $line ) ; } $self ; } sub _add { my $self = shift ; my ( $name, $kind, $probe, @args ) = @_ ; my %opts = () ; @opts { qw(name kind probe args) } = ( $name, $kind, $probe, [ @args ] ) ; $self -> Xit ( "bad probe %s", $probe ) unless Probes -> have ( $probe ) ; $self -> add ( src => Dmon::SRC_SRC, %opts ) ; } sub add { my $self = shift ; my $item = Item -> Make ( @_ ) ; $self -> hash -> { $item -> name } = $item ; } sub add_items { my $self = shift ; my $itms = shift ; $self -> add ( %$_ ) for @$itms ; } sub as_hash_XXX { my $self = shift ; my $hash = $self -> hash ; my $res = {} ; for my $item ( values %$hash ) { $res -> { $item -> name } = $item ; } $res ; } sub list { my $self = shift ; my $hash = $self -> hash ; sort { $a -> name cmp $b -> name } values %$hash ; } sub have { my $self = shift ; my $name = shift ; $self -> hash -> { $name } ; } sub _no_item { my $self = shift ; my $name = shift ; Item -> Make ( name => $name, qw(kind no_kind probe no_probe), args => [] ) ; } sub item { my $self = shift ; my $name = shift ; $self -> hash -> { $name } || $self -> _no_item ( $name ) ; } sub props { my $self = shift ; my $name = shift ; %{ $self -> item ( $name ) } ; } sub probes { my $self = shift ; Util::uniq map $_ -> probe, values %{ $self -> hash } ; } ################################################################### package Probe::Result ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(err xit sig lines) ) ; sub Defs { ( lines => [] ) ; } sub bad { my $self = shift ; $self -> err or $self -> xit or $self -> sig ; } ################################################################### package Probe::File ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(file text) ) ; # Apr 14 00:00:00 2015 GMT # perl -pe 's/notAfter=//' | date --file - +%s our $ttl_cert = <<'CERT' ; #! /bin/sh test "$#" -eq 2 || exit 1 ; H=$1 P=$2 openssl s_client -connect $H:$P 2>/dev/null < /dev/null | openssl x509 -noout -dates | grep notAfter CERT our $stor_cli = <<'STOR' ; #! /bin/sh CACHE=/local/lib/storcli.cache ERROR=/local/lib/storcli.error XIT=0 test -d /local/lib || mkdir -p -m 0755 /local/lib || exit 1 if test $# > 1 then if test "$1" = '-b' then shift 2> $ERROR rm -f $CACHE if /usr/bin/timeout 10 /local/bin/storcli $@ > $CACHE.tmp then mv $CACHE.tmp $CACHE fi else if test -f $CACHE then cat $CACHE else XIT=14 fi sh $0 -b $@ & exit $XIT fi else exit 1 fi STOR our %PROBES = ( ttl_cert => $ttl_cert , storcli => $stor_cli ) ; sub install { my $self = shift ; my $file = $self -> file ; my $text = $self -> text ; if ( open FILE, '>', $file ) { print FILE $text ; close FILE ; chmod 0755, $file ; } else { logq ( "can't write $file ($!)" ) ; } } ################################################################### # Probes are called as $probe ( $want, $clnt ) # -- with %$want = { var => $item, ... } # -- where $clnt wants ( var, ... ) for $hnam ( $item -> hnam ) ; package Probes ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw() ) ; use constant TIME => Dmon::TIME ; use constant SNMPS => Items::SNMPS ; our $PDIR ; our $PROBES = { atraidcli => {} , df => {} , init_d => {} , kill_0 => {} , lmstat => {} , mailq => {} , ntpq => {} , omreport => {} , ping => {} , ping_cluster => {} , proc => {} , repo_stats => {} , server => {} , snmp => {} , stor_cli => {} , swapon => {} , ttl_cert => {} , tw_cli => {} , who => {} } ; sub have { my ( $self, $probe ) = @_ ; exists $PROBES -> { $probe } ; } sub Install { my $self = shift ; my $vdir = shift ; my $pdir = "$vdir/probes" ; if ( -d $pdir or mkdir $pdir, 0755 ) { $PDIR = $pdir ; for my $name ( sort keys %Probe::File::PROBES ) { Probe::File -> Make ( file => "$pdir/$name" , text => $Probe::File::PROBES { $name } ) -> install ; } } else { logq ( "can't mkdir %s", $pdir ) ; } } sub _perc { my $x = shift || 0 ; my $y = shift || 0 ; $y == 0 ? 100 : 100 * $x / $y ; } sub _find_bin { my $x = shift ; if ( $x =~ m!^/! ) { return $x if -f $x ; } else { my @path = ( qw(/sbin /usr/sbin /local/bin), split /:/, $ENV { PATH } ) ; unshift @path, $PDIR if $PDIR ; for my $dir ( @path ) { return "$dir/$x" if -f "$dir/$x" ; } } undef ; } sub mk_pipe { my @cmd = @_ ; my $pipe = IO::Pipe -> new ; my $pid = fork () ; if ( $pid ) { # parent $pipe -> reader () ; } elsif ( defined $pid ) { # child alarm 2 ; $pipe -> writer () ; my $fd = $pipe -> fileno ; open STDOUT, ">&=$fd" or die "can't reopen STDOUT" ; close STDERR ; $pipe -> close or die "child can't close pipe" ; exec @cmd ; } else { die "can't fork" ; } ( $pid, $pipe ) ; } sub _get_out { my ( $prog, @args ) = @_ ; my @res ; my $res = Probe::Result -> Make ; my $bin = _find_bin $prog ; OBB::TT " running %s %s", $bin, join ' ', map "\"$_\"", @args ; if ( ! $bin ) { $res -> err ( "can't find $prog" ) ; } else { my ( $pid, $pipe ) = mk_pipe $bin, @args ; my @lines = <$pipe> ; $res -> lines ( [ @lines ] ) ; unless ( $pipe -> close ) { $res -> err ( "can't close pipe" ) ; } else { waitpid $pid, 0 ; my $xit = $res -> xit ( $? >> 8 ) ; my $sig = $res -> sig ( $? & 127 ) ; if ( $sig ) { my $nam = Util::sig_name ( $sig ) ; $res -> err ( $nam eq 'ALRM' ? 'TIMEOUT' : "sig [$nam]" ) ; } elsif ( $xit ) { $res -> err ( "xit [$xit]" ) ; } } } OBB::TT " err %s xit %s sig %s lines %s" , ( $res -> err || '' ) , $res -> xit , $res -> sig , scalar @{ $res -> lines } ; $res ; } sub set_perr { my ( $want, $out ) = @_ ; $_ -> perr ( $out -> err ) for map { $want -> { $_ } } keys %$want ; } sub df { my $self = shift ; my $want = shift ; my $prog = 'df' ; # my $out = _get_out $prog, qw(-P -T -x nfs) ; my $out = _get_out $prog, qw(-P -T) ; my $lines = $out -> lines ; set_perr $want, $out ; unless ( $out -> bad ) { my %df = () ; shift @$lines ; # discard column names for my $line ( @$lines ) { chomp $line ; my @rec = split ' ', $line ; my $fs = pop @rec ; my ( $size, $perc ) = @rec [ 2, 5 ] ; chop $perc ; $df { $fs } = [ $size, $perc ] ; } for my $itm ( values %$want ) { my $clas = $itm -> args -> [ 0 ] ; my $mntp = $itm -> args -> [ 1 ] ; OBB::TT " df clas %s mntp %s" , $clas, $mntp ; if ( $clas eq 'avail' ) { $itm -> set_val ( $df { $mntp } -> [ 0 ] ) ; } else # ( $clas eq 'usage' ) { $itm -> set_val ( $df { $mntp } -> [ 1 ] ) ; } } } } # repo_users int repo_stats users # repo_projs int repo_stats projs # repo_size int repo_stats size # repo_bups int repo_stats bups sub repo_stats { my $self = shift ; my $want = shift ; my $prog = '/local/bin/repo-stats' ; my $out = _get_out $prog ; my $lines = $out -> lines ; set_perr $want, $out ; unless ( $out -> bad ) { my %tab = () ; for my $line ( @$lines ) { chomp $line ; my ( $key, $val ) = split ' ', $line ; $tab { $key } = $val ; } for my $itm ( values %$want ) { my $arg0 = $itm -> args -> [ 0 ] ; $itm -> set_val ( $tab { $arg0 } ) ; } } } # ntp_offset msec ntpq offset # ntp_freq real ntpq frequency # offset=-0.064, frequency=32.519, sub ntpq { my $self = shift ; my $want = shift ; my $clnt = shift ; # $main::CLIENT my $prog = 'ntpq' ; my $hnam = ( values %$want ) [ 0 ] -> hnam ; my $tget = Util -> hostname eq $hnam ? 'localhost' : $hnam ; my $out = _get_out $prog, qw(-c rv), $tget ; my $lines = $out -> lines ; set_perr $want, $out ; unless ( $out -> bad ) { for my $w ( values %$want ) { my $clas = $w -> args -> [ 0 ] ; my $line = ( grep /\b$clas=/, @$lines ) [ 0 ] ; if ( $line ) { if ( $line =~ /\b$clas=(\S+)/ ) { my $val = $1 ; chop $val if $val =~ /,$/ ; $val /= 1000 if $clas eq 'offset' ; $w -> set_val ( abs $val ) ; } } } } } sub omreport { my $self = shift ; my $want = shift ; my $prog = '/opt/dell/srvadmin/bin/omreport' ; for my $item ( values %$want ) { my $clas = $item -> args -> [ 0 ] ; my $arg1 = $item -> args -> [ 1 ] ; my $out = _get_out $prog, qw(storage pdisk), "controller=$arg1" ; set_perr $want, $out ; unless ( $out -> bad ) { my $clas = $item -> args -> [ 0 ] ; if ( $clas eq 'ok_disks' ) { my $lines = $out -> lines ; my $cnt = grep /^Status\s+:\s+Ok$/, @$lines ; $item -> set_val ( $cnt ) ; } } } } # raid1_spares int atraidcli blockdevscan # raid1_status int atraidcli rgdisplay # raid1_members int atraidcli rmstatus RGROUP1 sub atraidcli { my $self = shift ; my $want = shift ; my %args = ( spares => 'blockdevscan' , status => 'rgdisplay' , members => 'rmstatus RGROUP1' ) ; for my $item ( values %$want ) { my $clas = $item -> args -> [ 0 ] ; my $prog = '/usr/sbin/atto-cli/atraidcli_x64' ; my $out = _get_out $prog, '-x', $args { $clas } ; set_perr $want, $out ; unless ( $out -> bad ) { my $lines = $out -> lines ; my $val ; if ( $clas eq 'spares' ) { $val = grep /^\s*\d+\s+.*\*HOTSPARE\*/, @$lines ; } elsif ( $clas eq 'status' ) { my $line = ( grep /^RGROUP1/, @$lines ) [ 0 ] ; my $stat = ( split ' ', $line ) [ 7 ] ; $val = ( $stat and $stat eq 'ONLINE' ) ? 1 : 0 ; } elsif ( $clas eq 'members' ) { $val = grep /^\s*\d+\s+ONLINE/, @$lines ; } $item -> set_val ( $val ) ; OBB::TT " name %s val %s", $item -> name, $val ; } } } sub swapon { my $self = shift ; my $want = shift ; my $prog = 'swapon' ; my $out = _get_out $prog, qw(-s) ; my $lines = $out -> lines ; set_perr $want, $out ; unless ( $out -> bad ) { my $size = 0 ; my $used = 0 ; shift @$lines ; # column names header for my $line ( @$lines ) { my @rec = split ' ', $line ; $size += $rec [ 2 ] ; $used += $rec [ 3 ] ; } for my $w ( values %$want ) { my $clas = $w -> args -> [ 0 ] ; if ( $clas eq 'avail' ) { $w -> set_val ( $size ) ; } elsif ( $clas eq 'usage' ) { $w -> set_val ( _perc $used, $size ) ; } } } } sub proc { my $self = shift ; my $want = shift ; for my $item ( values %$want ) { my $name = $item -> name ; my $arg = $item -> args -> [ 0 ] || '' ; my $file = "/proc/$arg" ; my $tag = "$arg.$name" ; unless ( $arg ) { OBB::TT 'probe proc ; no arg?' ; } elsif ( open FILE, '<', $file ) { my @lines = ; close FILE ; chomp @lines ; unless ( @lines ) { OBB::TT 'proc %s ; no lines', $tag ; } elsif ( $name eq 'cpu_load' or $name eq 'uptime' ) { my $line = $lines [ 0 ] ; my $val = ( split ' ', $line ) [ 0 ] ; if ( $val =~ /^\d+(\.\d+)?$/ ) { $item -> set_val ( $val ) ; } else { OBB::TT 'proc %s ; bad val %s', $tag, $val ; } } else { OBB::TT "proc %s ; can't handle %s", $tag, $name ; } } } } sub init_d { my $self = shift ; my $want = shift ; my $DIST = $Util::DISTRIB ; my $SYSC = '/usr/bin/systemctl' ; for my $item ( values %$want ) { my $args = $item -> args ; my ( $prog, @args ) ; if ( $DIST eq 'ubuntu' ) { @args = ( grep { -f "/etc/init/$_.conf" } @$args ) [ 0 ] ; if ( @args ) { $prog = 'service' ; push @args, 'status' } } unless ( $prog ) { $prog = ( grep { -f $_ } map { "/etc/init.d/$_" } @$args) [ 0 ] ; @args = qw(status) ; } if ( ! $prog and -f $SYSC ) { my $hav ; for my $arg ( @$args ) { if ( 0 == _get_out ( $SYSC, 'is-enabled', $arg ) -> xit ) { $hav = $arg ; last ; } } if ( $hav ) { $prog = $SYSC ; @args = ( 'is-active', $hav ) ; } } OBB::TT ( ' dist %s prog %s args %s', $DIST, $prog, "@args" ) ; if ( $prog and @args ) { my $out = _get_out $prog, @args ; my $val = $out -> xit ? Dmon::SPIN_DO : Dmon::SPIN_UP ; my $nam = $item -> name ; OBB::TT " name %s val %s", $nam, $val ; $item -> set_val ( $val ) ; } else { $item -> perr ( "no init[.d] file [@$args]" ) ; } } } sub ttl_cert { my $self = shift ; my $want = shift ; for my $item ( values %$want ) { my $arg = $item -> args -> [ 0 ] ; my $hnam = $item -> hnam ; my $port = $arg eq 'ldap' ? 636 : 443 ; my $tget = Util -> hostname eq $hnam ? 'localhost' : $hnam ; my $out = _get_out 'ttl_cert', $tget, $port ; if ( $out -> bad ) { $item -> perr ( $out -> err ) ; } else { my $name = $item -> name ; OBB::TT " lines %s", join '', @{ $out -> lines } ; my $date = $out -> lines -> [ 0 ] ; chomp $date ; OBB::TT " name %s date %s", $name, $date ; $date =~ s/notafter=//i ; my $val = Util::secs4date $date ; OBB::TT " name %s val %s", $name, $val ; $item -> set_val ( defined $val ? $val - time : undef ) ; } } } # lm_flex spin lmstat lm # lm_pgroupd spin lmstat pgroupd # lm_maplelmg spin lmstat maplelmg # lm_idl_lmgrd spin lmstat idl_lmgrd sub lmstat { my $self = shift ; my $want = shift ; my $out = _get_out '/local/bin/lmstat' ; if ( $out -> bad ) { set_perr $want, $out ; } else { my $lines = $out -> lines ; for my $item ( values %$want ) { my $name = $item -> name ; my $arg = $item -> args -> [ 0 ] ; my $pat = "(license server|$arg:)" ; my $cnt = scalar grep /$pat\sUP/, @$lines ; my $val = $cnt ? Dmon::SPIN_UP : Dmon::SPIN_DO ; $item -> set_val ( $val ) ; } } } sub ping_cluster { my $self = shift ; my $want = shift ; my $out = _get_out '/local/bin/ping-cluster', '-u' ; if ( $out -> bad ) { set_perr $want, $out ; } else { my $lines = $out -> lines ; for my $item ( values %$want ) { my $val = $lines -> [ 0 ] ; chomp $val if defined $val ; $item -> set_val ( $val ) ; } } } # users_busy int who busy # users_uniq int who uniq sub who { my $self = shift ; my $want = shift ; my $out = _get_out 'who' ; if ( $out -> bad ) { set_perr $want, $out ; } else { my $lines = $out -> lines ; my %uniq = () ; for my $line ( @$lines ) { $uniq { ( split ' ', $line ) [ 0 ] } ++ ; } for my $item ( values %$want ) { my $clas = $item -> args -> [ 0 ] ; if ( $clas eq 'busy' ) { $item -> set_val ( scalar @$lines ) ; } elsif ( $clas eq 'uniq' ) { $item -> set_val ( scalar keys %uniq ) ; } } } } sub mailq { my $self = shift ; my $want = shift ; my $out = _get_out 'mailq' ; if ( $out -> bad ) { set_perr $want, $out ; } else { my $lines = $out -> lines ; my $line = pop @$lines ; chomp $line ; if ( defined $line ) { my $val ; if ( $line =~ /Mail queue is empty/i ) { $val = 0 ; } elsif ( $line =~ /(\d+) requests?/i ) { $val = $1 ; } my $item = ( values %$want ) [ 0 ] ; $item -> set_val ( $val ) if $item and defined $val ; } } } # raid_3w_status int tw_cli sub tw_cli { my $self = shift ; my $want = shift ; for my $item ( values %$want ) { my $out = _get_out 'tw_cli', 'show' ; my @lines = @{ $out -> lines } ; if ( $out -> bad ) { $item -> perr ( $out -> err ) ; } elsif ( @lines < 4 ) { $item -> perr ( sprintf "only [%s] lines", scalar @lines ) ; } else { my $name = $item -> name ; shift @lines ; shift @lines ; shift @lines ; pop @lines ; my $val = 0 ; for my $line ( @lines ) { my $tmp = ( split ' ', $line ) [ 5 ] ; if ( defined $tmp ) { $val = $tmp if $tmp > $val ; } else { $item -> perr ( "bad line [$line]" ) ; } } OBB::TT " name %s val %s", $name, $val ; $item -> set_val ( $val ) ; } } } # raid_mega_status int stor_cli sub stor_cli { my $self = shift ; my $want = shift ; for my $item ( values %$want ) { my $out = _get_out 'storcli', qw(/c0 show) ; my @lines = @{ $out -> lines } ; if ( $out -> bad ) { $item -> perr ( $out -> err ) ; } else { my $name = $item -> name ; while ( @lines and $lines[0] !~ m!^DG/VD\s+TYPE\s+State!i ) { shift @lines ; } shift @lines ; shift @lines ; my $val = undef ; while ( @lines and $lines[0] =~ m!^\s*\d+/\d+! ) { $val = 0 unless defined $val ; $val ++ unless ( split ' ', $lines[0] ) [ 2 ] eq 'Optl' ; shift @lines ; } OBB::TT " name %s val %s", $name, $val ; $item -> set_val ( $val ) ; } } } sub kill_0 { my $self = shift ; my $want = shift ; for my $item ( values %$want ) { my $pidf = $item -> args -> [ 0 ] ; OBB::TT " using pidfile %s", $pidf ; my $val = Dmon::SPIN_DO ; my $line = '' ; if ( open PIDF, '<', $pidf ) { $line = ; close PIDF ; } chomp $line ; $val = Dmon::SPIN_UP if $line and kill 0, $line ; OBB::TT 'line %s', $line ; OBB::TT 'val %s', $val ; $item -> set_val ( $val ) ; } } # snmpget -v1 -c public local-ups01.science.uu.nl .1.3.6.1.4.1.318.1.1.1.2.2.2.0 sub snmp { my $self = shift ; my $want = shift ; my %work ; # @items for hnam for my $item ( values %$want ) { push @{ $work { $item -> hnam } }, $item ; } for my $hnam ( sort keys %work ) { my $items = $work { $hnam } ; my @args = map { SNMPS -> { $_ -> args -> [ 0 ] } [ 1 ] ; } @$items ; my $prog = 'snmpget' ; my @out = ( $prog , qw(-v1 -c public) , $hnam , @args ) ; my $out = _get_out @out ; $_ -> perr ( $out -> err ) for @$items ; unless ( $out -> bad ) { my $lines = $out -> lines ; unless ( @$lines == @$items ) { my $cnt = @$lines ; $out -> err ( "bad #lines ($cnt) from $prog" ) ; } else { for ( my $i = 0 ; $i < @$lines ; $i ++ ) { my $line = $lines -> [ $i ] ; my $item = $items -> [ $i ] ; chomp $line ; my $val = ( split /\s+=\s+/, $line ) [ 1 ] ; $val =~ s/Gauge32: // ; $val =~ s/INTEGER: // ; $val = $1 / 100 if $val =~ /Timeticks:\s+\((\d+)\)/ ; $item -> set_val ( $val ) ; } } } } } sub ping { my $self = shift ; my $want = shift ; my $clnt = shift ; my @itms = values %$want ; for my $item ( @itms ) { my $val ; my $clas = $item -> args -> [ 0 ] ; if ( $clas eq 'server' ) { $val = $clnt -> server -> ping ( 5 ) ; OBB::TT 'ping %s %s', $clnt -> server, $val ; } $item -> set_val ( $val ) if defined $val ; } } # client_cnt int server clients cnt # clients_ok int server clients ok sub server { my $self = shift ; my $want = shift ; my $clnt = shift ; my $SERV = $clnt -> SERVER ; # return unless $SERV and time > $SERV -> Tzero + $ival ; return unless $SERV ; my @itms = values %$want ; my @clis = grep $_ -> args -> [ 0 ] eq 'clients', @itms ; if ( @clis ) { my $oks = $SERV -> check_reporting ; my $cnt = scalar keys %{ $SERV -> clients } ; for my $item ( @clis ) { my $clas = $item -> args -> [ 1 ] ; if ( $clas eq 'cnt' ) { $item -> set_val ( $cnt ) ; } elsif ( $clas eq 'ok' ) { $item -> set_val ( $oks ) ; } } } } ################################################################### package Fitl ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name show levl) ) ; our $UNDF = Fitl -> Make ( name => 'undf' , show => 'undefined' , levl => 999 ) ; sub UNDF { $UNDF ; } sub xsrt { my $self = shift ; $self -> levl ; } sub exp { my $self = shift ; $self -> levl ; } sub mk_levels { my $self = shift ; my $lvls = shift ; my $cnt = 0 ; [ map { $self -> Make ( name => substr ( $_, 0, 4 ) , show => $_ , levl => $cnt ++ ) } split ' ', $lvls ] ; } ################################################################### package Buff ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw() ) ; use constant BUF_SIZE => 65536 ; sub new { my $self = shift ; bless { b => '' }, $self ; } sub Init { my $self = shift ; my $s = shift ; $self->{b} = $s ; $self ; } sub Make { my $self = shift ; $self -> new -> Init ( @_ ) ; } sub get { my $self = shift ; $self->{b} ; } sub set { my $self = shift ; my $s = shift ; $self->{b} = $s ; } sub add { my $self = shift ; my $s = shift ; $self->{b} .= $s ; $self ; } sub length { my $self = shift ; length $self->{b} ; } sub index { my $self = shift ; my $str = shift ; my $off = shift || 0 ; index $self->{b}, $str, $off ; } sub substr { my $argc = scalar @_ ; my $self = shift ; my $off = shift ; my $len = shift ; my $rpl = shift ; if ( $argc == 4 ) { CORE::substr ( $self->{b}, $off, $len, $rpl ) ; } elsif ( $argc == 3 ) { CORE::substr ( $self->{b}, $off, $len ) ; } elsif ( $argc == 2 ) { CORE::substr ( $self->{b}, $off ) ; } } sub del { my $self = shift ; my $len = shift ; $self->{b} = CORE::substr ( $self->{b}, $len ) ; } sub diag { my $self = shift ; sprintf "$self\n %s", Util::diag ( $self -> get ) ; } # Buff sub STATE { my $self = shift ; my $tag = shift ; sprintf " $tag [%s] len [%s]", $self , $self -> length ; } sub sysread { my $self = shift ; my $inp = shift ; sysread $inp, $self->{b}, BUF_SIZE, CORE::length ( $self->{b} ) ; } ############################################################## package Threads ; use base 'OBB' ; OBB -> import ; Util -> import ; __PACKAGE__ -> mk_getset ( qw(list inps outs) ) ; sub Defs { ( list => {}, inps => {}, outs => {} ) ; } our $TID = 0 ; sub _close { my $sock = shift ; my $tag = shift ; OBB::TT "_close $tag %s", $sock ; if ( ref ( $sock ) =~ /Socket/ ) { my $peer = $sock -> peerhost ; my $pprt = $sock -> peerport ; my $port = $sock -> sockport ; OBB::TT ' peer %s port %s' , $peer || 'no_peer' , $port || 'no_port' if $peer or $port ; logv ( "close $tag %s:%s\n", $peer, $port ) if $peer and $port ; } $sock -> close if ref ( $sock ) and $sock -> can ( 'close' ) ; } sub stop { my $self = shift ; for my $thrd ( values %{ $self -> list } ) # { $thrd -> stop ; } { $self -> Del ( $thrd -> stop ) ; } } sub Add { OBB::A_is ( 3, scalar @_ ) ; my $self = shift ; my $thrd = shift ; my $app = shift ; my $out = $thrd -> out ; my $inp = $thrd -> inp ; my $sam = ( $inp and $inp == $out ) ? 'same' : $out ; my $tid = $thrd -> tid ( $TID ++ ) ; logd ( "Add %s %s\n", ref $thrd, $tid ) ; logd ( " inp [%s]\n", $inp ) if $inp ; logd ( " out [%s]\n", $sam ) if $out ; $thrd -> pool ( $self ) ; $thrd -> app ( $app ) ; $self -> list -> { $thrd } = $thrd ; $thrd ; } sub Del { my $self = shift ; my $thrd = shift ; logd ( "Del %s %d\n", ref $thrd, $thrd -> tid ) ; my $out = $thrd -> out ; my $inp = $thrd -> inp ; _close $inp, '<-' ; _close $out, '->' ; $thrd -> gone ( 1 ) ; delete ( $self -> list -> { $thrd } ) ; } sub any_readers { my $self = shift ; my $list = $self -> list ; my $res = new IO::Select ; $self -> {inps} = {} ; for my $key ( %$list ) { my $thrd = $list -> { $key } ; my $inp = $thrd -> inp ; next unless $inp and ref ( $inp ) =~ /IO::/ ; next if $thrd -> waiting ; $res -> add ( $inp ) ; $self -> {inps} { $inp } = $thrd ; } $res ; } sub any_writers { my $self = shift ; my $list = $self -> list ; my $res = new IO::Select ; $self -> {outs} = {} ; for my $key ( %$list ) { my $thrd = $list -> { $key } ; my $out = $thrd -> out ; next unless $thrd -> bout -> length ; next if $thrd -> bout -> index ( "\n" ) == -1 ; next unless $out and ref ( $out ) =~ /IO::/ ; $res -> add ( $out ) ; $self -> {outs} { $out } = $thrd ; } $res ; } sub any_readys { my $self = shift ; my $list = $self -> list ; my @res = () ; for my $key ( keys %$list ) { my $thrd = $list -> { $key } ; push @res, $thrd if $thrd -> ready_to_do_lines ; } @res ; } sub any_inactives { my $self = shift ; [ grep { $_ -> can ( 'inactive' ) and $_ -> inactive } values %{ $self -> list } ] ; } sub by_inp { $_[0] -> {inps} { $_[1] } ; } sub by_out { $_[0] -> {outs} { $_[1] } ; } sub do_a_loop { my $self = shift ; OBB::TT 'do_a_loop ...' ; for my $h ( $self -> any_writers -> can_write () ) { my $thrd = $self -> by_out ( $h ) ; OBB::TT 'flush 1 %s %s', $thrd, $thrd -> bout ; $thrd -> flush ; } for my $h ( $self -> any_readers -> can_read ( 3 ) ) { my $thrd = $self -> by_inp ( $h ) ; OBB::TT sprintf 'reader %s', $thrd ; $self -> Del ( $thrd ) if $thrd -> done ( $self ) ; } my @readys = $self -> any_readys ; OBB::TT 'readys %s', scalar @readys if @readys ; for my $thrd ( @readys ) { OBB::TT 'ready %s %s', $thrd, ref $thrd ; $self -> Del ( $thrd ) if $thrd -> done ( $self ) ; } for my $h ( $self -> any_writers -> can_write () ) { my $thrd = $self -> by_out ( $h ) ; OBB::TT 'flush 2 %s %s', $thrd, $thrd -> bout ; $thrd -> flush ; } @readys = $self -> any_readys ; OBB::TT 'readys %s', scalar @readys if @readys ; for my $thrd ( @readys ) { OBB::TT 'ready %s %s', $thrd, ref $thrd ; $self -> Del ( $thrd ) if $thrd -> done ( $self ) ; } } sub by_tid { $a -> tid <=> $b -> tid } ; sub Dump { my $self = shift ; for my $thrd ( sort by_tid values %{ $self -> {list} } ) { $thrd -> Dump ; } } # Threads sub STATE { my $self = shift ; my @list = () ; for my $tuple ( sort { $a -> [0] cmp $b -> [0] } map { $_ -> STATE } sort by_tid values %{ $self -> {list} } ) { my ( $tag, @itms ) = @$tuple ; push @list, @itms ; } "threads :\n%s\n", join "\n", map { "-- $_" ; } @list ; } ############################################################## package Dmon::Thread ; use base 'OBB' ; use constant COMMAND_DONE => "COMMAND DONE\n" ; __PACKAGE__ -> mk_getset ( qw(tid inp out app pool ltim gone waitx proxy) ) ; __PACKAGE__ -> mk_get ( qw(binp bout) ) ; sub Init { my $self = shift ; $self -> { inp } = shift ; $self -> { out } = shift ; $self -> { binp } = Buff -> Make ( '' ) ; $self -> { bout } = Buff -> Make ( '' ) ; $self -> { base } = undef ; $self -> { ltim } = time ; $self -> { gone } = 0 ; $self -> { waitx } = {} ; $self -> { proxy } = 1 ; $self ; } sub pipe_to { my $self = shift ; my $that = shift ; $self -> { bout } = $that -> binp ; $self -> { out } = undef ; $self ; } sub is_sender { ref $_[0] eq 'Dmon::Thread::Send' ; } sub is_receiver { ref $_[0] eq 'Dmon::Thread::Recv' ; } sub wait4 { OBB::A_is ( 2, scalar @_ ) ; my $self = shift ; my $thrd = shift ; $self -> waitx -> { $thrd } = $thrd ; } sub waits { values %{ $_[0] -> waitx } ; } sub waitc { scalar keys %{ $_[0] -> waitx } ; } sub waiting { my $self = shift ; $self -> is_receiver and $self -> waitc ; } sub wait_tids { my $self = shift ; map { $_ -> tid } $self -> waits ; } sub all_gone { OBB::A_is ( 1, scalar @_ ) ; my $self = shift ; ! grep { $_ -> gone == 0 } $self -> waits ; } sub all_gonex { OBB::A_is ( 2, scalar @_ ) ; my $self = shift ; my $gone = shift ; $self -> waitc == scalar grep { $_ -> gone == $gone } $self -> waits ; } sub all_gone1 { $_[0] -> all_gonex ( 1 ) ; } sub all_gone2 { $_[0] -> all_gonex ( 2 ) ; } sub ready_to_do_lines { my $self = shift ; my $idx = $self -> binp -> index ( "\n" ) ; my $res = 0 ; if ( $self -> can ( 'timeout' ) and $self -> timeout ) { $res = 1 ; OBB::TT 'ready timed out %s timeout %s', $self, $self -> timeout ; } elsif ( $self -> waiting and $self -> all_gone1 ) { $res = 1 ; OBB::TT 'ready all_gone1 %s', $self ; } elsif ( $self -> waiting and $self -> all_gone2 ) { $res = 1 ; OBB::TT 'ready all_gone2 %s', $self ; } elsif ( $self -> binp == $self -> bout ) { $res = 0 ; } elsif ( $idx != -1 ) { $res = 1 ; OBB::TT 'ready have inp %s %s', $self, $idx ; } $res ; } # A 'ready' thread is either "waiting for input" or "waiting for messages" # A thread is ready-to-run if/when # -- a timeout occurrs # action : for now : treat as if 'ready to read' ; # [ the read blocks ; I observer a 6 seconds wait # ; followed by a second read (empty), which closed the socket # and Del'd the thread # ; the application got a (plain) Dmon::Mesg (??) # ] # -- input can be read # Action : read input ; progress with 'do_lines' ; set 'done' on EOF # -- all messages are available to thread (all senders are done/gone) ; # [ the thread is either a proxy or a collector of messages # ; a proxy just passes on what it receives ; # ; a non-proxy has a 'pref' (message prefix), set at thread-creation. # ] # Action : output ( proxy ? full message : cat pref bodies ) ; set 'done' # time 100522.999 do_a_loop ... # time 100526.004 do_a_loop ... # time 100529.009 do_a_loop ... # time 100532.013 make to server # Tue Nov 10 19:09:31 2015 dmon[41041] send REPORT to science-vs33.science.uu.nl # time 100532.016 to server PORT [22007] # Tue Nov 10 19:09:31 2015 dmon[41041] Add Dmon::Thread::Send 386 # Tue Nov 10 19:09:31 2015 dmon[41041] inp [IO::Socket::INET=GLOB(0x2b4b268)] # Tue Nov 10 19:09:31 2015 dmon[41041] out [same] # time 100532.019 make to server done # time 100532.019 do_a_loop ... # time 100535.023 do_a_loop ... # time 100538.028 do_a_loop ... # time 100541.033 do_a_loop ... # time 100544.037 ready timed out 386 [[1]] # time 100544.037 readys [1] # time 100544.037 ready 386 [Dmon::Thread::Send] # time 100550.013 read 386 Buff=HASH(0x2d0afc0) # [COMMAND REPORT\n{"resp":"ok report from science-vs64.science.uu.nl [131.211.32.73] for science-vs64.science.uu.nl"}\nCOMMAND DONE\n] # time 100550.014 ready timed out 386 [[1]] # time 100550.014 readys [1] # time 100550.014 ready 386 [Dmon::Thread::Send] # time 100550.022 read 386 Buff=HASH(0x2d0afc0) # [empty] # Tue Nov 10 19:09:49 2015 dmon[41041] Del Dmon::Thread::Send 386 # time 100550.022 _close [IO::Socket::INET=GLOB(0x2b4b268)] # time 100550.022 _close ; peer [no_peer] # time 100550.023 _close ; port [48343] # Tue Nov 10 19:09:49 2015 dmon[41041] close <- no_peer:48343 # time 100550.023 _close ; closed [IO::Socket::INET=GLOB(0x2b4b268)] # time 100550.023 _close [IO::Socket::INET=GLOB(0x2b4b268)] # time 100550.023 _close ; peer [no_peer] # time 100550.023 _close ; port [no_port] # Can't locate object method "err" via package "Dmon::Mesg" at /usr/sbin/dmon line 4004. # Tue Nov 10 19:09:49 2015 dmon[41041] daemon exits dirty [syslog'ed] sub done { my $self = shift ; my $res ; if ( $self -> can ( 'timeout' ) and $self -> timeout ) { OBB::TT "timeout ; %s is done", $self ; $res = 1 ; } elsif ( $self -> waiting ) { if ( $self -> all_gone1 ) { $self -> bout -> add ( $self -> pref ) unless $self -> proxy ; for my $sndr ( $self -> waits ) { $self -> bout -> add ( $self -> proxy ? $sndr -> rcvd : $sndr -> body ) ; $sndr -> gone ( 2 ) ; my $rcvd = Util::diag ( $sndr -> rcvd ) ; OBB::TT "mesg for %s from %s", $self, $sndr; OBB::TT "mesg for %s %s", $self, $rcvd ; } $self -> bout -> add ( COMMAND_DONE ) unless $self -> proxy ; $res = 0 ; } elsif ( $self -> all_gone2 ) { $res = 1 ; } else { $res = 0 ; } } else { my $len = $self -> binp -> sysread ( $self -> inp ) ; OBB::TT "read %s %s", $self, $self -> binp ; if ( $len ) { $self -> do_lines ; $self -> ltim ( time ) ; } $res = ! $len ; } $self -> Xit ( 'bad res for done()' ) unless defined $res ; $res ; } # by default, copy to bout sub do_line { my $self = shift ; my $line = shift ; $line ; } sub do_lines { my $self = shift ; my $binp = $self -> binp ; my $bout = $self -> bout ; if ( $self -> binp -> length and $binp != $bout ) { my $pos = 0 ; for ( my $idx = $binp -> index ( "\n", $pos ) ; $idx != -1 ; $idx = $binp -> index ( "\n", $pos ) ) { $bout -> add ( $self -> do_line ( $binp -> substr ( $pos, $idx + 1 - $pos ) ) ) ; $pos = $idx + 1 ; } $binp -> del ( $pos ) ; } } sub flush { my $self = shift ; my $tout = shift || 0 ; my $out = $self -> out ; my $str = $self -> bout -> get ; my $err = undef ; my $res = undef ; unless ( $out ) { print "Dmon::Thread::flush : no out\n" if $self -> Terse ; return undef ; } my $time = time ; if ( $tout ) { eval { alarm $tout ; $res = syswrite $out, $str ; $err = $! unless defined $res ; alarm 0 ; } ; alarm 0 ; print "flush : eval[$@]\n" if $@ and $self -> Terse ; } else { $res = syswrite $out, $str ; $err = $! unless defined $res ; } if ( defined $res ) { $self -> bout -> del ( $res ) ; } else { my $ival = time - $time ; printf ( "flush: syswrite returned undef (%s) tout[%s] sec[%s]\n" , $err, $tout, $ival ) if $self -> Terse ; } $res ; } sub stop { my $self = shift ; for my $h ( $self -> inp, $self -> out ) { next unless ref ( $h ) ; $h -> flush if $h -> can ( 'flush' ) ; # $h -> close if $h -> can ( 'close' ) ; } $self ; } sub Dump { my $self = shift ; if ( $self -> Debug ) { my $binp = $self -> binp -> get ; $binp =~ s/\n/\\n/g ; my $bout = $self -> bout -> get ; $bout =~ s/\n/\\n/g ; my $wait = $self -> waiting ; printf " self : %s %d\n", $self, $self -> tid ; printf " inp [$self->{inp}]\n" ; printf " out [$self->{out}]\n" ; printf " binp len %2d %s\n", $self -> binp -> length, $self -> binp ; printf " [%s]\n", $binp if $self -> binp -> length ; printf " bout len %2d %s\n", $self -> bout -> length, $self -> bout ; printf " [%s]\n", $bout if $self -> bout -> length ; printf " wait [%s]\n", join ',', $self -> wait_tids if $wait ; } } sub diag { my $self = shift ; sprintf "%2d", $self -> tid ; } # Dmon::Thread sub STATE { my $self = shift ; [ "sock $self" , join "\n", ( sprintf ( "self %s" , $self ) , sprintf ( " inp [%s]" , $self-> inp ) , sprintf ( " out [%s]" , $self-> out ) , sprintf ( " app [%s]" , $self-> app ) , sprintf ( " binp [%d]", $self -> binp -> length ) , sprintf ( " bout [%d]", $self -> bout -> length ) ) ] ; } sub state_buffs { my $self = shift ; my $res = [] ; for my $tag ( qw(binp bout) ) { my $buff = $self -> { $tag } ; push @$res, $buff ? $buff -> STATE ( $tag ) : "no $tag" ; } $res ; } ############################################################## package Dmon::Thread::Service ; use base 'Dmon::Thread' ; __PACKAGE__ -> mk_getset ( qw(sock port allowed) ) ; OBB -> import ; Util -> import ; sub Init { my $self = shift ; my %opts = ( port => undef, @_ ) ; my $port = $opts { port } ; my $sock = new IO::Socket::INET ( Listen => 128 , LocalPort => $port, ReuseAddr => 1 ) ; $self -> sock ( $sock ) ; $self -> port ( $port ) ; $self -> mk_allowed ( [], $port ) ; $self -> Die ( "Could not create socket for $port ($!)" ) unless $sock ; $self -> Dmon::Thread::Init ( $sock, $sock ) ; } sub allow { my $self = shift ; exists $self -> allowed -> { $_[0] } ; } sub mk_allowed { my $self = shift ; my $list = shift ; my $port = shift ; $self -> allowed ( Util::mk_allowed $list, $port ) ; } sub done { my $self = shift ; my $base = shift ; my $sock = $self -> sock -> accept () ; my $peer = $sock -> peerhost () ; my $port = $sock -> peerport () ; if ( $self -> allow ( $peer ) ) { my $thrd = Dmon::Thread::Recv -> Make ( $sock ) ; $base -> Add ( $thrd, $self -> app ) ; Util::logt ( "open <- $peer:$self->{port} remote $port" ) if $self -> Terse ; } else { close $sock ; my $mesg = "peer $peer not allowed on port $self->{port}" ; Util::logt ( $mesg ) if $self -> Terse ; } 0 ; } # Dmon::Thread::Service sub STATE { my $self = shift ; [ "service $self", sprintf "listening on port %s as a %s" , $self -> port , ref ( $self -> app ) ] ; } ############################################################## package Dmon::Thread::Recv ; use base 'Dmon::Thread' ; __PACKAGE__ -> mk_getset ( qw(pref) ) ; use constant COMMAND_DONE => Dmon::Thread::COMMAND_DONE ; sub Init { my $self = shift ; my $sock = shift ; $self -> Dmon::Thread::Init ( $sock, $sock ) ; $self -> {pref} = '' ; $self ; } # Dmon::Thread::Recv sub do_line { my $self = shift ; my $line = shift ; my $app = ref ( $self -> app ) ; chomp $line ; $line =~ /^(\w+\??)\s*/ ; my $cmd = $1 || 'empty' ; my $arg = $' ; $arg = '' unless defined $arg ; my $res = '' ; print "COMMAND $cmd\n" if $self -> Terse ; if ( $cmd eq 'PING' ) { my $app = ref $self -> app ; my $ver = Dmon -> Version ; my $hst = Util -> hostname ; $res = "PONG from $app $hst $ver" ; } elsif ( $cmd eq 'STATE' ) { my $logf = $self -> app -> conf -> log_file ; my $llvl = $self -> app -> conf -> loglvl ; $res = '' . ( sprintf "-- version %s\n", Dmon -> Version ) . ( sprintf "-- logfile %s\n", $logf ) . ( sprintf "-- loglevel %s\n", $llvl ) . $self -> pool -> STATE . "\n" . $self -> app -> STATE ; } elsif ( $cmd eq 'REPORT' ) { my $val = JSON::PP::decode_json ( $arg || '{}' ) ; $res = $self -> app -> do_report ( $val, $self -> inp -> peerhost ) ; } else { $res = $self -> app -> command ( $cmd, $arg, $self ) ; } $res = "unknown $app command [$line]" unless defined $res ; $self -> STOP if $cmd eq 'STOP' and $res eq Client::STOPPED ; if ( $self -> waiting ) { $self -> pref ( "COMMAND $cmd\n$res" ) unless $self -> proxy ; '' ; } else { "COMMAND $cmd\n$res\n" . COMMAND_DONE ; } } sub STOP { my $self = shift ; $self -> bout -> add ( 'STOPPED' ) ; print "stopping ; self flush\n" if $self -> Debug ; $self -> flush ; print "stopping ; self shutdown\n" if $self -> Debug ; $self -> out -> shutdown ( 2 ) ; # done using print "stopping ; pool stop\n" if $self -> Debug ; $self -> pool -> stop ; print "stopping ; base stop done\n" if $self -> Debug ; Dmon::Daemon -> ulock ; print "stopping ; unlocked\n" if $self -> Debug ; $Util::STOP = 'clean' ; exit ; } # Dmon::Thread::Recv sub STATE { my $self = shift ; my $sock = $self -> inp ; [ "serving $self" , sprintf "%s is processing a command-session << %s port %s" , ref ( $self -> app ) , $sock -> peerhost, $sock -> sockport ] ; } ############################################################## package Dmon::Thread::Send ; use base 'Dmon::Thread' ; __PACKAGE__ -> mk_getset ( qw(lines mark tout mbox mesg) ) ; use constant COMMAND_DONE => Dmon::Thread -> COMMAND_DONE ; sub Init { my $self = shift ; my %opts = ( hnam => undef , port => undef , mbox => undef , tout => 10 , @_ ) ; my $hnam = $opts {hnam} ; my $port = $opts {port} ; my $mbox = $opts {mbox} ; my $sock ; eval { local $SIG{ALRM} = sub { die 'Timed Out'; } ; alarm 2 ; $sock = IO::Socket::INET -> new ( PeerAddr => $opts {hnam} , PeerPort => $opts {port} , Proto => 'tcp' ) ; alarm 0 ; } ; if ( $sock ) { $self -> Dmon::Thread::Init ( $sock, $sock ) ; $self -> mbox ( $opts{mbox} ) ; $self -> tout ( $opts{tout} ) ; $self -> mark ( time ) ; $self -> lines ( [] ) ; $self ; } else { undef ; } } sub _mk_mesg { my $self = shift ; my $lines = $self -> lines ; my $line = $lines -> [ 0 ] ; chomp $line ; my $resp = 'no proper message received ; TIMEOUT' ; if ( $self -> timeout ) { Dmon::Mesg -> Make ( resp => $resp, lines => $lines ) ; } elsif ( $line eq "COMMAND REPORT" ) { Dmon::Mesg::Report -> Make ( lines => $lines ) ; } elsif ( $line eq "COMMAND HIST" ) { Dmon::Mesg::JSON -> Make ( lines => $lines ) ; } elsif ( $line eq "COMMAND CLIENTS" ) { Dmon::Mesg::Clients -> Make ( lines => $lines ) ; } else { Dmon::Mesg -> Make ( lines => $lines ) ; } } # Dmon::Thread::Send sub do_line { my $self = shift ; my $line = shift ; my $lines = $self -> lines ; push @$lines, $line ; if ( $line eq COMMAND_DONE or $self -> timeout ) { my $mesg = $self -> _mk_mesg ( lines => $lines ) ; my $mbox = $self -> mbox ; $self -> mesg ( $mesg ) ; push @$mbox, $mesg if defined $mbox ; } '' ; } sub send { my $self = shift ; my $mesg = shift ; chomp $mesg ; my $idx = index $mesg, "\n" ; $self -> Xit ( "send: newline in mesg ($idx)" ) if $idx >= 0 ; $self -> mark ( time ) ; $self -> bout -> add ( "$mesg\n" ) ; $self -> flush () ; $self -> out -> shutdown ( 1 ) ; # done writing } sub rcvd { my $self = shift ; join '', @{ $self -> lines } ; } sub body { my $self = shift ; my @lines = @{ $self -> lines } ; shift @lines ; pop @lines ; join '', @lines ; } sub timeout { OBB::A_is 1, scalar @_ ; my $self = shift ; $self -> tout and time > $self -> mark + $self -> tout ; } # Dmon::Thread::Send sub STATE { my $self = shift ; my $sock = $self -> inp ; [ "serving $self" , sprintf "%s is sending some command-session >> %s port %s" , ref ( $self -> app ) , $sock -> peerhost, $sock -> peerport ] ; } ############################################################## package Dmon::Mesg ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(lines) ) ; sub Init { my $self = shift ; $self -> OBB::Init ( @_ ) ; $self ; } sub type { my $self = shift ; my $res = $self -> lines -> [ 0 ] ; chomp $res ; $res || 'no type' ; } sub body { my $self = shift ; join '', @{ $self -> lines } ; } sub resp { 'stub response' ; } sub ok { 1 ; } sub err { my $self = shift ; ! $self -> ok ; } ############################################################## package Dmon::Mesg::JSON ; use base 'Dmon::Mesg' ; __PACKAGE__ -> mk_getset ( qw(pvar) ) ; sub Init { my $self = shift ; $self -> Dmon::Mesg::Init ( @_ ) ; my $line = $self -> lines -> [ 1 ] ; $self -> pvar ( Util::as_pvar $line ) ; $self ; } sub resp { my $self = shift -> pvar -> {resp} ; } sub ok { my $self = shift ; $self -> resp =~ /^ok/ ; } ############################################################## package Dmon::Mesg::Report ; use base 'Dmon::Mesg::JSON' ; sub work_lm { my $self = shift ; $self -> pvar -> { work } ; } ############################################################## package Dmon::Mesg::Clients ; use base 'Dmon::Mesg::JSON' ; sub cdmp { my $self = shift ; $self -> pvar -> { cdmp } ; } sub events { my $self = shift ; $self -> pvar -> { events } ; } sub event0 { my $self = shift ; $self -> pvar -> { event0 } ; } sub noalrt { my $self = shift ; $self -> pvar -> { noalrt } ; } ############################################################## package Dmon::Mesg::Work ; use base 'Dmon::Mesg::JSON' ; sub work { my $self = shift ; $self -> pvar -> { data } ; } sub lm { my $self = shift ; $self -> pvar -> { lm } ; } ############################################################## package Dmon::Mesg::Hist ; use base 'Dmon::Mesg::JSON' ; sub cols { my $self = shift ; $self -> pvar -> { data } { cols } ; } sub rows { my $self = shift ; $self -> pvar -> { data } { rows } ; } ############################################################## package Dmon::Daemon ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(conf demn STOP) ) ; use Fcntl qw(:flock) ; OBB -> import ; Util -> import ; our $PROG = Dmon::PROG ; our $DEMN = 'daemon' ; our $HOSTNAME = `hostname` ; chomp $HOSTNAME ; sub Init { my $self = shift ; my %opts = @_ ; my $conf = $self -> conf ( $opts { conf } ) ; my $LOGF = $conf -> log_file ; my $demn = Proc::Daemon -> new ( work_dir => '.' , child_STDOUT => ">>$LOGF" , child_STDERR => ">>$LOGF" , pid_file => $conf -> pid_file ) ; $self -> demn ( $demn ) ; $self ; } sub xlock { my $self = shift ; my $file = $self -> conf -> lck_file ; my $cnt = 0 ; my $res = 0 ; unless ( open LOCK, ">$file" ) { syslog ( "exit ; can't write lock file [$file] ($!)" ) ; exit ; } while ( $cnt < 2 ) { $cnt ++ ; if ( flock LOCK, LOCK_EX|LOCK_NB ) { logd ( "got lock ; try[$cnt]" ) ; $res = 1 ; last ; } else { logd ( "can't get lock ; try[$cnt]" ) ; } sleep 5 ; } $res ; } sub ulock { my $self = shift ; flock LOCK, LOCK_UN ; } # pid # we're the parent # -1 # can't run ; other is already running or can't lock # 0 # we're the daemon ; including opt{i} sub start_daemon { my $self = shift ; my $opti = shift ; my $pid = undef ; my $demn = $self -> demn ; my $msg ; my $cnt = 0 ; while ( $cnt < 3 and $pid = $demn -> Status ( undef ) ) { $cnt ++ ; sleep 1 ; } if ( $pid and $pid != $$ ) { $msg = "$PROG: some $DEMN is already running ; pid $pid" ; $pid = -1 ; } elsif ( ! $self -> xlock ) { $msg = "$PROG: can't lock ; some $DEMN is already running" ; $pid = -1 ; } else { $self -> ulock ; $pid = ( $opti ? 0 : $demn -> Init () ) ; $msg = sprintf "$PROG: launched $DEMN on %s ; pid %s" , $HOSTNAME, $pid ; } return $pid, $msg ; } sub send_stop { my $self = shift ; my $res = 0 ; my $SOCK = IO::Socket::INET -> new ( PeerAddr => 'localhost' , PeerPort => $self -> conf -> port_client , Proto => 'tcp' ) ; if ( $SOCK ) { printf $SOCK "STOP %s\n", ( $self -> conf -> read_stp || '_' ) ; $SOCK -> shutdown ( 1 ) ; # done writing my $line = <$SOCK> ; $res = 1 if $line and $line eq 'STOPPED' ; $SOCK -> shutdown ( 2 ) ; # done using } else { logv ( "can't open sock ; nevermind" ) ; } $res ; } sub send_state { my $self = shift ; my $res = '' ; my $SOCK = IO::Socket::INET -> new ( PeerAddr => 'localhost' , PeerPort => $self -> conf -> port_client , Proto => 'tcp' ) ; if ( $SOCK ) { printf $SOCK "STATE\n" ; $SOCK -> shutdown ( 1 ) ; # done writing $res = join '', <$SOCK> ; $SOCK -> shutdown ( 2 ) ; # done using } else { $res = "can't open sock to daemon" ; } $res ; } sub send_reload { my $self = shift ; my $pid = shift ; ( kill 1, $pid ) ? '' : "can't HUP daemon ($pid)" ; } sub stop_daemon { my $self = shift ; $self -> demn -> Kill_Daemon ( undef ) unless $self -> send_stop ; } sub status { my $self = shift ; $self -> demn -> Status ( undef ) ; } sub sss_exit { my $self = shift ; my $ARG = shift ; my $opti = shift ; my $ownr = $self -> conf -> own_stp ; if ( $ARG and $ARG eq 'start' ) { my ( $pid, $msg ) = $self -> start_daemon ( $opti ) ; if ( $pid ) { # we are the parent syslog ( $msg ) ; my $xit = ( $pid < 0 ? 1 : 0 ) ; exit $xit unless $opti and $pid > 0 ; } else { # we have a running daemon OBB -> Verbosity ( $self -> conf -> loglvl ) ; Util::MODE ( 1 ) ; Util::rotate ( $self -> conf ) unless $opti ; logq ( "$0 start [$$] %s", Dmon -> Version ) ; my $sys_lock = $self -> conf -> sys_lock ; $self -> xlock ; $self -> conf -> make_stp ; $self -> conf -> make_sys_lock or syslog ( "can't write sys_lock [%s] ; nm", $sys_lock ) ; Probes -> Install ( $self -> conf -> vardir ) ; STDOUT -> autoflush ( 1 ) ; STDERR -> autoflush ( 1 ) ; syslog ( "daemon started" ) ; } } elsif ( $ARG and $ARG eq 'state' ) { logt ( $self -> send_state ) ; exit 0 ; } elsif ( $< and ! defined $ownr ) { my $nam = getpwuid ( $< ) || $< ; logt ( "$PROG: no owner ; $nam can't stat\n" ) ; exit 1 ; } elsif ( $< and $ownr != $< ) { my $own = getpwuid ( $ownr ) || $ownr ; my $nam = getpwuid ( $< ) || $< ; logt ( "$PROG: owned by $own ; $nam can't stat\n" ) ; exit 1 ; } elsif ( $ARG and $ARG eq 'stop' ) { my $pid = $self -> status ; my $msg = "$PROG: $DEMN is not running" ; my $xit = 0 ; if ( $pid ) { my $cnt = $self -> stop_daemon ; $msg = sprintf "$PROG: %s $DEMN on %s ; pid %s" , ( $cnt ? 'stopped' : "can't stop" ), $HOSTNAME, $pid ; if ( $cnt ) { $self -> conf -> rm_stp ; $self -> conf -> rm_sys_lock ; } $xit = ! $cnt ; } syslog ( $msg ) ; exit ( $xit || 0 ) ; } elsif ( $ARG and $ARG eq 'reload' ) { my $pid = $self -> status ; my $msg = "$PROG: $DEMN is not running" ; if ( $pid ) { $msg = $self -> send_reload ( $pid ) ; } logt ( $msg ) ; exit 0 ; } elsif ( my $pid = $self -> status ) { logt ( "$PROG: $DEMN is running ; pid %s\n", $pid ) ; exit 0 ; } else { logt ( "$PROG: $DEMN is not running\n" ) ; exit 1 ; } $self ; } ############################################################## package TS ; use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(file name atrs dbh Err) ) ; sub Defs { ( file => '', name => 'data', atrs => [] ) ; } use constant { TIME => Dmon::TIME , IVAL => Dmon::IVAL } ; sub Reset { my $self = shift ; $self -> Err ( undef ) ; $self ; } sub nn { my $x = shift ; not not $x ; } sub null { defined $_[0] ? $_[0] : 'NaN' ; } sub tnam { my $x = pop ; $x =~ s/[^\w]/_/g ; $x } sub connect { my $self = shift -> Reset ; my $file = $self -> file ; my $res = 0 ; unless ( $file ) { $self -> Err ( 'no file' ) ; } else { my $dbh = DBI -> connect ( "dbi:SQLite:dbname=$file", "", "" , { AutoCommit => 1 , RaiseError => 0 , sqlite_see_if_its_a_number => 1 } ) ; $res = nn $self -> dbh ( $dbh ) ; } $res ; } sub disconnect { my $self = shift -> Reset ; my $dbh = $self -> dbh ; my $res = 0 ; unless ( $dbh ) { $self -> Err ( 'not connected' ) ; } elsif ( $dbh -> disconnect ) { $self -> dbh ( undef ) ; $res = 1 ; } else { $self -> Err ( "can't disconnect" ) ; } $res ; } sub _table_info { my $self = shift ; my $dbh = $self -> dbh ; my $res = undef ; return $res unless $dbh ; my $sth = $dbh -> table_info ( undef, undef, '%', 'TABLE' ) ; if ( $sth ) { $res = $sth -> fetchall_arrayref ; } else { $self -> Err ( "_table_info : can't" ) ; } $res ; } sub _column_info { my $self = shift ; my $name = shift ; my $dbh = $self -> dbh ; my $res = undef ; return $res unless $dbh and $name ; my $sth = $dbh -> column_info ( undef, undef, $name, '%' ) ; if ( $sth ) { $res = $sth -> fetchall_arrayref ; } else { $self -> Err ( "_column_info : can't" ) ; } $res ; } sub _tabs { OBB::A_is 1, scalar @_ ; my $self = shift -> Reset ; my $info = $self -> _table_info ; my $res = undef ; if ( $info ) { $res = [ sort map { $_ -> [2] ; } @$info ] ; } else { $self -> Err ( "_table_info : can't" ) ; } $res ; } sub _cols { OBB::A_is 2, scalar @_ ; my $self = shift -> Reset ; my $name = shift ; my $info = $self -> _column_info ( $name ) ; my $res = undef ; if ( $info ) { $res = [ sort map { $_ -> [3] ; } @$info ] ; } else { $self -> Err ( "_column_info : can't" ) ; } $res ; } sub user_cols { OBB::A_is 2, scalar @_ ; my $self = shift ; my $res = [ grep $_ ne TIME, @{ $self -> _cols ( @_ ) } ] ; wantarray ? @$res : $res ; } sub has_tab { OBB::A_is 2, scalar @_ ; my $self = shift ; my $name = shift ; scalar grep $_ eq $name, @{ $self -> _tabs } ; } sub add_tab_sql { OBB::A_in 2, 3, scalar @_ ; my $self = shift ; my $name = shift ; my $cols = shift || [] ; sprintf "CREATE TABLE %s ( %s integer primary key %s )" , $name, TIME , ( @$cols ? map { ", $_" ; } @$cols : '' ) ; } sub add_tab { OBB::A_is 2, scalar @_ ; my $self = shift ; my $name = shift ; my $res = 1 ; unless ( $self -> has_tab ( $name ) ) { $res = $self -> dbh -> do ( $self -> add_tab_sql ( $name ) ) ; printf "added table $name\n" if $self -> Verbose ; } elsif ( $self -> Verbose ) { printf "already have table $name\n" ; } $res ; } sub del_tab { OBB::A_is 2, scalar @_ ; my $self = shift ; my $name = shift ; printf "%s table $name\n", $self -> has_tab ( $name ) ? 'drop' : '[warn] no' ; nn $self -> dbh -> do ( "DROP TABLE IF EXISTS $name" ) ; } sub has_col { OBB::A_is 3, scalar @_ ; my $self = shift ; my $name = shift ; my $col = shift ; scalar grep $_ eq $col, @{ $self -> _cols ( $name ) } ; } sub add_col { OBB::A_in 4, 5, scalar @_ ; my $self = shift ; my $name = shift ; my $col = shift ; my $typ = shift ; my $xtra = shift || '' ; my $res = 1 ; unless ( $self -> has_col ( $name, $col ) ) { $res = $self -> dbh -> do ( "ALTER TABLE $name ADD COLUMN $col $typ $xtra" ) ; OBB::TT ( "added field $name.$col:$typ %s", $xtra ) ; } elsif ( $self -> Verbose ) { OBB::TT ( "already have $name.$col" ) ; } $res ; } sub _upd_cols_sql { OBB::A_is 3, scalar @_ ; my $name = shift ; my $olds = shift ; # list of old names my $news = shift ; # list of new names my $oldnams = join ', ', TIME, @$olds ; my $newnams = join ', ', TIME, @$news ; my $newtyps = join ', ', map { ( $_ eq TIME ? 'TIME integer primary key' : ( $_ eq IVAL ? 'IVAL real NOT NULL DEFAULT 1' : "$_ real" ) ) ; } ( TIME, @$news ) ; my $tnam = "tmp_$name" ; my $SQL = < has_col ( $name, $old ) ) { printf "[err] don't have $name.$old\n" ; } elsif ( $old eq $new ) { printf "[warn] old [$old] == new [$new]\n" ; $res = 1 ; } elsif ( $old eq TIME ) { printf "[warn] won't move TIME\n" ; } elsif ( $self -> has_col ( $name, $new ) ) { printf "[err] already have $name.$new\n" ; } else { printf "move $name.$old $name.$new\n" ; my $olds = [ sort $self -> user_cols ( $name ) ] ; my $news = [ map { $_ eq $old ? $new : $_ } @$olds ] ; my $SQL = _upd_cols_sql $name, $olds, $news ; print $SQL if $self -> Debug ; $res = nn $self -> dbh -> do ( $_ ) for split "\n", $SQL ; } $res ; } sub drop_cols { OBB::A_ge 3, scalar @_ ; my $self = shift ; my $name = shift ; my @cols = @_ ; my %keep ; $keep { $_ } ++ for $self -> user_cols ( $name ) ; my $drps = 0 ; my $res = 0 ; for my $col ( @cols ) { if ( $col eq TIME ) { printf "[err] won't drop $name.$col\n" ; } elsif ( $keep { $col } ) { printf "drop $name.$col\n" ; $drps ++ ; delete $keep { $col } ; } else { printf "[err] no $name.$col\n" ; } } my $keeps = scalar keys %keep ; unless ( $keeps ) { $res = $self -> del_tab ( $name ) ; } elsif ( $drps ) { my $keep = [ sort keys %keep ] ; my $SQL = _upd_cols_sql $name, $keep, $keep ; print $SQL if $self -> Debug ; $res = nn $self -> dbh -> do ( $_ ) for split "\n", $SQL ; } else # no change { print "no change\n" ; $res = 1 ; } $res ; } sub has_row { OBB::A_is 3, scalar @_ ; my $self = shift ; my $name = shift ; my $time = shift ; $self -> count ( $name, where => "TIME = $time" ) ? 1 : 0 ; } sub _flat ; sub _flat { my @res = () ; for my $itm ( @_ ) { push @res, ( ( ref $itm eq 'ARRAY' ) ? _flat @$itm : $itm ) if defined $itm ; } @res ; } our %KWDS ; our @KWDS = qw(where group_by having order_by limit) ; @KWDS { @KWDS } = map { my $x = $_ ; $x =~ s/_/ / ; uc $x ; } @KWDS ; sub _mk_sql_select { OBB::A_ge 1, scalar @_ ; my $name = shift ; my %opts = ( from => $name , @_ ) ; my $cols = join ( ',', _flat $opts{cols} ) || "$name.*" ; sprintf "SELECT %s FROM %s %s %s %s %s %s" , $cols, $opts { from } , map { $opts{$_} ? "$KWDS{$_} $opts{$_}" : '' ; } @KWDS ; } sub _mk_sql_delete { OBB::A_ge 1, scalar @_ ; my $name = shift ; my %opts = ( from => $name , @_ ) ; sprintf "DELETE FROM %s %s" , $opts { from } , map { $opts{$_} ? "$KWDS{$_} $opts{$_}" : '' ; } qw(where) ; } sub _select_sth { OBB::A_ge 2, scalar @_ ; my $self = shift -> Reset ; my $name = shift ; my %opts = @_ ; my $dbh = $self -> dbh ; unless ( $self -> has_tab ( $name ) ) { $self -> Err ( "no table $name" ) ; return undef ; } my $SQL = _mk_sql_select ( $name, order_by => TIME, %opts ) ; printf "${SQL}\n" if $self -> Debug ; my $sth = $dbh -> prepare ( $SQL ) ; $self -> Err ( "can't prepare $SQL" ) unless $sth ; $sth ; } sub _select { OBB::A_ge 4, scalar @_ ; my $self = shift ; my $one = shift ; my $tups = shift ; my $sth = $self -> _select_sth ( @_ ) ; my $res = [] ; if ( $sth ) { $sth -> execute () ; if ( $tups eq 'tups' ) { while ( my $row = $sth -> fetchrow_arrayref ) { push @$res, [ @$row ] } } else { while ( my $hsh = $sth -> fetchrow_hashref ) { push @$res, { %$hsh } } } } $one ? $res -> [ 0 ] : $res ; } sub select { OBB::A_ge 2, scalar @_ ; shift -> _select ( 0, 'tups', @_ ) ; } sub select_hash { OBB::A_ge 2, scalar @_ ; shift -> _select ( 0, 'hash', @_ ) ; } sub select1 { OBB::A_ge 2, scalar @_ ; shift -> _select ( 1, 'tups', @_ ) ; } sub select1_hash { OBB::A_ge 2, scalar @_ ; shift -> _select ( 1, 'hash', @_ ) ; } sub delete { OBB::A_ge 2, scalar @_ ; my $self = shift ; my $name = shift ; my %opts = @_ ; my $dbh = $self -> dbh ; unless ( $self -> has_tab ( $name ) ) { printf "no table $name\n" ; return [] ; } my $SQL = _mk_sql_delete ( $name, %opts ) ; printf "${SQL}\n" if $self -> Debug ; my $res = $dbh -> do ( $SQL ) ; # or Carp::confess "can't do $SQL" ; $res ; } sub count { OBB::A_ge 2, scalar @_ ; my $self = shift ; my $name = shift ; my %opts = @_ ; my $tups = $self -> select ( $name, %opts, cols => 'count(*) as count' ) ; $tups -> [ 0 ] [ 0 ] ; } sub aggregates { OBB::A_ge 4, scalar @_ ; my $self = shift ; my $name = shift ; my $col = shift ; my @typs = @_ ; my $tups = $self -> select ( $name , cols => [ map "$_($col) as $_", @typs ] ) ; @typs > 1 ? @{ $tups -> [ 0 ] } : $tups -> [ 0 ] [ 0 ] ; } sub min { aggregates ( @_, 'MIN' ) ; } sub max { aggregates ( @_, 'MAX' ) ; } sub avg { aggregates ( @_, 'AVG' ) ; } sub mma { aggregates ( @_, qw(MIN MAX AVG) ) ; } sub check { my $self = shift ; my $res = 1 ; for my $name ( sort @{ $self -> _tabs } ) { unless ( $self -> has_col ( $name, TIME ) ) { printf "[err] no column TIME in $name" ; $res = 0 ; } } $res ; } sub save_hash { OBB::A_in 3, 5, scalar @_ ; my $self = shift ; my $name = shift ; my $hash = shift ; my $time = shift || time ; my $aadd = shift || 0 ; my @cols = () ; my @vals = () ; my $SQL ; for my $atr ( sort keys %$hash ) { my $val = $hash -> { $atr } ; if ( $self -> has_col ( $name, $atr ) ) { push @cols, $atr ; push @vals, $val ; } else { printf "[warn] no $name.$atr ignore [%s]\n", ( $val || '' ) ; } } unless ( @cols ) { printf "[err] no valid cols\n" ; return 0 ; } if ( ! $aadd and $self -> has_row ( $name, $time ) ) { my $ulst = join ',', map "$_ = ?", @cols ; $SQL = sprintf "UPDATE %s SET %s WHERE %s = %s" , $name, $ulst, TIME, $time ; } else { my $clst = join ',', @cols ; my $vlst = join ',', map '?', @cols ; $SQL = sprintf "INSERT INTO %s ( %s, %s ) VALUES ( %s, %s )" , $name, TIME, $clst, $time, $vlst ; } printf "${SQL} ; [%s]\n", join ',', map { null $_ ; } @vals if $self -> Debug ; $SQL .= "\n" ; my $sth = $self -> dbh -> prepare ( $SQL ) or Carp::confess "can't prep $SQL" ; $sth -> execute ( @vals ) ? $time : 0 ; } sub insert_tups { OBB::A_is 4, scalar @_ ; my $self = shift ; my $name = shift ; my $nams = shift ; my $tups = shift ; my @cols = () ; my @idxs = () ; my $res = 1 ; unless ( grep $_ eq TIME, @$nams ) { printf "[err] no TIME in tuples?\n" ; return 0 ; } my $idx = 0 ; for my $nam ( @$nams ) { if ( $self -> has_col ( $name, $nam ) ) { push @cols, $nam ; push @idxs, $idx ; } else { print "[warn] no $name.$nam ; ignore\n" ; } $idx ++ ; } unless ( @cols ) { printf "[err] no valid cols\n" ; return 0 ; } my $clst = join ',', @cols ; my $vlst = join ',', map '?', @cols ; my $SQL = sprintf "INSERT INTO %s ( %s ) VALUES ( %s )" , $name, $clst, $vlst ; printf "${SQL} ; \@tup[%s]\n", join ',', map { $_ ; } @idxs if $self -> Debug ; $SQL .= "\n" ; my $dbh = $self -> dbh ; my $sth ; my $cnt = 0 ; $dbh -> begin_work or die $dbh -> errstr ; $sth = $dbh -> prepare ( $SQL ) or Carp::confess "can't prep $SQL" ; for my $tup ( @$tups ) { my $r = ( $sth -> execute ( @$tup [ @idxs ] ) ? $res : 0 ) ; die "bad tup [@$tup]\n" unless $r ; $res &= $r ; $cnt ++ ; } $dbh -> commit or die $dbh -> errstr ; ( $res ? scalar @$tups : 0 ) ; } sub zap { OBB::A_is ( 4, scalar @_ ) ; my $self = shift ; my $tnam = shift ; my $IVAL = shift ; my $keep = shift ; my $res ; OBB -> Xit ( "no table [$tnam]" ) unless $self -> has_tab ( $tnam ) ; my $tmax = time - $keep ; $tmax -= $tmax % $IVAL ; my $date = Util::ddate $tmax ; my $grp = "TIME / $IVAL" ; my @cols = grep $_ ne 'IVAL', $self -> user_cols ( $tnam ) ; my $colt = 'MIN ( TIME ) as TIME' ; my $coli = 'SUM ( IVAL ) as IVAL' ; my $cnt = 0 ; OBB::TT "zap tnam %s IVAL %s tmax %s\n %s" , $tnam, $IVAL, $date, join ',', @cols ; my $rows = $self -> select ( $tnam , cols => [ $colt , $coli , map { "SUM ( IVAL * $_ ) / SUM ( IVAL ) as $_" } @cols ] , where => "TIME < $tmax" , group_by => $grp , having => '( COUNT(*) > 1 AND SUM ( IVAL ) > 0 )' ) ; my $rcnt = scalar @$rows ; Util::logt ( "zap %s %s chunks %s", $IVAL, $date, $rcnt ) ; for my $row ( @$rows ) { my $hash ; @{$hash} { qw(TIME IVAL), @cols } = @$row ; my $TIME = $hash -> { TIME } ; $TIME = $TIME - $TIME % $IVAL ; delete $hash -> { TIME } ; $self -> dbh -> begin_work ; my $del = $self -> delete ( $tnam, where => "$TIME <= TIME and TIME < $TIME + $IVAL" ) ; $self -> save_hash ( $tnam, $hash, $TIME ) ; $self -> dbh -> commit ; my $dat = Util::ddate $TIME ; my $ivl = $hash -> { IVAL } ; Util::logt ( "zap $del %s ival $ivl", $dat ) ; $cnt += $del ; } $self -> dbh -> do ( 'VACUUM' ) ; OBB::TT "zap $IVAL $date done" ; my $err = $self -> Err ; $res = ( $err ? "no good: $err" : "ok zapped $cnt rows in $rcnt chunks" ) ; Util::logt ( 'zap %s', $res ) ; $res ; } sub zap_old { OBB::A_is ( 3, scalar @_ ) ; my $self = shift ; my $tnam = shift ; my $keep = shift ; OBB -> Xit ( "no table [$tnam]" ) unless $self -> has_tab ( $tnam ) ; my $tmax = time - $keep ; my $date = Util::ddate $tmax ; my $cnt = $self -> count ( $tnam ) ; OBB::TT "zap_old $date ; have $cnt" ; $self -> dbh -> begin_work ; my $del = $self -> delete ( $tnam, where => "TIME < $tmax" ) ; $del = 0 if $del and $del eq '0E0' ; $self -> dbh -> commit ; OBB::TT "zap_old $date done ; del $del" ; my $err = $self -> Err ; my $res = ( $err ? "no good zap_old : $err" : "ok zapped $del old rows in $tnam" ) ; Util::logt ( 'zap_old %s %s', $date, $res ) ; $res ; } 1 ; __END__ =pod =head1 NAME Dmon.pm - distributed monitor =head1 LICENSE You may distribute under the terms of either the GNU General Public License or the Artistic License, as specified in the Perl 5.10.0 README file. =head1 AUTHOR =for html Dmon.pm © 2015-2016 Henk P. Penning - All rights reserved ; dmon-0.05-p214 - Thu Oct 13 17:38:35 2016 =for man Henk P. Penning, ; dmon-0.05-p214 - Thu Oct 13 17:38:35 2016 =cut