# -------------------------------------------------------------------------- # Copyright (c) 2011 Henk P. Penning. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the # distribution. # # THIS SOFTWARE IS PROVIDED BY Henk P. Penning, ``AS # IS'' AND ANY # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Henk P. Penning OR # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # The views and conclusions contained in the software and documentation # are those of the authors and should not be interpreted as representing # official policies, either expressed or implied, of Henk P. Penning. 
# --------------------------------------------------------------------------
# "Simplified BSD License" or "FreeBSD License"
# http://en.wikipedia.org/wiki/BSD_licenses

use strict ;

# Name of the JSON module that was loaded ; every JSON (de)serialisation
# in this program goes through "$LCL_JSON -> new".
our $LCL_JSON ;

# Pick the first JSON implementation that loads, in order of preference.
# Die at compile time when none is available or it lacks a constructor.
BEGIN {
    my @jsons = qw(JSON::XS JSON::PP JSON) ;
    for my $mod ( @jsons ) {
        if ( eval "use $mod ; 1 ;" ) { $LCL_JSON = $mod ; last ; }
    }
    die sprintf "can't find %s\n", join ' or ', @jsons
        unless defined $LCL_JSON ;
    die sprintf "%s can't do 'new()'\n", $LCL_JSON
        unless $LCL_JSON -> can ( 'new' ) ;
}

use Blib ;
use Carp ;
use File::Path ;

our $PRG     = 'iim' ;
our $VERSION = '0.4.9' ;

# indices into the list returned by stat / lstat
our $DEV   = 0 ;
our $INO   = 1 ;
our $NLINK = 3 ;
our $SIZE  = 7 ;
our $ATIME = 8 ;
our $MTIME = 9 ;

our $MAX_TRIES    = 3 ;
our $LOCK_TRIES   = 3 ;
our $LOCK_SLEEP   = 1 ;
our $LCK_FILE     = "iim.lck" ;
our $LOG_FILE     = "iim.log" ;
our $PID_FILE     = "iim.pid" ;
our $SCB_FILE     = "iim-scb.html" ;
our $SCB_TMPL     = "iim-scb-tmpl.html" ;
our $SCB_SMPL     = "$SCB_TMPL.sample" ;
our $REQ_LIST     = 'request-list' ;
our $MAX_AGE_INIT = '2d' ;
our $REOPEN_IVAL  = 300 ;
our $SLEEP_NAP    = 15 ;
our $AGGREGATOR   = [ qw(1h 6h 1d 1W 1M 1Q 1Y Z) ] ;
our $UMASK        = '022' ;
our $CPAN_LOCL    = 'local' ;
our $CPAN_TEMP    = "$CPAN_LOCL/iim" ;
our $RSYNC_TMP    = 'rsync-tmp' ;

# Backticks keep the command's trailing newline ; strip it so the host
# name can be embedded in messages as-is.
our $HOSTNAME = `hostname 2>/dev/null` ;
chomp $HOSTNAME if defined $HOSTNAME ;

our $IIM_SITE     = 'http://www.staff.science.uu.nl/~penni101/iim' ;
our $IIM_LOGO     = 'iim-logo.png' ;
our $IIM_LOGO_URL = "$IIM_SITE/images/$IIM_LOGO" ;
our $STAMP        = 'indices/timestamp.txt' ;

# rsync argument lists ; the local iim area is never mirrored
our @RSYNC_OPTS =
  ( '--no-motd' , '-a' , '--stats' , '-z'
  , '--exclude' => "/$CPAN_LOCL/"
  ) ;
our @RSYNC_LIST = ( @RSYNC_OPTS ) ;
our @RSYNC_FULL = ( @RSYNC_OPTS , '-v' , '--delete' ) ;
our @RSYNC_CMP  = ( @RSYNC_FULL , '-n' , '--timeout' => 300 ) ;

# status codes ; the last two carry the meaning given in their comments
our $NOERR = 0 ;
our $E_XEC = 11 ;
our $E_SIG = 12 ;
our $E_XIT = 13 ;
our $E_PRT = 23 ; # Partial transfer due to error
our $E_MIS = 24 ; # Partial transfer due to vanished source files

# read-only accessors for the globals above
sub PRG      { $PRG ; }
sub SCB_FILE { $SCB_FILE ; }
sub SCB_TMPL { $SCB_TMPL ; }
sub SCB_SMPL { $SCB_SMPL ; }
sub LCL_JSON { $LCL_JSON ; }
##########################################################
package RF ;

use Time::HiRes qw(gettimeofday) ;
use Fcntl qw(:flock) ;
use IO::Pipe ;

our ( @ISA, @EXPORT ) ;
BEGIN {
    require Exporter ;
    @ISA    = qw(Blib) ;
    @EXPORT = qw(LOGf LOGx) ;
}

# State accessors of an RF object ; the code returned by
# Blib -> mk_methods is eval'ed here (presumably one get/set per name).
my @methods =
  ( qw( conf type epoc next_sync next_reopen next_rotate tag scores)
  , qw( sync status)
  ) ;
eval Blib -> mk_methods ( @methods ) ;

# handed to Blib::JSON -> add_name4key / add_name4row in init()
our %name4key =
  ( '$0' => 'dollar0'
  , 'File::Rsync::Mirror::Recentfile' => 'FRMR'
  ) ;
our %name4row =
  ( 'Blib::JSON::rfile::recent' => 'event'
  , 'Blib::JSON::rfile::meta::aggregator' => 'interval'
  ) ;

# LOGx MSG : print "<localtime> MSG\n" on the currently selected handle.
sub LOGx {
    my $x    = shift ;
    my $date = localtime ;
    printf "%s %s\n", $date, $x ;
}

# LOGf FMT, ARGS : like LOGx, with MSG = sprintf FMT, ARGS.
sub LOGf {
    my $f    = shift ;
    my $date = localtime ;
    my $msg  = sprintf $f, @_ ;
    printf "%s %s\n", $date, $msg ;
}

# Derive a loglevel from an option hash : an explicit 'loglevel' value,
# overridden by -q, then -v, then -d (the last one set wins).
# Returns undef when none is given.
sub loglevel {
    my %opts = @_ ;
    my $res ;
    $res = $opts { loglevel } if exists $opts { loglevel } ;
    $res = 'quiet'   if $opts{q} ;
    $res = 'verbose' if $opts{v} ;
    $res = 'debug'   if $opts{d} ;
    $res ;
}

# Initialise a fresh RF object from command line options : load the
# config (dies with the error text when that fails), set the loglevel,
# export the rsync password, build the RECENT-file JSON model, reset the
# scheduling state and create the scores and sync helpers.
sub init {
    my $self = shift ;
    my %opts = ( -root => undef , @_ ) ;
    my $llvl = loglevel %opts ;
    $self -> set_loglevel ( $llvl )
        if $llvl and Blib -> _is_loglevel ( $llvl ) ;
    $opts { loglevel } = $llvl if $llvl ;
    my $conf = $self -> conf ( $self -> get_config ( %opts ) ) ;
    die $conf unless ref $conf ;
    # a command line loglevel wins over the configured one
    $self -> set_loglevel ( $conf -> loglevel ) unless $llvl ;
    $ENV { RSYNC_PASSWORD } = $conf -> passwd ;
    Blib::JSON -> add_name4key ( %name4key ) ;
    Blib::JSON -> add_name4row ( %name4row ) ;
    my $json = ( $conf -> model_file
               ? $self -> _get_json ( $conf -> model_file )
               : $LCL_JSON -> new -> decode ( $conf -> model )
               ) ;
    $self -> type ( Blib::JSON -> mk_model ( 'rfile', $json ) ) ;
    $self -> epoc ( undef ) ;        # undef == 'no epoch determined'
    $self -> tag ( undef ) ;         # undef == 'plain mode'
    $self -> next_sync ( 0 ) ;       # 0 == 'nothing scheduled'
    $self -> next_rotate ( 0 ) ;     # will rotate if rotate.count > 0
    $self -> next_reopen ( 0 ) ;
    $self -> scores ( RF::Scores -> make ( parent => $self ) ) ;
    $self -> sync ( RF::Sync -> make ( parent => $self ) ) ;
    $self -> status ( 'initializing' ) ;
    $self -> set_umask ;
    $self ;
}

# Candidate config files, in search order : ./iim.conf, ~/.iim.conf,
# /etc/iim.conf. Dies when the home directory can't be determined.
sub config_list {
    my $self = shift ;
    my $home = ( getpwuid $< ) [ 7 ] or die "can't get homedir '$<' ($!)" ;
    ( "$PRG.conf", "$home/.$PRG.conf", "/etc/$PRG.conf" ) ;
}

# Build a Blib::Mods::Conf from %opts. Without an explicit -root the
# first existing file from config_list is used ; when there is none this
# is logged and /dev/null is used. Returns whatever
# Blib::Mods::Conf -> make returns (init() treats a non-ref as an error).
sub get_config {
    my $self = shift ;
    my %opts = @_ ;
    my $root = $opts { -root } ;
    unless ( $root ) {
        $root = ( grep { -f $_ ; } $self -> config_list ) [ 0 ] ;
        if ( $root ) {
            $opts { -root } = $root ;
        }
        else {
            LOGf "can't find a config file :\n %s\n"
              , join "\n ", $self -> config_list ;
            $opts { -root } = '/dev/null' ;
        }
    }
    # now we have a root in $opts{-root}
    Blib::Mods::Conf -> make ( -parent => $self, %opts ) ;
}

# If hot_config is on and a config file changed, reload it. On failure
# the old config stays active and the error is logged.
sub try_reload_conf {
    my $self = shift ;
    my %opts = @_ ;
    my $conf = $self -> conf ;
    return unless $conf -> hot_config and $conf -> touched ;
    RF::LOGx "config touched" ;
    my $root     = $conf -> root ;
    my $new_conf = $self -> get_config ( -root => $root, %opts ) ;
    if ( ref $new_conf ) {
        $conf = $self -> conf ( $new_conf ) ;
        $conf -> show ( ' ' ) ;
        $self -> set_loglevel ( $conf -> loglevel, 'reload' ) ;
        RF::LOGx "config reloaded" ;
    }
    else {
        chomp ( my $err = $new_conf ) ;
        $self -> conf -> show ( ' ' ) ;
        RF::LOGx $err ;
        RF::LOGx "no new config loaded" ;
    }
}

# 'daemon' when a tag (daemon name) is set, else 'plain'.
sub mode {
    my $self = shift ;
    $self -> tag ? 'daemon' : 'plain' ;
}

# Path of the rsync request-list file in the temp dir, suffixed with
# the daemon tag or 'term'.
sub req_file {
    my $self = shift ;
    sprintf "%s/%s.%s"
      , $self -> conf -> temp, $REQ_LIST
      , ( $self -> tag || 'term' ) ;
}

# Write the paths in @$list, one per line, to req_file. Dies when the
# file can't be opened or flushed, so no truncated list is left.
sub put_req_file {
    my $self = shift ;
    my $list = shift ;
    my $file = $self -> req_file ;
    open my $fh, '>', $file or die "put_req_file : fail ($!)" ;
    printf { $fh } "%s\n", join "\n", @$list ;
    close $fh or die "put_req_file : fail ($!)" ;
}

# Set the status to STAT ; push the scoreboards when PUSH is true.
# Returns the previous status, so callers can restore it.
sub next_status {
    my $self = shift ;
    my $stat = shift ;
    my $push = shift ;
    my $prev = $self -> status ;
    $self -> status ( $stat ) ;
    $self -> scores -> put_scoreboards if $push ;
    $prev ;
}

# Status text for the HTML scoreboard.
# NOTE(review): both branches currently yield the bare status ; the
# non-'looping' branch looks like it once wrapped $stat in HTML markup
# that was lost - confirm against the upstream source.
sub status_html {
    my $self = shift ;
    my $stat = $self -> status ;
    $stat eq 'looping' ? $stat : "$stat" ;
}

# Age of the local mirror, from the first field of its timestamp file,
# as text ; 'undef' when the file is unreadable or holds no integer.
sub age {
    my $self = shift ;
    my $stmp = $self -> path ( 'local', $STAMP ) ;
    my $res  = 'undef' ;
    if ( open my $fh, '<', $stmp ) {
        my $line = <$fh> ;
        $line = '' unless defined $line ;
        my $time = ( split ' ', $line ) [ 0 ] ;
        if ( defined $time and $time =~ /^\d+$/ ) {
            $res = $self -> conf -> _text4secs ( time - $time, 0 ) ;
        }
        close $fh ;
    }
    $res ;
}

# Take an exclusive, non-blocking flock on FILE, retrying $LOCK_TRIES
# times $LOCK_SLEEP seconds apart. Returns 1 on success, 0 otherwise ;
# exits when FILE can't be opened.
# LOCK is a package-global handle on purpose : the lock must stay held
# after this sub returns.
sub xlock {
    my $self = shift ;
    my $file = shift ;
    my $cnt  = 0 ;
    my $res  = 0 ;
    unless ( open LOCK, '>', $file ) {
        LOGx "can't write lock $file ($!)" ;
        exit ;
    }
    while ( $cnt < $LOCK_TRIES ) {
        if ( flock LOCK, LOCK_EX|LOCK_NB ) { $res = 1 ; last ; }
        $cnt ++ ;
        sleep $LOCK_SLEEP ;
    }
    $res ;
}

# Start 'perl iim' as a Proc::Daemon in directory NAME, with ARGS
# rewritten so --daemon becomes --tag and -q/--q is dropped. Output goes
# to NAME/iim.log, the pid to NAME/iim.pid. Does nothing when a daemon
# for NAME is already running.
sub start_daemon {
    my $self = shift ;
    my $name = shift ;
    my @args = @_ ;
    my $pid ;
    use Proc::Daemon ;
    unless ( -d $name or mkdir $name, 0755 ) {
        LOGx "can't mkdir $name ($!)" ;
        $self -> _exit ;
    }
    unless ( $self -> xlock ( "$name/$LCK_FILE" ) ) {
        LOGx "can't lock ; daemon '$name' is already running"
            if $self -> terse ;
        $self -> _exit ;
    }
    my $daemon = Proc::Daemon -> new
      ( work_dir     => '.'
      , exec_command => ( sprintf 'perl iim %s'
                        , join ' '
                        , grep { ! /^--?q$/ ; }
                          map  { /^--?daemon$/ ? '--tag' : $_ ; } @args
                        )
      , child_STDOUT => ">>$name/$LOG_FILE"
      , child_STDERR => ">>$name/$LOG_FILE"
      , pid_file     => "$name/$PID_FILE"
      ) ;
    if ( -f "$name/$PID_FILE"
         and $pid = $daemon -> Status ( "$name/$PID_FILE" ) ) {
        LOGx "iim '$name' is already running ; pid $pid" if $self -> terse ;
    }
    else {
        $pid = $daemon -> Init () ;
        LOGx "started iim daemon '$name' ; pid $pid" ;
    }
}

# In the daemon process : take the lock for NAME (or exit), remember
# NAME as the tag and redirect output to NAME's log.
sub be_the_daemon {
    my $self = shift ;
    my $name = shift ;
    unless ( $self -> xlock ( "$name/$LCK_FILE" ) ) {
        LOGx "can't lock ; can't be the daemon" ;
        $self -> _exit ;
    }
    $self -> tag ( $name ) ;
    $self -> reopen_log ;
}

# Sleep for the number of seconds held in config attribute IVAL, after
# pushing the scoreboards and counting the sleep time.
sub _sleep {
    my $self = shift ;
    my $ival = shift ;
    my $time = $self -> conf -> $ival ;
    LOGx "sleeping $time ($ival) ..." if $self -> debug ;
    $self -> scores -> put_scoreboards ;
    $self -> scores -> incr_t_slp ( $time ) ;
    sleep $time ;
}

# Exit with STAT (default 0), recording it as the final status.
sub _exit {
    my $self = shift ;
    my $stat = shift || 0 ;
    LOGx "exit ($stat)" if $self -> terse ;
    $self -> status ( "exit $stat" ) ;
    $self -> scores -> put_scoreboards if $self -> terse ;
    exit $stat ;
}

# Apply the configured iim_umask (an octal string).
sub set_umask {
    my $self  = shift ;
    my $umask = oct $self -> conf -> iim_umask ;
    my $pmask = umask $umask ;
    LOGf "set umask 0%o -> 0%o", $pmask, $umask
        if $umask != $pmask and $self -> debug ;
}

# Exit once max_run_time seconds have passed since program start ($^T).
sub check_max_run_time {
    my $self = shift ;
    my $max  = $self -> conf -> max_run_time ;
    if ( $max and time >= $^T + $max ) {
        LOGx "scheduled exit" ;
        $self -> _exit ;
    }
}

# Log when the scheduled exit will happen, if any.
sub log_next_exit {
    my $self = shift ;
    my $max  = $self -> conf -> max_run_time ;
    LOGx ( $max
         ? sprintf ( 'exit scheduled at %s', scalar localtime ( $^T + $max ) )
         : 'no exit is scheduled'
         ) ;
}

# True for a daemon with log rotation enabled whose rotate time passed.
sub want_rotate_now {
    my $self  = shift ;
    my $count = $self -> conf -> rotate -> { count } ;
    $self -> tag and $count and time > $self -> next_rotate ;
}

# Shift LOG.1 .. LOG.(count-1) up one, drop LOG.count, move LOG to LOG.1.
sub _rotate_logs {
    my $self = shift ;
    my $log  = shift ;
    my $cnt  = $self -> conf -> rotate -> { count } ;
    unlink "$log.$cnt" ; # ignore status
    for my $i ( reverse 1 .. $cnt - 1 ) {
        my $src = sprintf "%s.%s", $log, $i ;
        my $dst = sprintf "%s.%s", $log, $i + 1 ;
        rename $src, $dst or LOGx "can't rename $src, $dst" if -f $src ;
    }
    my $dst = "$log.1" ;
    rename $log, $dst or LOGx "can't rename $log, $dst" if -f $log ;
}

# True for a daemon whose periodic log reopen is due.
sub want_reopen_now {
    my $self = shift ;
    $self -> tag and time > $self -> next_reopen ;
}

# For a daemon : rotate the log when due, then reopen STDOUT (append) on
# TAG/iim.log, dup STDERR onto it, unbuffer both and schedule the next
# reopen. Exits with status 1 when the log can't be reopened.
sub reopen_log {
    my $self = shift ;
    my $tag  = $self -> tag ;
    return unless $tag ;
    my $log = "$tag/$LOG_FILE" ;
    if ( $self -> want_rotate_now ) {
        LOGx "rotate $log" if $self -> terse ;
        $self -> _rotate_logs ( $log ) ;
        $self -> next_rotate ( time + $self -> conf -> rotate -> { ival } ) ;
    }
    LOGx "reopen_log $log" if $self -> debug ;
    close STDOUT ;
    unless ( open STDOUT, '>>', $log ) {
        # print, not printf : $log must not be taken as a format
        print STDERR "re_open_log : can't write $log (again) for STDOUT ; exit\n" ;
        $self -> _exit ( 1 ) ;
    }
    close STDERR ;
    unless ( open STDERR, '>>&STDOUT' ) {
        print STDOUT "re_open_log : can't write $log (again) for STDERR ; exit\n" ;
        $self -> _exit ( 1 ) ;
    }
    select STDERR ; $| = 1 ;
    select STDOUT ; $| = 1 ;
    $self -> next_reopen ( time + $REOPEN_IVAL ) ;
}

# Schedule the next full sync IVAL seconds from now (config attribute,
# default full_sync_interval), or clear it when full syncs are disabled.
# Returns the new next_sync time (0 == none).
sub set_next_full {
    my $self = shift ;
    my $ival = shift || 'full_sync_interval' ;
    if ( $self -> conf -> full_sync_interval ) {
        my $next = time + $self -> conf -> $ival ;
        LOGf ( "full sync scheduled at %s", scalar localtime $next ) ;
        $self -> next_sync ( $next ) ;
    }
    else {
        $self -> next_sync ( 0 ) ;
    }
    $self -> next_sync ;
}

# True when periodic full syncs are enabled and one is due.
sub want_full_now {
    my $self = shift ;
    return 0 unless $self -> conf -> full_sync_interval ;
    my $next = $self -> next_sync ;
    $next and time > $next ;
}

sub version { sprintf '%s-%s', $PRG, $VERSION ; }
sub Version { sprintf '%s version %s', $PRG, $VERSION ; }

# Create the temp dir, recreate the rsync temp dir(s) empty, and copy
# the logo into temp when missing (copy errors are ignored).
sub mk_temps {
    my $self = shift ;
    my $conf = $self -> conf ;
    use File::Path qw(mkpath rmtree) ;
    my $temp = $conf -> temp ;
    mkpath $temp ; # or die
    for my $dir ( $conf -> rtmp ) {
        rmtree $dir if -d $dir ;
        mkdir $dir, 0777 or die "can't mkdir $dir ($!)" ;
    }
    my $logo_src = $IIM_LOGO ;
    my $logo_dst = $self -> path ( 'temp', $IIM_LOGO ) ;
    # copy logo ; ignore all errors
    if ( -f $logo_src and ! -f $logo_dst ) {
        use File::Copy ;
        copy $logo_src, $logo_dst ;
        chmod 0644, $logo_dst ;
    }
}

# Slurp FILE and decode it as JSON ; confesses when it can't be opened.
sub _get_json {
    my $self = shift ;
    my $file = shift ;
    open my $fh, '<', $file
        or Carp::confess ( "can't open json '$file' ($!)" ) ;
    my $json = $LCL_JSON -> new -> decode ( join '', <$fh> ) ;
    close $fh ;
    $json ;
}

# Decode FILE and bless it into the RECENT-file model.
sub get_json {
    my $self = shift ;
    my $file = shift ;
    my $json = $self -> _get_json ( $file ) ;
    $self -> type -> bless ( $json ) ;
}

# 'RECENT-<interval>.json' ; interval defaults to the first aggregator.
sub name_recent_file {
    my $self     = shift ;
    my $interval = shift || $AGGREGATOR -> [ 0 ] ;
    sprintf "RECENT-%s.json", $interval ;
}

# Ref to the list of RECENT file names for all aggregator intervals.
sub all_Recents {
    my $self = shift ;
    [ map { $self -> name_recent_file ( $_ ) ; } @$AGGREGATOR ] ;
}

# 1 when every RECENT file exists in the local tree, else 0 (logged).
sub have_all_Recents {
    my $self = shift ;
    my $res  = 1 ;
    for my $name ( @{ $self -> all_Recents } ) {
        $res = 0 unless $self -> lstt ( 'local', $name ) ;
    }
    LOGx "some Recents are missing ..." unless $res ;
    $res ;
}

# Fetch all RECENT files into temp. A file whose inode changed counts as
# fetched in the scores. Returns true when the fetch succeeded.
sub get_Recents {
    my $self  = shift ;
    my $names = $self -> all_Recents ;
    my %inos  = () ;
    # scalar assignment : temp_ino yields undef (not an empty list) here
    for my $name ( @$names ) {
        $inos { $name } = $self -> temp_ino ( $name ) ;
    }
    my $res = $self -> get_remote ( $names, 'r' ) ;
    for my $name ( @$names ) {
        my $old = $inos { $name } ;
        my $new = $self -> temp_ino ( $name ) ;
        my $dst = $self -> path ( 'temp', $name ) ;
        Carp::confess ( "get_Recents: bad dst ($dst)" ) unless -f $dst ;
        if ( ! defined $old ) {
            Carp::confess ( "get_Recents: undefined ino old" ) ;
        }
        elsif ( ! defined $new ) {
            Carp::confess ( "get_Recents: undefined ino new" ) ;
        }
        elsif ( $old != $new ) {
            my $size = -s $dst ;
            $self -> scores -> incr_fetf ;
            $self -> scores -> incr_fetr ;
            $self -> scores -> incr_s_fil ( $size ) ;
            $self -> scores -> incr_s_rct ( $size ) ;
        }
    }
    $res ;
}

# Retry get_Recents until it succeeds ; the first two waits are naps,
# later waits use sleep_main_loop.
sub get_Recents_repeat {
    my $self = shift ;
    my $cnt  = 0 ;
    until ( $self -> get_Recents ) {
        LOGx "get_Recents failed ; sleeping ..." ;
        $self -> _sleep ( ++ $cnt < 3 ? 'nap' : 'sleep_main_loop' ) ;
    }
}

# Hard-link each local RECENT file into temp, replacing what's there.
sub link_Recents {
    my $self = shift ;
    LOGx "link_Recents: link local in temp" if $self -> debug ;
    for my $name ( @{ $self -> all_Recents } ) {
        my $src = $self -> lstt ( 'local', $name ) ;
        my $dst = $self -> path ( 'temp', $name ) ;
        if ( $src ) {
            unlink $dst ;
            link $src, $dst ;
        }
        else {
            LOGx "link_Recents: no src for $dst" ;
        }
    }
}

# Replace local/NAME by a hard link to temp/NAME when both exist, are
# different inodes and the temp copy is newer.
sub move_Recent {
    my $self = shift ;
    my $name = shift ;
    my $src  = $self -> conf -> temp  . "/$name" ;
    my $dst  = $self -> conf -> local . "/$name" ;
    if ( ! -f $src ) {
        LOGx "move_Recent: no $src" ;
    }
    elsif ( ! -f $dst ) {
        LOGx "move_Recent: no $dst" ;
    }
    else {
        my ( $i_src, $t_src ) = ( stat $src ) [ $INO, $MTIME ] ;
        my ( $i_dst, $t_dst ) = ( stat $dst ) [ $INO, $MTIME ] ;
        if ( $i_src != $i_dst and $t_src > $t_dst ) {
            unlink $dst ;
            link $src, $dst ;
            LOGx "move from temp to local : $name"
                if $self -> verbose or $name !~ /^RECENT-\dh\.json$/ ;
        }
    }
}

sub move_Recents {
    my $self = shift ;
    $self -> move_Recent ( $_ ) for @{ $self -> all_Recents } ;
}

# sort comparator : newest epoch first
sub by_epoch { $b -> { epoch } <=> $a -> { epoch } ; }

# Merge several event list refs into one ref, newest first.
sub merge {
    my $self = shift ;
    [ sort by_epoch map { ( @$_ ) ; } @_ ] ;
}

# Log a failed child status STAT (as in $?) under TAG : -1 means the
# command could not be executed.
sub log_sync_stat {
    my $self = shift ;
    my $stat = shift ;
    my $tag  = shift ;
    my $sig  = ( $stat & 127 ) ;
    my $xit  = ( $stat >> 8 ) ;
    if ( $stat == -1 ) {
        LOGx "$tag: failed to execute: $!" ;
    }
    elsif ( $sig ) {
        LOGx "$tag: child died on signal $sig" ;
    }
    elsif ( $xit ) {
        LOGx "$tag: child exited with value $xit" ;
    }
}

# Fetch the paths in @$FILS via the sync helper (KIND 'r' for Recents),
# updating the scores. Returns true when the sync reported no error.
sub get_remote {
    my $self = shift ;
    my $fils = shift ;
    my $kind = shift || '' ; # 'r' for Recents
    my $sync = $self -> sync ;
    my $prev = $self -> next_status ( 'syncing' ) ;
    $self -> scores -> incr_rs ;
    $sync -> request ( $fils, $kind ) ;
    my $time = $sync -> timer ;
    my $err  = $sync -> error ;
    if ( $err == -1 ) {
        RF::LOGf "Sync::open : no pipe for %s", $sync -> cmd ;
    }
    elsif ( $err ) {
        $self -> log_sync_stat ( $err, 'get_remote' ) ;
        $self -> scores -> incr_rsf ;
        $self -> scores -> incr_t_out ( $time ) ;
    }
    else {
        $self -> scores -> incr_t_con ( $time ) ;
    }
    $self -> next_status ( $prev ) ;
    ! $err ;
}

# Run one full sync ; exits with status 1 unless DOIT is true or
# allow_full_syncs is set. Returns true when the sync had no error.
sub full_sync {
    my $self = shift ;
    my $doit = shift ;
    my $sync = $self -> sync ;
    my $prev = $self -> next_status ( 'full sync', 'push' ) ;
    unless ( $doit or $self -> conf -> allow_full_syncs ) {
        LOGx "full_sync : not allowed " ;
        $self -> _exit ( 1 ) ;
    }
    LOGx 'full sync start' ;
    $self -> scores -> incr_frs ;
    my $err  = $sync -> full ;
    my $time = $sync -> timer ;
    if ( $err ) {
        $self -> log_sync_stat ( $err, 'full sync' ) ;
        $self -> scores -> incr_t_out ( $time ) ;
        $self -> scores -> incr_frsf ;
    }
    else {
        LOGx 'full sync: ok' ;
        $self -> scores -> incr_t_frs ( $time ) ;
    }
    $self -> next_status ( $prev ) ;
    ! $err ;
}

# Repeat full_sync, sleeping sleep_init_epoch between tries, until one
# succeeds.
sub full_sync_repeat {
    my $self = shift ;
    my $doit = shift ;
    my $prev = $self -> next_status ( 'full syncs until success' ) ;
    LOGx "doing full syncs until one succeeds" ;
    until ( $self -> full_sync ( $doit ) ) {
        LOGx "full sync failed ; sleeping ..." ;
        $self -> _sleep ( 'sleep_init_epoch' ) ;
    }
    $self -> next_status ( $prev ) ;
}

# Replace this process with a dry-run rsync (@RSYNC_CMP) of remote
# against local.
sub compare {
    my $self = shift ;
    my $conf = $self -> conf ;
    my $sync = $conf -> prog_rsync ;
    my $user = $conf -> user ? $conf -> user . '@' : '' ;
    my $remo = $user . $conf -> remote ;
    my $locl = $conf -> local ;
    my @cmd  = ( $sync , @RSYNC_CMP , $remo , $locl ) ;
    LOGf "%s\n", join ' ', @cmd ;
    exec @cmd ;
}

# "<conf->TREE>/PATH", where TREE names a config attribute ('local' ...).
sub path {
    my $self = shift ;
    my $tree = shift ;
    my $path = shift ;
    sprintf "%s/%s", $self -> conf -> $tree, $path ;
}

# path(TREE, PATH) when lstat succeeds on it, else undef.
sub lstt {
    my $self = shift ;
    my $tree = shift ;
    my $path = shift ;
    my $file = $self -> path ( $tree, $path ) ;
    ( lstat $file ) ? $file : undef ;
}

sub in_locl { my $self = shift ; $self -> lstt ( 'local', $_[0] ) ; }
sub in_temp { my $self = shift ; $self -> lstt ( 'temp',  $_[0] ) ; }

# Newest event epoch from the RECENT files in temp ('remote') or local
# ('local'), following meta.merged.into_interval while a file has no
# events. Returns undef when a file is missing or the chain ends empty.
sub get_EPOCH_local_or_remote {
    my $self = shift ;
    my $src  = shift ;
    die "get_EPOCH_local_or_remote : bad src ($src)"
        unless $src eq 'remote' or $src eq 'local' ;
    my $name = $self -> name_recent_file ;
    my $res ;
    while ( ! $res and $name ) {
        my $file = ( ( $src eq 'remote' )
                   ? $self -> in_temp ( $name )
                   : $self -> in_locl ( $name )
                   ) ;
        return undef unless defined $file ;
        my $json   = $self -> get_json ( $file ) ;
        my $events = $json -> recent ;
        my $next   = ( exists $json -> meta -> { merged }
                     ? $json -> meta -> merged -> into_interval
                     : undef
                     ) ;
        if ( @$events ) {
            $res = $events -> [ 0 ] -> epoch ;
        }
        elsif ( ! defined $next ) {
            LOGx "no next in $name" ;
            return undef ;
        }
        else {
            $name = $self -> name_recent_file ( $next ) ;
        }
    }
    $res ;
}

sub get_EPOCH_remote {
    my $self = shift ;
    $self -> get_Recents_repeat ;
    $self -> get_EPOCH_local_or_remote ( 'remote' ) ;
}

sub get_EPOCH_local {
    my $self = shift ;
    $self -> get_EPOCH_local_or_remote ( 'local' ) ;
}

sub set_epoch {
    my $self = shift ;
    my $epoc = shift ;
    LOGf "set EPOCH %s", $epoc ;
    $self -> epoc ( $epoc ) ;
    $epoc ;
}

# Decide the starting epoch. Use the local epoch when it is within
# max_age_init of the remote one. Otherwise (or when there is no local
# epoch) do full syncs until one succeeds and return undef, so the
# caller tries again.
sub _init_epoch {
    my $self   = shift ;
    my $conf   = $self -> conf ;
    my $ival   = $conf -> max_age_init ;
    my $res    = undef ;
    my $epoc_r = $self -> get_EPOCH_remote ;
    if ( ! $epoc_r ) {
        LOGx "can't get epoch from remote" ;
        return $res ;
    }
    LOGx $conf -> age4epoc ( 'remote', $epoc_r ) ;
    my $epoc_l = $self -> get_EPOCH_local ;
    if ( ! $epoc_l or $epoc_r - $epoc_l > $ival ) {
        if ( $epoc_l ) {
            my $itxt = $conf -> _text4secs ( $ival, 0 ) ;
            LOGx $conf -> age4epoc ( 'local', $epoc_l ) ;
            LOGx "remote is more than $itxt ahead" ;
        }
        else {
            LOGx "no local epoch" ;
        }
        $self -> full_sync_repeat ;
    }
    else {
        LOGx $conf -> age4epoc ( 'local', $epoc_l ) ;
        LOGx 'set epoch from local' ;
        $res = $epoc_l ;
    }
    $res ;
}

# Call _init_epoch (sleeping sleep_init_epoch in between) until it
# yields an epoch, then set it.
sub init_epoch {
    my $self = shift ;
    my $epoc = $self -> _init_epoch ;
    while ( ! defined $epoc ) {
        $self -> _sleep ( 'sleep_init_epoch' ) ;
        $epoc = $self -> _init_epoch ;
    }
    $self -> set_epoch ( $epoc ) ;
}

# single lstat fields of a path (undef in scalar context when missing)
sub mtime { my $self = shift ; ( lstat $_[0] ) [ $MTIME ] ; }
sub nlink { my $self = shift ; ( lstat $_[0] ) [ $NLINK ] ; }
sub lsize { my $self = shift ; ( lstat $_[0] ) [ $SIZE ] ; }
sub ino   { my $self = shift ; ( lstat $_[0] ) [ $INO ] ; }

# True when NAME itself (not a symlink target) is a directory.
sub is_dir {
    my $self = shift ;
    my $name = shift ;
    lstat $name ;
    -d _ ;
}

sub temp_ino {
    my $self = shift ;
    my $path = shift ;
    $self -> ino ( $self -> path ( 'temp', $path ) ) ;
}

# Events from temp/RECENT-IVAL.json newer than the current epoch, merged
# newest first with events from larger intervals as needed. Returns an
# array ref, or an error string when a RECENT file is missing.
sub events_since_ival {
    my $self = shift ;
    my $ival = shift ;
    my $epoc = $self -> epoc ;
    my $name = $self -> name_recent_file ( $ival ) ;
    my $file = $self -> in_temp ( $name ) ;
    return "events_since_ival: missing in temp ($name)"
        unless defined $file ;
    my $rec    = [] ;
    my $json   = $self -> get_json ( $file ) ;
    my $events = $json -> recent ;
    for my $event ( @$events ) {
        $event -> init ( base => $self, ival => $ival ) ;
    }
    # Recurse into the next interval when all events (if any) are newer
    # than $epoc, i.e. this file may not reach back far enough.
    # Events are ordered new to old ; the last is eldest.
    # meta -> merged does not exist in Z ; it should be 'merged:null'
    my $e_cnt = @$events ;
    if ( ( $e_cnt == 0 or $epoc < $events -> [ $e_cnt - 1 ] -> epoch )
         and exists $json -> meta -> { merged } ) {
        my $next = $json -> meta -> merged -> into_interval ;
        $rec = $self -> events_since_ival ( $next ) if $next ;
    }
    # an error occurred in recursion ; return the error
    return $rec unless ref $rec ;
    my $mark = $epoc ;
    $mark = $rec -> [ 0 ] -> epoch if @$rec ;
    my $new = [ grep { $_ -> epoch > $mark } @$events ] ;
    RF -> merge ( $rec, $new ) ;
}

# Events since the epoch, counted in the scores (total / new / delete).
# On error the message is logged and an empty list ref is returned.
sub new_events {
    my $self = shift ;
    my $res  = $self -> events_since_ival ( '1h' ) ;
    if ( ref $res ) {
        my $ev  = @$res ;
        my $evn = grep { $_ -> type eq 'new' } @$res ;
        my $evd = $ev - $evn ;
        $self -> scores -> incr_ev ( $ev ) ;
        $self -> scores -> incr_evn ( $evn ) ;
        $self -> scores -> incr_evd ( $evd ) ;
    }
    else {
        LOGx $res ;
        $res = [] ;
    }
    $res ;
}

# new_events, logging them (all when verbose, else only must_show ones).
sub find_new_events {
    my $self   = shift ;
    my $res    = [] ;
    my $events = $self -> new_events ;
    if ( ref $events ) {
        my $show = ( $self -> verbose
                   ? $events
                   : [ grep { $_ -> must_show } @$events ]
                   ) ;
        LOGx $self -> show_events ( $show ) if @$show ;
        $res = $events ;
    }
    else {
        LOGx $events ;
    }
    $res ;
}

# Text listing of the events in @$LIST, oldest first ; '' when empty.
# A 'new' for an existing local path shows as 'new upd' ; a 'delete'
# shows what is there locally (lnk/dir/fil/xxx, or dud when absent).
sub show_events {
    my $self = shift ;
    my $list = shift ;
    return '' unless @$list ;
    my @lines ;
    for my $ev ( reverse @$list ) {
        my $have = $self -> in_locl ( $ev -> path ) ;
        my $del  = 'del ' .
          ( $have
          ? ( -l $have ? 'lnk' : ( -d $have ? 'dir' : ( -f $have ? 'fil' : 'xxx' ) ) )
          : 'dud'
          ) ;
        my $kind = ( ( $ev -> type eq 'new' and $have )
                   ? 'new upd'
                   : ( ( $ev -> type eq 'delete' ) ? $del : $ev -> type )
                   ) ;
        push @lines, sprintf "%.5f %s %-7s %s"
          , $ev -> epoch, $ev -> ival, $kind, $ev -> path ;
    }
    sprintf "---------------------------------------\n %s"
      , join "\n ", @lines ;
}

# Sorted entries of DIR without '.' and '..' ; [] when unreadable.
sub Readdir {
    my $self = shift ;
    my $dir  = shift ;
    my $res  = [] ;
    if ( opendir my $dh, $dir ) {
        my @names = readdir $dh ;
        closedir $dh ;
        $res = [ grep { ! /^\.\.?$/ } sort @names ] ;
    }
    $res ;
}

sub Rm {
    my $self = shift ;
    my $fil  = shift ;
    print "rm $fil\n" if $self -> debug ;
    unlink $fil ;
}

# Recursively remove OBJ ; symlinks are removed, not followed.
sub Rm_rf {
    my $self = shift ;
    my $obj  = shift ;
    print "rm-rf $obj\n" if $self -> debug ;
    if ( -l $obj or -f $obj ) {
        $self -> Rm ( $obj ) ;
    }
    elsif ( -d $obj ) {
        for my $fil ( map { "$obj/$_" ; } @{ $self -> Readdir ( $obj ) } ) {
            $self -> Rm_rf ( $fil ) ;
        }
        rmdir $obj ;
    }
}

# Process one batch of event lists. Keep only the newest event per path,
# remove deleted paths locally, then fetch the 'new' ones. Partially
# transferred paths are retried up to $MAX_TRIES times before a final
# single-path fetch decides between 'fetched', 'discard' and 'keep'.
# Returns the events still to do, newest first.
sub get_batch {
    my $self   = shift ;
    my @eqs    = @_ ;
    my $local  = $self -> conf -> local ;
    my @news   = () ;
    my %ev4pat = () ;
    my $events = RF -> merge ( @eqs ) ;
    my $todo   = [] ;
    my $xcrds  = [] ;
    my $addf   = 0 ;
    my $adds   = 0 ;
    my $delf   = 0 ;
    my $dels   = 0 ;
    # find the last event per path
    for my $event ( reverse @$events ) {
        $ev4pat { $event -> path } = $event ;
    }
    for my $event ( values %ev4pat ) {
        my $type = $event -> type ;
        my $path = $event -> path ;
        my $file = sprintf "%s/%s", $local, $path ;
        if ( $type eq 'new' ) {
            if ( $self -> is_dir ( $file ) ) {
                LOGx "get_batch : shouldn't happen : Rm_rf ($file)" ;
                $self -> Rm_rf ( $file ) ;
            }
            push @news, $event ;
        }
        elsif ( $type eq 'delete' ) {
            $delf ++ ;
            $dels += $event -> score_del ;
            $self -> Rm_rf ( $file ) ;
        }
        else {
            LOGf "weird event type (%s) path (%s)", $type, $path ;
        }
    }
    $self -> get_remote ( [ map { $_ -> path } @news ] ) ;
    # $event -> tries counts tries resulting in partial xfers
    # ------------------
    # xfer | done | todo
    # ------------------
    # ok   |  0   | -
    # miss |  0   | incr tries ; push discard-candidates
    # fail |  0   | push todo
    # ok   |  1   | -
    # miss |  1   | -
    # fail |  1   | -
    for my $event ( @news ) {
        if ( $self -> sync -> is_ok or $event -> is_done ) {
            $addf ++ ;
            $adds += $event -> score_add ;
        }
        elsif ( $self -> sync -> is_partial ) {
            $event -> incr_tries ;
            push @$xcrds, $event ;
        }
        else {
            push @$todo, $event ;
        }
    }
    LOGf "%s ;\n files add/replace [bytes] %d [%d] delete %d [%d]"
      , $self -> sync -> report, $addf, $adds, $delf, $dels
        if $self -> verbose ;
    for my $event ( @$xcrds ) {
        my $path = $event -> path ;
        LOGf "todo %s - tries %s", $path, $event -> tries ;
        if ( $event -> tries < $MAX_TRIES ) {
            push @$todo, $event ;
            next ;
        }
        # fetch just this $path and discard
        # if sync is 'partial' and 'number of files: 0'
        my $type = $event -> type ;
        LOGf "$MAX_TRIES tries for %s ; discard?", $path ;
        $self -> get_remote ( [ $path ] ) ;
        my $epoc = $event -> epoch ;
        if ( $self -> sync -> is_ok ) {
            LOGf "fetched %s %s %s", $type, $epoc, $path ;
            $addf ++ ;
            $adds += $event -> score_add ;
        }
        elsif ( $self -> sync -> is_partial_no_files ) {
            LOGf "discard %s %s %s", $type, $epoc, $path ;
            $self -> scores -> incr_evxd ;
        }
        else {
            my $err = $self -> sync -> error ;
            LOGf "rsync returned (%s)" , ( $err || 'undef' ) ;
            LOGf "don't discard %s %s %s", $type, $epoc, $path ;
            push @$todo, $event ;
        }
    }
    [ sort { $b -> epoch <=> $a -> epoch } @$todo ] ;
}

##########################################################
package Blib::JSON::rfile::recent::event ;

# extra per-event state, next to the JSON fields
eval Blib -> mk_methods ( qw(base ival tries ino) ) ;

# paths (exact, or matching a pattern) that show_events hides unless
# verbose
my $SKIP_files =
  [ 'MIRRORED.BY'
  , 'MIRRORING.FROM'
  , 'SITES.html'
  , 'index.html'
  , 'indices/du-k.gz'
  , 'indices/find-ls.gz'
  , 'indices/ls-lR.gz'
  , 'indices/mirrors.json'
  , 'indices/timestamp.txt'
  , 'indices/cpan-stats.json'
  , 'misc/cpan-faq.html'
  , 'authors/02STAMP'
  , 'authors/00whois.html'
  , 'authors/00whois.xml'
  , 'modules/02STAMP'
  , 'modules/01modules.index.html'
  , 'modules/01modules.mtime.html'
  , 'modules/01modules.mtime.rss'
  , 'modules/02packages.details.txt'
  , 'modules/02packages.details.txt.gz'
  , 'modules/06perms.txt'
  , 'modules/06perms.txt.gz'
  , 'modules/07mirror.json'
  , 'modules/07mirror.yml'
  ] ;
my %SKIP = ( pats => [ '^(authors|modules)/RECENT' ] , fils => {} ) ;
for my $path ( @$SKIP_files ) { $SKIP { fils } { $path } ++ ; }

# Initialise the JSON part, then remember the inode the path currently
# has in the local tree (undef when absent) and reset tries.
sub init {
    my $self = shift ;
    $self -> Blib::JSON::init ( @_ ) ;
    my $base = $self -> base ;
    my $efil = $base -> path ( 'local', $self -> path ) ;
    my $eino = $base -> ino ( $efil ) ;
    $self -> ino ( $eino ) ;
    $self -> tries ( 0 ) ;
    $self ;
}

# False for paths listed in, or matching a pattern of, %SKIP.
sub must_show {
    my $self = shift ;
    my $path = $self -> path ;
    return 0 if exists $SKIP { fils } { $path } ;
    for my $pat ( @{ $SKIP { pats } } ) {
        return 0 if $path =~ /$pat/ ;
    }
    1 ;
}

sub incr_tries {
    my $self  = shift ;
    my $tried = $self -> tries ;
    $self -> tries ( $tried + 1 ) ;
    $self -> tries ;
}

# 1 when the local path's inode differs from the one seen at init
# (or the path appeared), i.e. the file was replaced ; else 0.
sub is_done {
    my $self = shift ;
    my $base = $self -> base ;
    my $path = $self -> path ;
    my $file = $base -> path ( 'local', $path ) ;
    my $new  = ( lstat $file ) [ $INO ] ;
    my $old  = $self -> ino ;
    my $res  = ( defined $old
               ? ( defined $new and $new != $old )
               : ( defined $new )
               ) || 0 ;
    RF::LOGx "done $path" if $res and $base -> verbose ;
    $res ;
}

# Count the fetched local path in the scores ; returns its size
# (0 for links and directories).
sub score_add {
    my $self = shift ;
    my $base = $self -> base ;
    my $path = $self -> path ;
    my $file = $base -> path ( 'local', $path ) ;
    my $size = 0 ;
    if ( -l $file ) {
        $base -> scores -> incr_fetl ;
    }
    elsif ( -f $file ) {
        $size = -s $file ;
        $base -> scores -> incr_fetf ;
        $base -> scores -> incr_s_fil ( $size ) ;
        unless ( defined $self -> ino ) {
            $base -> scores -> incr_fetn ;
            $base -> scores -> incr_s_new ( $size ) ;
        }
    }
    elsif ( ! -d $file ) {
        # pass $file as an argument : a '%' in a path must not be a format
        RF::LOGf "*** can't score file/link (%s)", $file ;
    }
    RF::LOGf "score %8d %s", $size, $path if $base -> debug ;
    $size ;
}

# Count the about-to-be-deleted local path in the scores ; returns its
# size (0 unless it is a plain file).
sub score_del {
    my $self = shift ;
    my $base = $self -> base ;
    my $file = $base -> path ( 'local', $self -> path ) ;
    my $size = 0 ;
    if ( -l $file ) {
        $base -> scores -> incr_dell ;
    }
    elsif ( -f $file ) {
        $size = -s $file ;
        $base -> scores -> incr_delf ;
        $base -> scores -> incr_s_del ( $size ) ;
    }
    elsif ( -d $file ) {
        $base -> scores -> incr_deld ;
    }
    else {
        $base -> scores -> incr_ddud ;
    }
    $size ;
}

##########################################################
package Blib::Mods ;
@Blib::Mods::ISA = qw(Blib) ;

sub print  { print  @_ ; }
sub printf { printf @_ ; }

##########################################################
package Blib::Mods::Conf ;
@Blib::Mods::Conf::ISA = qw(Blib) ;

# default values for all optional config keys
our %CNF_defaults =
  ( remote              => 'cpan-rsync.perl.org::CPAN'
  , user                => ''
  , passwd              => ''
  , sleep_init_epoch    => '15m'
  , sleep_main_loop     => '1m'
  , full_sync_interval  => '0'
  , max_run_time        => '4w-15m'
  , model_file          => ''
  , prog_rsync          => '/usr/bin/rsync'
  , scoreboard_file     => ''
  , scoreboard_template => ''
  , allow_full_syncs    => '1'
  , hot_config          => '0'
  , loglevel            => 'terse'
  , iim_umask           => $UMASK
  , rotate              => { count => '8' , ival => '4w' }
  , timeout             => '300s'
  ) ;
our @REQ_KEYS = qw(local) ;

# sample RECENT file, used as the JSON model unless model_file is set
our $model = <<'MODEL' ;
{ "recent" :
    [ { "epoch" : "1307095198.77243"
      , "path" : "authors/RECENT-1h.yaml"
      , "type" : "new"
      }
    ]
, "meta" :
    { "aggregator" : [ "1h", "6h", "1d", "1W", "1M", "1Q", "1Y", "Z" ]
    , "protocol" : 1
    , "interval" : "1d"
    , "Producers" :
        { "time" : 1307095198.80471
        , "$0" : "/home/mirror/perl5/bin/rrr-server"
        , "File::Rsync::Mirror::Recentfile" : "0.0.8"
        }
    , "filenameroot" : "RECENT"
    , "minmax" :
        { "mtime" : 1307091586
        , "min" : "1307009184.4988"
        , "max" : "1307095198.77243"
        }
    , "merged" :
        { "into_interval" : "1W"
        , "epoch" : "1307077006.35826"
        , "time" : 1307077006.43983
        }
    , "dirtymark" : "1300184987.04785"
    , "serializer_suffix" : ".json"
    }
}
MODEL
our %CNF_KEYS ;
for ( @REQ_KEYS, keys %CNF_defaults ) { $CNF_KEYS { $_ } ++ ; }

# Names of all known configuration keys (required and defaulted), sorted.
sub CNF_KEYS { sort keys %CNF_KEYS ; }

# Replace the built-in default value of an existing config key ;
# confesses (with a stack trace) for a key that has no default.
sub set_CNF_default {
    my $self = shift ;
    my $key  = shift ;
    my $val  = shift ;
    Carp::confess "set_CNF_default : bad key ($key)"
      unless exists $CNF_defaults { $key } ;
    $CNF_defaults { $key } = $val ;
}

# Accessors for every config key plus the internal attributes.
eval Blib -> mk_methods ( keys %CNF_KEYS, qw(parent _includes) ) ;

# Shallow copy of a default value, so config objects never share (and
# mutate in place) the hash/array refs stored in %CNF_defaults.
sub _copy_default {
    my $val = shift ;
    return { %$val } if ref $val eq 'HASH' ;
    return [ @$val ] if ref $val eq 'ARRAY' ;
    $val ;
}

# Build a config object : start from the defaults, read the config file
# named by option -root (following 'include' directives), let options
# named after config keys override the file, then validate.
# Returns an error string on failure, $self on success.
sub init {
    my $self = shift ;
    my %opts = ( @_ ) ;
    $self -> parent ( $opts { -parent } ) ;
    $self -> _includes ( [] ) ;
    # errors() rewrites e.g. rotate->{ival} in place ; work on copies so
    # the defaults themselves stay untouched.
    my @keys = keys %CNF_defaults ;
    @{ $self } { @keys } = map { _copy_default ( $_ ) ; } @CNF_defaults { @keys } ;
    my $err = $self -> get_conf ( $opts { -root } ) ;
    return $err if $err ;
    for my $key ( sort keys %CNF_KEYS ) {
        $self -> $key ( $opts { $key } ) if defined $opts { $key } ;
    }
    $self -> errors or $self ;
}

# Built-in default of config key $opt ; dies for keys without a default.
sub default {
    my $self = shift ;
    my $opt  = shift ;
    die "default: nothing for $opt" unless exists $CNF_defaults { $opt } ;
    $CNF_defaults { $opt } ;
}

# iim's scratch directory : <local>/$CPAN_TEMP.
sub temp { my $self = shift ; sprintf '%s/%s', $self -> local, $CPAN_TEMP ; }

# Directory passed to rsync as --temp-dir : <temp>/$RSYNC_TMP.
sub rtmp { my $self = shift ; sprintf '%s/%s', $self -> temp, $RSYNC_TMP ; }

sub model { my $self = shift ; $model ; }

sub nap { my $self = shift ; $SLEEP_NAP ; }

# Remember config file $file, read with stat() list $stat.
sub add_incl {
    my $self = shift ;
    my $file = shift ;
    my $stat = shift ;
    push @{ $self -> _includes }
      , Blib::Mods::Conf::Incl -> make ( $file, $stat ) ;
}

# Comma separated names of all config files read, in reading order.
sub includes {
    my $self = shift ;
    join ', ', map { $_ -> file ; } @{ $self -> _includes } ;
}

# The first (top level) config file.
sub root { my $self = shift ; $self -> _includes -> [ 0 ] -> file ; }

# Split $str on whitespace, mapping the placeholder word 'EMPTY' to ''.
sub _split {
    my $str = shift ;
    map { $_ eq 'EMPTY' ? '' : $_ } split ' ', $str ;
}

# "$v $u" with a plural 's' unless $v is 1 ; non-integers get two
# decimals ; a zero value yields ''.
sub _fmt_vu {
    my $v = shift ;
    my $u = shift ;
    $v = sprintf "%.2f", $v if $v != int $v ;
    $v ? "$v $u" . ( $v == 1 ? '' : 's' ) : '' ;
}

# Units used by _text4secs, largest first ; 'mod' is the number of this
# unit in one of the next larger unit (0 : there is no larger unit).
my $units =
  [ { nam => 'week'  , mod =>  0 }
  , { nam => 'day'   , mod =>  7 }
  , { nam => 'hour'  , mod => 24 }
  , { nam => 'minute', mod => 60 }
  , { nam => 'second', mod => 60 }
  ] ;

# init : give every unit its size in seconds ('siz') and a link to the
# next smaller unit ('nxt').
{ my $siz = 1 ;
  my $mod = 1 ;
  my $nxt = undef ;
  for my $u ( reverse @$units ) {
      $u -> { siz } = $siz *= $mod ;
      $u -> { nxt } = $nxt ;
      $mod = $u -> { mod } ;
      $nxt = $u ;
  }
}

sub set_cnts ;

# Recursively spread $ival seconds over $unit and the smaller units,
# storing each unit's count in its 'cnt' slot.  $nzs is the number of
# non-zero units above $unit ; once it reaches $prec (if $prec is true),
# the rest is returned as a fraction (2 decimals) of the caller's unit
# instead of being broken down further.  Returns the carry to add to the
# caller's count : 0, 1 (a full roll-over) or that fraction.
sub set_cnts {
    my $prec = shift ;
    my $unit = shift ;
    my $ival = shift ;
    my $nzs  = shift ;
    return 0 unless $unit ;
    my $res = 0 ;
    my $siz = $unit -> { siz } ;
    my $mod = $unit -> { mod } ;
    if ( $prec and $nzs == $prec and $ival ) {
        $res = sprintf '%.2f', $ival / ( $mod * $siz ) ;
    } else {
        my $cnt = int ( $ival / $siz ) ;
        my $nz  = ( $cnt ? 1 : 0 ) ;
        $cnt += set_cnts ( $prec, $unit -> { nxt }, $ival % $siz, $nzs + $nz ) ;
        # a carry that completes this unit rolls over into the parent
        if ( $mod and $cnt == $mod ) { $cnt = 0 ; $res = 1 ; }
        $unit -> { cnt } = $cnt ;
    }
    $res ;
}

# Human readable text for an interval of $ival seconds, e.g.
# '1 day 2 hours'.  $prec (default 4 ; 0 means no limit) bounds the number
# of non-zero units shown.  Intervals in [0.01, 60) and negative ones are
# shown as plain seconds with two decimals.
sub _text4secs {
    my $self = shift ;
    my $ival = shift ;
    my $prec = shift ;
    $prec = 4 unless defined $prec ;
    return sprintf "%.2f seconds", $ival if 0.01 <= $ival and $ival < 60 ;
    return sprintf "-%.2f seconds", -$ival if $ival < 0 ;
    $_ -> { cnt } = 0 for @$units ;
    $ival = int ( $ival + 0.5 ) ;
    set_cnts ( $prec, $units -> [ 0 ], $ival, 0 ) ;
    my $res = join ' '
      , map  { _fmt_vu ( @{ $_ } { qw(cnt nam) } ) ; }
        grep { $_ -> { cnt } } @$units ;
    $res or '0 seconds' ;
}

# Seconds per time unit letter.
my %s4u = ( 's' => 1 ) ;
$s4u { 'm' } = 60 * $s4u { 's' } ;
$s4u { 'h' } = 60 * $s4u { 'm' } ;
$s4u { 'd' } = 24 * $s4u { 'h' } ;
$s4u { 'w' } =  7 * $s4u { 'd' } ;

# Seconds in $v units of $u ; $v defaults to 1, $u to 's'.
sub s4uv {
    my $v = shift ;
    my $u = shift ;
    $v = 1   unless defined $v and length $v ;
    $u = 's' unless defined $u and length $u ;
    die "500: no s4u {$u}" unless exists $s4u { $u } ;
    $v * $s4u { $u } ;
}

# Parse time spec $spec for attribute $attr : a sequence of terms
# '[+-]NUM[smhdw]' (e.g. '1d12h', '90m', '1.5h') that are summed.
# Returns ( seconds, undef ) or ( undef, error message ).
sub _secs4spec {
    my $self = shift ;
    my $attr = shift ;
    my $spec = shift ;
    # Fractions may have several digits ; with a one-digit fraction
    # '1.25h' was silently read as '1.2' seconds plus '5h'.
    my $num = '[-+]?\d+(\.\d+)?' ;
    my $one = "($num)?([smhdw]?)" ;    # $1 : number, $3 : unit
    my $all = "^($one)+\$" ;
    my ( $res, $err ) ;
    if ( $spec !~ /$all/ ) {
        my $msg = '( [+-] NUM [smhdw] ) ...' ;
        $err = "bad spec ($spec) for '$attr' ; should be like '$msg'" ;
    } else {
        my $tmp = $spec ;
        $res = 0 ;
        while ( length $tmp ) {
            die "500: '$tmp' ~! /^$one/" unless $tmp =~ /^$one/ ;
            my ( $n, $unit ) = ( $1, $3 ) ;
            $tmp = substr $tmp, $+[0] ;    # drop the consumed term
            $res += s4uv ( $n, $unit ) ;
        }
    }
    $res, $err ;
}

# Parse the time spec currently stored in attribute $attr.
sub secs4spec {
    my $self = shift ;
    my $attr = shift ;
    $self -> _secs4spec ( $attr, $self -> $attr ) ;
}

# 'age <kind> : <text>' for the time elapsed since epoch $epoc, with no
# limit on the number of units shown.
sub age4epoc {
    my $self = shift ;
    my $kind = shift ;
    my $epoc = shift ;
    sprintf "age %-6s : %s", $kind
      , $self -> _text4secs ( time - int ( $epoc ), 0 ) ;
}

# $MAX_AGE_INIT in seconds ; dies if it is malformed or zero.
sub max_age_init {
    my $self = shift ;
    my $ival = $MAX_AGE_INIT ;
    my ( $res, $err ) = $self -> _secs4spec ( 'max_age_init', $ival ) ;
    if ( $err ) {
        die "bad max_age_init '$ival' ($err)" ;
    } elsif ( ! $res ) {
        die sprintf "max_age_init shouldn't be '%s'"
          , ( defined $res ? $res : '' ) ;
    }
    $res ;
}

# True if any config file read so far was modified after it was read.
sub touched {
    my $self = shift ;
    for my $incl ( @{ $self -> _includes } ) {
        my $mtim = ( stat $incl -> file ) [ $MTIME ] ;
        # a file that can no longer be stat()ed is not counted as touched
        return 1 if defined $mtim and $mtim > $incl -> mtime ;
    }
    0 ;
}

# Read config file $FILE into $self, recursing into 'include' files.
# Empty lines are dropped ; a line starting with whitespace continues the
# previous one after a space, a line starting with '+' and whitespace is
# appended to it directly, a line starting with '.' is appended after a
# newline ; entries starting with '#' are comments.  Each entry is
# 'key value' ; a value 'EMPTY' means ''.
# Returns the accumulated error text ('' when all is well).
sub get_conf {
    my $self = shift ;
    my $FILE = shift ;
    my $stat = [ stat $FILE ] ;
    my $err  = '' ;
    unless ( @$stat ) {
        return "config : can't find config file '$FILE'\n" ;
    }
    # include loops are detected by (dev, inode), not by name
    if ( grep { $_ -> is_file ( $stat ) ; } @{ $self -> _includes } ) {
        return "config error : '$FILE' is already included\n" ;
    }
    $self -> add_incl ( $FILE, $stat ) ;

    open my $fh, '<', $FILE
      or return "config $FILE: can't open '$FILE' ($!)" ;
    my $CONF = join "\n", grep /./, <$fh> ;
    close $fh ;

    # every line kept its own newline, so lines are now "\n\n" separated
    $CONF =~ s/\t/ /g ;            # replace tabs
    $CONF =~ s/^[+ ]+// ;          # delete leading space, plus
    $CONF =~ s/\n\n\s+/ /g ;       # glue continuation lines
    $CONF =~ s/\n\n\+\s+//g ;      # glue concatenation lines
    $CONF =~ s/\n\n\./\n/g ;       # glue concatenation lines (keep newline)
    chomp $CONF ;

    my $opt_d = $self -> debug ;
    Blib::_pr ( "----\n$CONF\n----\n" ) if $opt_d ;
    for my $entry ( grep { ! /^#/ } split /\n\n/, $CONF ) {
        my ( $key, $val ) = split ' ', $entry, 2 ;
        $val = '' unless defined $val ;
        $val = '' if $val eq 'EMPTY' ;
        Blib::_pr ( "conf '$FILE' : key '$key', val '$val'\n" ) if $opt_d ;
        if ( exists $CNF_KEYS { $key } ) {
            $self -> $key ( $val ) ;
        } elsif ( $key eq 'temp' ) {
            print "ignoring keyword 'temp' (deprecated) in file $FILE\n"
              . "- it is safe to remove the 'temp' line from $FILE\n"
              . ( $val ? "- iim will not use directory $val\n" : '' ) ;
        } elsif ( $key eq 'include' ) {
            $err .= $self -> get_conf ( $val ) ;
        } elsif ( $key eq 'env' ) {
            my ( $x, $y ) = split ' ', $val ;
            $ENV { $x } = $y ;
        } else {
            $err .= "config error in $FILE : "
              . "unknown keyword '$key' value '$val'\n" ;
        }
    }
    $err ;
}

# Validate and normalise the configuration : time specs become seconds,
# 'rotate' becomes { count, ival }, and required keys, booleans, the
# loglevel, iim_umask, Proc::Daemon and the paths are checked.
# Returns all error messages concatenated ('' if none).
sub errors {
    my $self = shift ;
    my $req  = '' ;
    my $err  = '' ;
    for my $key ( @REQ_KEYS ) {
        $req .= "config error : missing required key '$key'\n"
          unless exists $self -> { $key } ;
    }
    for my $timspec ( qw(full_sync_interval sleep_init_epoch sleep_main_loop)
                    , qw(max_run_time timeout) ) {
        my ( $v, $r ) = $self -> secs4spec ( $timspec ) ;
        if ( $r ) { $err .= "config : $r\n" ; }
        else      { $self -> $timspec ( $v ) ; }
    }
    # rotate : 'COUNT [INTERVAL]' -> { count => COUNT, ival => seconds }
    {
        unless ( ref ( $self -> rotate ) ) {
            my ( $cnt, $ivl ) = split ' ', $self -> rotate ;
            $cnt = $CNF_defaults { rotate } { count } unless defined $cnt ;
            $ivl = $CNF_defaults { rotate } { ival }  unless defined $ivl ;
            $self -> rotate ( { count => $cnt, ival => $ivl } ) ;
        }
        my $timspec = $self -> rotate -> { ival } ;
        my ( $v, $r ) = $self -> _secs4spec ( 'rotate interval', $timspec ) ;
        if ( $r ) { $err .= "config : $r\n" ; }
        else      { $self -> rotate -> { ival } = $v ; }
        $v = $self -> rotate -> { count } ;
        $err .= sprintf "config error : rotate count is not a number (%s) ;\n", $v
          unless $v =~ /^[0-9]+$/ ;
    }
    for my $bool ( qw(allow_full_syncs hot_config) ) {
        my $v = $self -> $bool ;
        $err .= "config error : bad value for $bool ($v) ; must be 0 or 1\n"
          unless $v =~ /^[01]$/ ;
    }
    {
        my $v = $self -> loglevel ;
        $err .= sprintf "config error : bad loglevel (%s) ; must be in\n(%s)\n"
          , $v, ( join ', ', Blib -> _loglevels )
          unless Blib -> _is_loglevel ( $v ) ;
    }
    {
        my $v = $self -> iim_umask ;
        $err .= sprintf "config error : iim_umask not octal (%s) ;\n", $v
          unless $v =~ /^[0-7]+$/ ;
    }
    {
        use Proc::Daemon ;
        $err .= "error : perl module Proc::Daemon is too old ; "
              . "install a recent version ; see the manual ;\n"
          unless Proc::Daemon -> can ( 'Status' )
             and Proc::Daemon -> can ( 'Init' ) ;
    }
    unless ( $req ) {
        my $lcl = $self -> local ;
        my $tmp = $self -> temp ;
        for my $pth ( $lcl, $tmp ) {
            $err .= "config error : not a full path ($pth) ;\n"
              unless $pth =~ m!^/! ;
        }
        $err .= "can't find directory local ($lcl)\n" unless -d $lcl ;
    }
    $req . $err ;
}

# Log the configuration : one "key = 'value'" line per key (passwd
# masked ; '_' keys and parent skipped), array elements and hash pairs
# listed below their key.  A true $sep is inserted after every newline
# (the trailing newline is dropped first).
sub show {
    my $self = shift ;
    my $sep  = shift || '' ;
    my $res  = "config :\n" ;
    for my $key ( sort keys %$self ) {
        next if $key =~ m/^_/ ;
        next if $key eq 'parent' ;
        my $val = $self -> { $key } ;
        $val = '********' if $key eq 'passwd' ;
        $res .= sprintf "%s = '%s'\n", $key, ( defined $val ? $val : '' ) ;
        if ( $val and ref $val eq 'ARRAY' and scalar @$val ) {
            $res .= sprintf " %s\n", join "\n ", @$val ;
        } elsif ( ref $val eq 'HASH' and scalar keys %$val ) {
            for my $k ( sort keys %$val ) {
                $res .= sprintf " %s = %s\n", $k, $val -> { $k } ;
            }
        }
    }
    $res .= sprintf "included '%s'\n", $self -> includes ;
    if ( $sep ) {
        chomp $res ;
        $res =~ s/\n/\n$sep/g ;
    }
    RF::LOGx ( $res ) ;
}

# Plain hash snapshot of the configuration (for JSON) : skips '_' keys,
# parent, user and passwd, and keeps only non-references and plain
# ARRAY/HASH refs ; adds 'included'.
sub unbless {
    my $self = shift ;
    my $res  = {} ;
    for my $key ( sort keys %$self ) {
        next if $key =~ m/^_/ ;
        next if $key eq 'parent' ;
        next if $key eq 'user' ;
        next if $key eq 'passwd' ;
        my $val = $self -> { $key } ;
        $res -> { $key } = $val if ref ( $val ) =~ /^(ARRAY|HASH)?$/ ;
    }
    $res -> { included } = $self -> includes ;
    $res ;
}

##########################################################
package Blib::Mods::Conf::Incl ;

# One config file that was read : its name and its stat() list.
@Blib::Mods::Conf::Incl::ISA = qw(Blib) ;

eval Blib -> mk_methods ( qw(file stat) ) ;

sub mtime { my $self = shift ; $self -> stat -> [ $MTIME ] ; }
sub dev   { my $self = shift ; $self -> stat -> [ $DEV ] ; }
sub ino   { my $self = shift ; $self -> stat -> [ $INO ] ; }

# True if stat() list $stat has this file's device and inode.
sub is_file {
    my $self = shift ;
    my $stat = shift ;
    $self -> dev == $stat -> [ $DEV ] and $self -> ino == $stat -> [ $INO ] ;
}

sub init {
    my $self = shift ;
    my $file = shift ;
    my $stat = shift ;
    $self -> file ( $file ) ;
    $self -> stat ( $stat ) ;
    $self ;
}

##########################################################
package RF::Scores ;

@RF::Scores::ISA = qw(Blib) ;

use Time::HiRes qw(gettimeofday) ;

our $_T = gettimeofday ;    # load time ; base of the run-time figures

# [ key, display name words ] ; the key prefix selects the kind :
# t_ timer (seconds), s_ size (bytes), x_ bandwidth, otherwise a counter.
my @_counters =
  ( [ qw(loops loops) ]
  , [ qw(ev events) ]
  , [ qw(evn -> new) ]
  , [ qw(evd -> delete) ]
  , [ qw(evxd -> discarded) ]    # event axed
  , [ qw(rs syncs) ]
  , [ qw(rsf -> failed ) ]
  , [ qw(frs full syncs) ]
  , [ qw(frsf -> failed) ]
  , [ qw(fetf files fetched) ]
  , [ qw(fetr -> recents) ]
  , [ qw(fetn -> new files) ]
  , [ qw(fetl links fetched) ]
  , [ qw(delf files deleted) ]
  , [ qw(dell links deleted) ]
  , [ qw(deld dirs deleted) ]
  , [ qw(ddud dud deletes) ]
  , [ qw(t_slp -> sleeping) ]
  , [ qw(t_con -> connected) ]
  , [ qw(t_frs -> full syncs) ]
  , [ qw(t_out -> sync errors) ]
  , [ qw(s_rcvd sync received) ]
  , [ qw(s_rcvdr -> recents) ]
  , [ qw(s_sent sync sent) ]
  , [ qw(s_fil files fetched) ]
  , [ qw(s_rct -> recents) ]
  , [ qw(s_new -> new files) ]
  , [ qw(s_del files deleted) ]
  ) ;

my @counters = grep { ! /^._/ } map { $_ -> [ 0 ] ; } @_counters ;
my @timers   = grep { /^t_/ }   map { $_ -> [ 0 ] ; } @_counters ;
my @sizers   = grep { /^s_/ }   map { $_ -> [ 0 ] ; } @_counters ;
my @bandwds  = grep { /^x_/ }   map { $_ -> [ 0 ] ; } @_counters ;

my %name4cntr = () ;
for my $cntr ( @_counters ) {
    my ( $key, @name ) = @$cntr ;
    $name4cntr { $key } = ( join ' ', @name ) || $key ;
}

# Display name of score key $key ; unknown keys map to themselves.
sub name4cntr { my $key = shift ; $name4cntr { $key } || $key ; }

eval Blib -> mk_methods ( qw(parent tmpl f_tmpl t_tmpl)
                        , @counters, @timers, @sizers, @bandwds ) ;
eval RF::Scores -> incr_methods ( @counters, @timers, @sizers, @bandwds ) ;

# The configured scoreboard_template, or else the first existing of
# $SCB_TMPL and $SCB_SMPL ; logs and returns undef if none exists.
sub find_tmpl {
    my $self = shift ;
    my $tmpl = $self -> parent -> conf -> scoreboard_template || '' ;
    my @tmpl = $tmpl ? ( $tmpl ) : ( $SCB_TMPL, $SCB_SMPL ) ;
    my ( $file ) = grep { -f $_ } @tmpl ;
    RF::LOGf ( "can't find scoreboard template in (%s)", join ',', @tmpl )
      unless $file ;
    $file ;
}

# Contents of template file $file ; '' (and a log line) if unreadable.
sub read_tmpl {
    my $self = shift ;
    my $file = shift ;
    my $res  = '' ;
    if ( open my $fh, '<', $file ) {
        $res = join '', <$fh> ;
        close $fh ;
    } else {
        RF::LOGf ( "can't open scoreboard template '%s' (%s)", $file, $! ) ;
    }
    $res ;
}

# Cached template text ; the file is re-read only when its name or mtime
# changed.  Resets the cache and returns '' when no template is usable.
sub get_tmpl {
    my $self = shift ;
    my $file = $self -> find_tmpl ;
    my $mtim = $file ? ( RF -> mtime ( $file ) || 0 ) : 0 ;
    if ( ! $file or ! $mtim ) {
        $self -> tmpl ( '' ) ;
        $self -> f_tmpl ( '' ) ;
        $self -> t_tmpl ( 0 ) ;
    } elsif ( $file ne $self -> f_tmpl or $mtim != $self -> t_tmpl ) {
        # NOTE(review): 'last read' shows the previously cached mtime,
        # not the time of the previous read.
        my $time = $self -> t_tmpl ;
        RF::LOGf ( "read scoreboard template :\n file : %s\n last read : %s"
                 , $file
                 , ( $time ? scalar localtime ( $time ) : 'never' ) ) ;
        $self -> f_tmpl ( $file ) ;
        $self -> t_tmpl ( $mtim ) ;
        $self -> tmpl ( $self -> read_tmpl ( $file ) ) ;
    }
    $self -> tmpl ;
}

# Scores object for option parent, with every score at zero.
sub init {
    my $self = shift ;
    my %opts = ( @_ ) ;
    $self -> parent ( $opts { parent } ) ;
    $self -> tmpl ( '' ) ;
    $self -> f_tmpl ( '' ) ;
    $self -> t_tmpl ( 0 ) ;
    $self -> $_ ( 0 ) for @counters, @timers, @sizers, @bandwds ;
    $self ;
}

# Add $incr (default 1) to score $attr.
sub _incr {
    my $self = shift ;
    my $attr = shift ;
    my $incr = shift ;
    $incr = 1 unless defined $incr ;
    $self -> $attr ( $incr + $self -> $attr ) ;
}

# Perl source of method incr_<attr>( [incr] ), a wrapper around _incr.
sub incr_method {
    my $self = shift ;
    my $attr = shift ;
    my $meth = 'sub incr_%s { my $self = shift ; my $incr = shift ; '
             . '$self -> _incr ( %s, $incr ) ; }' ;
    sprintf $meth, $attr, "'$attr'" ;
}

# Perl source of the incr_ methods for all given attributes.
sub incr_methods {
    my $self = shift ;
    join '', map { $self -> incr_method ( $_ ) ; } @_ ;
}

# Bytes as MiB with one decimal.
sub MB { sprintf "%.1f", $_[0] / 1024 / 1024 ; }

# ( "$tag [unit]", rate ) for $size bytes in $time seconds, as bits per
# second scaled to b/s, Kb/s or Mb/s (powers of 1024).
sub _persec {
    my $tag  = shift ;
    my $size = shift ;
    my $time = shift ;
    my $bps  = 8 * $size / $time ;
    my ( $u, $r ) ;
    if ( $bps < 1024 ) {
        $u = 'b/s'  ; $r = sprintf '%.2f', $bps ;
    } elsif ( $bps < 1024 * 1024 ) {
        $u = 'Kb/s' ; $r = sprintf '%.2f', $bps / 1024 ;
    } else {
        $u = 'Mb/s' ; $r = sprintf '%.2f', $bps / 1024 / 1024 ;
    }
    "$tag [$u]", $r ;
}

# Scoreboard data.  List context : the tables ( general, timers,
# counters, sizers, bandwidths ), each a ref to [ name, value... ] rows.
# Scalar context : the column layout [ [ title, rows, title, rows ], ... ].
sub make_data {
    my $self = shift ;
    my $prnt = $self -> parent ;
    my $conf = $prnt -> conf ;
    my $mrt  = $conf -> max_run_time ;
    my $exit = ( $mrt ? $conf -> _text4secs ( $^T + $mrt - time )
                      : 'not scheduled' ) ;
    my $nxit = ( $mrt ? 'next exit' : 'exit' ) ;
    my $nfs  = ( $prnt -> next_sync
               ? $conf -> _text4secs ( $prnt -> next_sync - time )
               : 'not scheduled' ) ;
    my $nrot = $prnt -> next_rotate ;
    # A rotate due after the planned exit ($^T + max_run_time) happens on
    # the next start ; comparing the epoch $nrot with the duration $mrt
    # was true whenever max_run_time was set.
    my $rot ;
    if ( ! $nrot ) {
        $rot = 'not scheduled' ;
    } elsif ( $mrt and $nrot > $^T + $mrt ) {
        $rot = 'on next re-start' ;
    } else {
        $rot = $conf -> _text4secs ( $nrot - time ) ;
    }
    my $tag  = $prnt -> tag ;
    my $mode = $prnt -> mode . ( $tag ? " -> $tag" : '' ) ;
    my $trun = ( gettimeofday () - $_T ) || 1 ;
    my $busy = $trun ;
    my $prcs = 0 ;
    my $date = sprintf "%s UTC", scalar gmtime ;
    my $general =
      [ [ 'date'            => $date ]
      , [ 'host'            => ( $HOSTNAME || 'unknown' ) ]
      , [ 'version'         => $prnt -> version ]
      , [ 'pid → mode'      => "$$ → $mode" ]
      , [ 'remote'          => $conf -> remote ]
      , [ 'status'          => $prnt -> status_html ]
      , [ 'age local'       => $prnt -> age ]
      , [ $nxit             => $exit ]
      , [ 'next full sync'  => $nfs ]
      , [ 'next log rotate' => $rot ]
      ] ;
    my $timers =
      [ [ 'run time' => $conf -> _text4secs ( $trun ) ]
      , ( map {
            my $tmr  = $_ ;
            my $time = $self -> $tmr ;
            my $pct  = 100 * $time / $trun ;
            my $perc = sprintf "%.2f %%", $pct ;
            $busy -= $time ;
            $prcs += $pct ;
            [ $tmr => $conf -> _text4secs ( $time, 3 ) => $perc ] ;
          } @timers )
      , [ '-> busy'
        , $conf -> _text4secs ( $busy, 3 )
        , sprintf "%.2f %%", 100 - $prcs
        ]
      ] ;
    my $bndwids =
      [ [ _persec ( 'sync in'  , $self -> s_rcvd, $trun ) ]
      , [ _persec ( 'sync out' , $self -> s_sent, $trun ) ]
      , [ _persec ( 'files in' , $self -> s_fil , $trun ) ]
      ] ;
    my $counters = [ map { [ $_ => $self -> $_ ] ; } @counters ] ;
    my $sizers   = [ map { [ $_ => MB ( $self -> $_ ) ] ; } @sizers ] ;
    return ( $general, $timers, $counters, $sizers, $bndwids ) if wantarray ;
    return
      [ [ 'general'   , $general , 'timers'    , $timers  ]
      , [ 'counters'  , $counters ]
      , [ 'data [MB]' , $sizers  , 'bandwidth' , $bndwids ]
      ] ;
}

# Plain text scoreboard : a 'title : ->' line per table and a
# 'name : value' line per row, names padded to the longest row name.
sub as_text {
    my $self = shift ;
    my $data = $self -> make_data ;
    my @rows ;
    my $W = 0 ;
    for my $col ( @$data ) {
        while ( @$col ) {
            my $tag  = name4cntr ( shift @$col ) ;
            my $tups = shift @$col ;
            push @rows, [ $tag, '->' ] ;
            for my $tup ( @$tups ) {
                my $name = name4cntr ( shift @$tup ) ;
                $W = length $name if $W < length $name ;
                push @rows, [ $name, join ' - ', @$tup ] ;
            }
        }
    }
    join ''
      , "-- scoreboard ------------------------------\n"
      , ( map { sprintf "%-${W}s : %s\n", @$_ ; } @rows )
      , "----------------------------------------------\n" ;
}

# Rows for one table : the title, then per row its name and cells ;
# '->' is shown as '→'.  $mcls is the widest row in the column.
# NOTE(review): $cspn and $attr are computed but unused and the strings
# carry no markup ; the HTML cell tags seem to have been lost from this
# copy of the source -- confirm against upstream.
sub mk_trs {
    my $self = shift ;
    my $titl = shift ;
    my $itms = shift ;
    my $mcls = shift ;
    my @list = ( "$titl" ) ;
    for my $itm ( @$itms ) {
        my $outs = 0 ;
        my $left = @$itm ;
        my $name = name4cntr ( shift @$itm ) ;
        $name =~ s/->/→/g ;
        my @cells ;
        for my $val ( @$itm ) {
            $left -- ;
            my $span = $mcls - $outs - $left ;   # last cell spans the rest
            my $cspn = $span == 1 ? '' : "COLSPAN=$span " ;
            my $attr = $val =~ /^\d+(\.\d+)?(\s%)?$/ ? 'right' : 'left' ;
            $outs += $span ;
            ( my $cell = $val ) =~ s/->/→/g ;
            push @cells, "$cell" ;
        }
        push @list, join '', "$name", @cells ;
    }
    [ @list ] ;
}

# Rows of one scoreboard column built from ( title, rows, ... ) pairs ;
# a filler row is added when the column has fewer than $mrws entries.
# NOTE(review): $rspn is unused ; the filler row's markup seems lost.
sub _as_html {
    my $self = shift ;
    my $mrws = shift ;
    my @args = @_ ;
    my $mcls = 0 ;
    my @trs  = () ;
    for my $arg ( grep { ref } @args ) {
        for my $itm ( @$arg ) {
            $mcls = @$itm if @$itm > $mcls ;
        }
    }
    while ( @args ) {
        my $titl = shift @args ;
        my $itms = shift @args ;
        push @trs, @{ $self -> mk_trs ( $titl, $itms, $mcls ) } ;
    }
    if ( @trs < $mrws ) {
        my $span = $mrws - @trs ;
        my $rspn = $span == 1 ? '' : "ROWSPAN=$span " ;
        push @trs, " " ;
    }
    [ @trs ] ;
}

# Interleave the columns : output line i joins the i-th entry of each
# column that still has one, until all columns are exhausted.
# NOTE(review): $fill is '' either way ; its markup seems lost.
sub glue_by_row {
    my $self = shift ;
    my $cols = shift ;
    my $mrws = shift ;
    my $lens = 0 ;
    my @res  = () ;
    $lens += @$_ for @$cols ;
    while ( $lens ) {
        my @row = () ;
        for my $col ( grep { @$_ } @$cols ) {
            push @row, shift @$col ;
            $lens -- ;
        }
        my $fill = @res == 0 ? "" : '' ;
        push @res, sprintf "%s\n", join $fill, @row ;
    }
    join '', @res ;
}

# Scoreboard table text for the column layout $data (as returned by
# make_data in scalar context).
sub mk_tab {
    my $self = shift ;
    my $data = shift ;
    my $mrws = 0 ;
    for my $descr ( @$data ) {
        my $cnt = 0 ;
        $cnt += ( ref $_ ? scalar @$_ : 1 ) for @$descr ;
        $mrws = $cnt if $cnt > $mrws ;
    }
    sprintf "\n%s\n\n"
      , $self -> glue_by_row
          ( [ map { $self -> _as_html ( $mrws, @$_ ) ; } @$data ], $mrws ) ;
}

# HTML scoreboard : the template with %VERSION%, %REFRESH%
# (sleep_main_loop + 15), %SML%, %DATE%, %NEXT%, %TABLES%, %SITE% and
# %LOGO% substituted ; 'Error ; see log' if there is no template.
sub as_html {
    my $self = shift ;
    my $prnt = $self -> parent ;
    my $conf = $prnt -> conf ;
    my $res  = 'Error ; see log' ;
    if ( my $TMPL = $self -> get_tmpl ) {
        my $data = $self -> make_data ;
        my $tab  = $self -> mk_tab ( $data ) ;
        my $ttxt = sprintf "\n%s\n\n", $tab ;
        my $sml  = $conf -> sleep_main_loop ;
        my $refr = $sml + 15 ;
        my %subs =
          ( '%VERSION%' => $prnt -> version
          , '%REFRESH%' => $refr
          , '%SML%'     => $sml
          , '%DATE%'    => scalar ( localtime )
          , '%NEXT%'    => scalar ( localtime ( time + $refr ) )
          , '%TABLES%'  => $ttxt
          , '%SITE%'    => $IIM_SITE
          , '%LOGO%'    => ( -f $prnt -> path ( 'temp', $IIM_LOGO )
                             ? $IIM_LOGO : $IIM_LOGO_URL )
          ) ;
        while ( my ( $pat, $sub ) = each %subs ) {
            $TMPL =~ s/\Q$pat\E/$sub/g ;
        }
        $res = $TMPL ;
    }
    $res ;
}

# Numeric maximum of the arguments.
sub _max { my $m = shift ; ( $m < $_ and $m = $_ ) for @_ ; $m ; }

# PHP scoreboard : PHP assignments of $version, $iim_scb_time,
# $iim_sleep_main_loop and $tab (the table text).
# NOTE(review): the format literal in this copy of the source was damaged
# (an unterminated "\$tab = << ..." string swallowed the following code) ;
# the heredoc below is a reconstruction -- confirm it, and whether a
# '<?php' opener is expected, against upstream.
sub as_php {
    my $self = shift ;
    my ( $gen, $tim, $cnt, $siz, $bws ) = $self -> make_data ;
    my $tab = $self -> mk_tab
      ( [ [ 'general'   , $gen , 'timers'    , $tim ]
        , [ 'counters'  , $cnt ]
        , [ 'data [MB]' , $siz , 'bandwidth' , $bws ]
        ] ) ;
    my $ttxt = sprintf "\n%s\n\n", $tab ;
    sprintf "\$version = '%s' ;\n"
          . "\$iim_scb_time = '%d' ;\n"
          . "\$iim_sleep_main_loop = %s ;\n"
          . "\$tab = <<<EOT\n%s\nEOT;\n"
      , $self -> parent -> version
      , time
      , $self -> parent -> conf -> sleep_main_loop
      , $ttxt ;
}

# JSON scoreboard : the make_data tables (each row prefixed with its raw
# key, the name translated), meta data, the configuration and the
# built-in defaults.
sub as_json {
    my $self = shift ;
    my ( $gen, $tim, $cnt, $siz, $bws ) = $self -> make_data ;
    my $meta = [ [ 'iim_scb_time', time ] ] ;
    for my $itm ( @$gen, @$tim, @$cnt, @$siz ) {
        unshift @$itm, $itm -> [ 0 ] ;
        $itm -> [ 1 ] = name4cntr ( $itm -> [ 1 ] ) ;
    }
    my $res =
      { general   => $gen
      , timers    => $tim
      , counters  => $cnt
      , sizes     => $siz
      , bandwidth => $bws
      , meta      => $meta
      , conf      => $self -> parent -> conf -> unbless
      , conf_def  => \%CNF_defaults
      } ;
    my $json = $LCL_JSON -> new -> utf8 ( 1 ) -> pretty -> encode ( $res ) ;
    $json .= "\n" unless $json =~ /\n$/ ;
    $json ;
}

# Write the scoreboard to $file in the format given by its suffix (.html,
# .php, .json, anything else : text) ; an empty $file prints to STDOUT.
sub put_scoreboard {
    my $self = shift ;
    my $file = shift ;
    my $prnt = $self -> parent ;
    my $text =
        $file =~ /\.html$/ ? $self -> as_html
      : $file =~ /\.php$/  ? $self -> as_php
      : $file =~ /\.json$/ ? $self -> as_json
      :                      $self -> as_text ;
    if ( ! $file ) {
        print $text ;
    } elsif ( open my $fh, '>', $file ) {
        RF::LOGx ( "write scoreboard $file" ) if $prnt -> debug ;
        print {$fh} $text ;
        close $fh or RF::LOGx ( "can't write scoreboard $file ($!)" ) ;
    } else {
        RF::LOGx ( "can't write scoreboard $file ($!)" ) ;
    }
}

# Write every file in (whitespace separated) scoreboard_file, or
# temp/$SCB_FILE when none is configured.
sub put_scoreboards {
    my $self = shift ;
    my $prnt = $self -> parent ;
    my $scbf = $prnt -> conf -> scoreboard_file ;
    my @fils = $scbf ? ( split ' ', $scbf )
                     : ( $prnt -> path ( 'temp', $SCB_FILE ) ) ;
    $self -> put_scoreboard ( $_ ) for @fils ;
}

##########################################################
package RF::Sync ;

# Runs rsync for the parent : file list requests and full syncs.
@RF::Sync::ISA = qw(Blib) ;

eval Blib -> mk_methods ( qw(parent conn error cmd timer numf sent rcvd xfrd) ) ;

use Time::HiRes qw(gettimeofday) ;

# Clear the results of a previous run.
sub reset {
    my $self = shift ;
    $self -> error ( undef ) ;
    $self -> numf ( undef ) ;
    $self -> sent ( undef ) ;
    $self -> rcvd ( undef ) ;
    $self -> xfrd ( undef ) ;
    $self -> cmd ( '' ) ;
    $self ;
}

sub init {
    my $self = shift ;
    my %opts = ( @_ ) ;
    $self -> parent ( $opts { parent } ) ;
    $self -> reset ;
}

# Arguments with undef replaced by ''.
sub _rep { map { defined $_ ? $_ : '' } @_ ; }

# A number from rsync --stats output ; rsync 3.1+ groups digits by
# default ('1,234,567', or with '.' in some locales), so drop separators.
sub _rsync_num { my $str = shift ; $str =~ tr/,.//d ; $str ; }

# Fetch the files in @$fils (written to the parent's request file and
# passed as --files-from) from the remote into local, or into temp when
# $kind is 'r'.  Byte and file counts are taken from rsync's --stats
# output and added to the parent's scores.  Returns the exit status ($?
# of the pipe close), or -1 when no reader pipe was obtained.
sub request {
    my $self = shift ;
    my $fils = shift ;
    my $kind = shift || '' ;
    my $prnt = $self -> parent ;
    my $conf = $prnt -> conf ;
    $self -> reset ;
    my $user = $conf -> user ? $conf -> user . '@' : '' ;
    my @cmd  = ( $conf -> prog_rsync ) ;
    push @cmd, '-v' if $prnt -> debug ;
    push @cmd, @RSYNC_LIST
             , '--timeout'    => $conf -> timeout
             , '--contimeout' => $conf -> timeout
             , '--temp-dir'   => $conf -> rtmp
             , '--files-from' => $prnt -> req_file
             , $user . $conf -> remote
             , ( $kind eq 'r' ? $conf -> temp : $conf -> local ) ;
    my $cmd = $self -> cmd ( join ' ', @cmd ) ;
    if ( $prnt -> debug ) {
        RF::LOGf ( "%s\n[ %s\n] %d\n"
                 , $cmd, ( join "\n, ", @$fils ), scalar @$fils ) ;
    }
    $prnt -> put_req_file ( $fils ) ;
    my $pipe = IO::Pipe -> new ;
    my $time = gettimeofday ;
    my $conn = $self -> conn ( $pipe -> reader ( @cmd ) ) ;
    unless ( $conn ) {
        $self -> error ( -1 ) ;
        return $self -> error ;
    }
    my $n = '(\d[\d,.]*)' ;    # a possibly digit-grouped number
    while ( defined ( my $line = $conn -> getline () ) ) {
        print $line if $prnt -> debug ;
        if ( $line =~ /total bytes sent:\s+$n/i ) {
            $self -> sent ( _rsync_num ( $1 ) ) ;
        } elsif ( $line =~ /total bytes received:\s+$n/i ) {
            $self -> rcvd ( _rsync_num ( $1 ) ) ;
        } elsif ( $line =~ /number of files:\s+$n/i ) {
            $self -> numf ( _rsync_num ( $1 ) ) ;
        } elsif ( $line =~ /number of (?:regular )?files transferred:\s+$n/i ) {
            # rsync 3.1+ says 'Number of regular files transferred'
            $self -> xfrd ( _rsync_num ( $1 ) ) ;
        }
    }
    $prnt -> scores -> incr_s_sent ( $self -> sent || 0 ) ;
    $prnt -> scores -> incr_s_rcvd ( $self -> rcvd || 0 ) ;
    $prnt -> scores -> incr_s_rcvdr ( $self -> rcvd || 0 ) if $kind eq 'r' ;
    $self -> error ( $self -> close ) ;
    $self -> timer ( gettimeofday () - $time ) ;
    RF::LOGf ( "sent %s, rcvd %s, xfrd %s, kind '%s' ok '%s'"
             , _rep ( $self -> sent, $self -> rcvd, $self -> xfrd )
             , $kind, $self -> is_ok ) if $prnt -> debug ;
    $self -> error ;
}

# Close the rsync pipe ; returns its exit status ($?).
sub close {
    my $self = shift ;
    $self -> conn -> close ;
    $? ;
}

# Full rsync of the remote into local (options @RSYNC_FULL, which include
# --delete) ; returns the exit status ($?).
sub full {
    my $self = shift ;
    my $prnt = $self -> parent ;
    my $conf = $prnt -> conf ;
    $self -> reset ;
    my $user = $conf -> user ? $conf -> user . '@' : '' ;
    my @cmd =
      ( $conf -> prog_rsync
      , @RSYNC_FULL
      , '--timeout'    => $conf -> timeout ()
      , '--contimeout' => $conf -> timeout ()
      , '--temp-dir'   => $conf -> rtmp ()
      , $user . $conf -> remote
      , $conf -> local
      ) ;
    my $cmd = $self -> cmd ( join ' ', @cmd ) ;
    RF::LOGf ( "%s\n", $cmd ) if $prnt -> debug ;
    my $time = gettimeofday ;
    system @cmd ;
    $self -> error ( $? ) ;
    $self -> timer ( gettimeofday () - $time ) ;
    $self -> error ;
}

# True if rsync exited normally with $E_PRT (partial transfer due to
# error) or $E_MIS (partial transfer due to vanished source files).
# The second test used to be an assignment, making this always true.
sub is_partial {
    my $self = shift ;
    my $err  = $self -> error ;
    return '' unless defined $err ;
    my $sig = $err & 127 ;
    my $xit = $err >> 8 ;
    $sig == 0 and ( $xit == $E_PRT or $xit == $E_MIS ) ;
}

# Partial transfer of a file list of zero files.
sub is_partial_no_files {
    my $self = shift ;
    my $numf = $self -> numf ;
    $self -> is_partial and defined $numf and $numf == 0 ;
}

# True if the last run exited with status 0.
sub is_ok {
    my $self = shift ;
    my $err  = $self -> error ;
    defined $err and $err == 0 ;
}

# One-line summary of the last run's transfer counts.
sub report {
    my $self = shift ;
    sprintf "files xferred %s, bytes sent %s received %s"
      , _rep ( $self -> xfrd, $self -> sent, $self -> rcvd ) ;
}

1 ;