#!/usr/bin/perl -w

=head1 NAME

octo_parser - Octopussy Parser program

=head1 SYNOPSIS

octo_parser <device> 

=head1 DESCRIPTION

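octo_parser is the Octopussy Project program that parses the incoming logs
of one Device. Each log line is matched against the messages of the Device's
services: matching lines are stored in the logfile of their service (and may
raise alerts), the other lines in the Device's Unknown logfile.
It reloads its configuration on SIGHUP and stops on SIGUSR1.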

=cut

use strict;
use bytes; # Fix the 'Malformed UTF-8 character' warning
use utf8;
use Cache::SharedMemoryCache;
use File::Copy;
use File::Path;
use Linux::Inotify2;
use AAT;
use Octopussy;

my $PROG_NAME = "octo_parser";
my $NICE_UPARSER = "nice -n 20";

my $dir_program = Octopussy::Directory("programs");
my $dir_pid = Octopussy::Directory("running");

# The device to parse is the only (mandatory) argument; it is needed right
# below to build the PID file name.
my $device = $ARGV[0];
die "Usage: $PROG_NAME <device>\n"	if (!defined $device || $device eq "");

# Flags raised by the signal handlers (Reload/Exit), polled by the main loop.
my ($reload_request, $exit_request) = (0, 0);

# Cross-process cache holding the alert threshold buffers (see Alert_Handler).
# Carp is not loaded in this file, so the failure is reported with die().
my $shared_memory_cache = Cache::SharedMemoryCache->new(
	{ namespace => "octo_parser", default_expires_in => "1 day" } )
	or die("Couldn't instantiate SharedMemoryCache");

# Parser configuration, (re)built by Init().
my @services;
my %alert;
my %service;
my $minutes_without_logs;
my %dir_data = ();
my @taxo_list = ();

# Parsing state.
my @files = ();
my %taxo = ();
my %known_msgs = ();
my @unknown_msgs = ();
my ($file, $last_file) = (undef, undef);
my $pid_file = Octopussy::PID_File($PROG_NAME . "_$device");

my $inotify = Linux::Inotify2->new()
	or die "Unable to create new inotify object: $!";
# Non-blocking: the main loop polls for events instead of waiting for them.
$inotify->blocking(0);

=head1 FUNCTIONS

=head2 Exit()

Requests a soft stop of the parser.
The file being parsed is always fully processed before the parser stops.

=cut
sub Exit
{
	# Installed as the USR1 handler: only raise the flag here,
	# the main loop acts on it between two files.
	return ($exit_request = 1);
}

=head2 Reload()

Requests a soft reload of the parser configuration: the file being parsed
is fully processed first, then Init() is run again by the main loop.

=cut
sub Reload
{
	# Installed as the HUP handler: only raise the flag here,
	# the main loop acts on it between two files.
	return ($reload_request = 1);
}

=head2 Inotify_Watch($path)

Adds an inotify watch on directory '$path'.

Paths containing a day directory (.../Incoming/YYYY/MM/DD) are watched for
files closed after writing (IN_CLOSE_WRITE). Any other directory is watched
for newly created entries (IN_CREATE), so that the main loop can watch new
sub-directories too.

=cut
sub Inotify_Watch($)
{
	my ($dir) = @_;

	my $mask = ($dir =~ /\/Incoming\/+\d{4}\/+\d{2}\/+\d{2}/)
		? IN_CLOSE_WRITE
		: IN_CREATE;

	return $inotify->watch($dir, $mask);
}

=head2 Init()

(Re)loads the parser configuration for the device and launches octo_uparser.

For each service of the device, it compiles one regexp per message and
records the alerts attached to each message. It then starts octo_uparser
in the background and (re)installs inotify watches on the device Incoming
directory tree.

Init() runs at startup and again after each reload request. It therefore
empties the message/alert tables before refilling them, because they are
only ever appended to. It keeps the files still queued in @files: a reload
happens between two files, and files already reported by inotify would
otherwise never be parsed.

Dies if the Device or Taxonomy configuration cannot be read.

=cut
sub Init()
{
	my $dconf = Octopussy::Device::Configuration($device) or
		die("[OCTOPUSSY ERROR: Unable to get Device configuration for '$device']");
	$minutes_without_logs = $dconf->{minutes_without_logs};
	@services = Octopussy::Device::Services($device);
	@taxo_list = Octopussy::Taxonomy::List()
		or die("[OCTOPUSSY ERROR: Unable to get Taxonomy configuration]");

	# Rebuilt from scratch: on reload, pushing into the old tables would
	# duplicate every message regexp.
	%service = ();
	%alert = ();

	$dir_data{"Unknown"} = Octopussy::Storage::Directory_Unknown($device);
	foreach my $serv (@services)
	{
		$dir_data{$serv} = Octopussy::Storage::Directory_Service($device, $serv);
		Octopussy::RRDTool::Syslog_By_Device_Service_Taxonomy_Init($device, $serv);
		my @messages = Octopussy::Service::Messages($serv);
		AAT::Syslog($PROG_NAME, "Init Service: $serv - "
			. scalar(@messages) . " messages");
		foreach my $m (@messages)
		{
			my @msg_alerts = ();
			foreach my $alert_conf (Octopussy::Message::Alerts($device, $serv, $m))
			{
				$alert{$alert_conf->{name}} = $alert_conf;
				# Flag the notification channels once, for Alert_Handler.
				foreach my $act (@{$alert_conf->{action}})
				{
					$alert_conf->{action_mail} = 1	if ($act =~ /Mail/i);
					$alert_conf->{action_jabber} = 1	if ($act =~ /Jabber/i);
					$alert_conf->{action_nsca} = 1	if ($act =~ /NSCA/i);
				}
				push(@msg_alerts, $alert_conf->{name});
			}
			my $regexp = Octopussy::Message::Pattern_To_Regexp($m);
			# The trailing class accepts one optional extra character that is
			# neither tab/newline/CR/FF nor printable ASCII (e.g. a stray
			# control or non-ASCII byte) at the end of the line.
			push(@{$service{$serv}{msgs}},
				{ re => qr/^$regexp\s*[^\t\n\r\f -~]?$/i,
					pattern => (($#msg_alerts >= 0) ? $m->{pattern} : undef),
					taxo => $m->{taxonomy}, alerts => \@msg_alerts } );
		}
	}
	# NOTE(review): $device comes from the command line and is interpolated
	# into a shell command here.
	system("$NICE_UPARSER ${dir_program}octo_uparser $device &");
	($file, $last_file) = (undef, undef);
	%known_msgs = ();
	@unknown_msgs = ();

	my $dir = Octopussy::Storage::Directory_Incoming($device);
	Inotify_Watch("$dir/$device/Incoming/");
	my @dirs = `find "$dir/$device/Incoming/" -type d`;
	foreach my $d (@dirs)
	{
		chomp($d);
		Inotify_Watch($d);
	}
}

=head2 Write_Logfile($file, $logs)

Appends the known logs of array ref '$logs' (one line each) to the
gzip-compressed logfile '$file.gz', creating its directory first.

Does nothing when '$logs' is undefined or empty. On failure to start gzip,
the error is printed and sent to syslog.

=cut
sub Write_Logfile($$)
{
	my ($logfile, $logs) = @_;

	# Nothing to write: do not even create the directory.
	return	if (!defined $logs || !@{$logs});

	my $dir = $logfile;
	$dir =~ s/^(.+)\/.+?\.log/$1\//g;
	Octopussy::Create_Directory($dir);

	# The shell is needed for the '>>' redirection; quote the path for it.
	if (open(my $gz, "|-", "gzip >> \"$logfile.gz\""))
	{
		print {$gz} "$_\n"	foreach (@{$logs});
		close($gz);
	}
	else
	{
		print "Unable to open file '$logfile.gz'\n";
		AAT::Syslog("octo_parser", "UNABLE_OPEN_FILE", "$logfile.gz");
	}
}


=head2 Write_Unknown_Logfile($file, $logs)

Appends the unknown logs of array ref '$logs' to the Unknown logfile that
corresponds to Incoming file '$file'.

The part of '$file' up to '/Incoming/' is replaced by
"<Unknown directory>/<device>/Unknown/". A trailing '.gz' is dropped, so
the result is always '<...>.log.gz'. Writing is delegated to Write_Logfile().
Does nothing when '$logs' is undefined or empty.

=cut
sub Write_Unknown_Logfile($$)
{
	my ($logfile, $logs) = @_;

	return	if (!defined $logs || !@{$logs});

	$logfile =~ s/^.+\/Incoming\//$dir_data{Unknown}\/$device\/Unknown\//;
	# Incoming files may already be compressed (File_Handler reads '.gz' files
	# with zcat). Without this, the result would be 'msg_XXhYY.log.gz.gz'
	# and a '.gz' directory would be created.
	$logfile =~ s/\.gz$//;

	Write_Logfile($logfile, $logs);
}

=head2 Write_Stats($y, $m, $d, $hour, $min)

Writes the Services statistics of the last parsed file.

The stats file of the device in the 'running' directory is overwritten.
It gets a '[YYYYMMDDhhmm]' header, then one 'service: count' line per
service with known logs. On failure to open the file, the error is printed
and sent to syslog.

=cut
sub Write_Stats($$$$$)
{
	my ($y, $m, $d, $hour, $min) = @_;

	my $stats_file = "$dir_pid/${PROG_NAME}_$device.stats";
	if (open(my $stats, ">", $stats_file))
	{
		print {$stats} "[$y$m$d$hour$min]\n";
		foreach my $serv (keys %known_msgs)
			{ print {$stats} "$serv: " . scalar(@{$known_msgs{$serv}}) . "\n"; }
		close($stats);
	}
	else
	{
		print "Unable to open file '$stats_file'\n";
		AAT::Syslog("octo_parser", "UNABLE_OPEN_FILE", $stats_file);
	}
}

=head2 Incoming_To_Unknown($incoming)

Moves Incoming file '$incoming' to the Unknown directory of the device as a
'.gz' file, compressing it first unless it already ends with '.gz'.
The main loop uses it when the device has no service.

=cut
sub Incoming_To_Unknown($)
{
	my $incoming = shift;

	my $unknown = $incoming;
	$unknown =~ s/^.+\/Incoming\//$dir_data{Unknown}\/$device\/Unknown\//;
	# The destination always gets a single '.gz' suffix below.
	$unknown =~ s/\.gz$//;
	my $dir = $unknown;
	$dir =~ s/^(.+)\/.+?\.log/$1\//g;
	Octopussy::Create_Directory($dir);

	my $compressed = $incoming;
	if ($incoming !~ /\.gz$/)
	{
		# List form: no shell, so the path needs no quoting.
		system("gzip", "-f", $incoming);
		$compressed = "$incoming.gz";
	}
	move($compressed, "$unknown.gz")
		or print "Unable to move '$compressed' to '$unknown.gz': $!\n";
}

=head2 Alert_Handler($msg, $line)

Handles the alert(s) of Message '$msg' for log line '$line'.

The line must start with a syslog-like timestamp ('Mmm dd hh:mm:ss host');
otherwise nothing happens. Only alerts that pass all of these checks are
considered:
- the time period matches;
- the include regexp, if set, accepts the line;
- the exclude regexp, if set, does not match the line.

For each such alert:
- without threshold ('thresold_time' undefined), the alert is raised for
  this line;
- with threshold, the line is appended to the alert buffer kept in the
  shared memory cache. Once the buffer holds at least 'thresold_time' lines,
  lines older than 'thresold_duration' seconds (relative to this line) are
  dropped. If none had to be dropped, the alert is raised with all buffered
  lines and the buffer is emptied.

Raising an alert inserts it in the database and sends the configured
Jabber/Mail/NSCA notifications.

=cut
sub Alert_Handler($$)
{
	my ($msg, $line) = @_;

	return	if ($line !~ /^(\w{3} \s?\d{1,2} \d\d:\d\d:\d\d) \S+/);
	my $date = $1;
	my $parse_date = Date::Manip::ParseDate($date);
	my $date_tp = Date::Manip::UnixDate($parse_date, "%A %H:%M");
	my $secs = Date::Manip::UnixDate($parse_date, "%s");

	foreach my $msg_alert (@{$msg->{alerts}})
	{
		# Not named '$a': that name is reserved for sort blocks.
		my $alert = $alert{$msg_alert};
		next	if (!Octopussy::TimePeriod::Match($alert->{timeperiod}, $date_tp));
		next	if (!(AAT::NULL($alert->{regexp_incl})
			|| ($line =~ /$alert->{regexp_incl}/)));
		next	if (!(AAT::NULL($alert->{regexp_excl})
			|| ($line !~ /$alert->{regexp_excl}/)));

		my $alert_logs = $shared_memory_cache->get($alert->{name});
		push(@{$alert_logs}, { timestamp => $secs, log => $line })
			if (defined $alert->{thresold_time});
		if ((!defined $alert->{thresold_time})
			|| (scalar(@{$alert_logs}) >= $alert->{thresold_time}))
		{
			my $match = 1;
			if (defined $alert->{thresold_time})
			{
				# Drop lines that fell out of the threshold window; if any had
				# to be dropped, wait for more lines before raising the alert.
				# The emptiness test guards against reading a missing [0].
				while ((@{$alert_logs})
					&& (($secs - $alert_logs->[0]->{timestamp})
						> $alert->{thresold_duration}))
				{
					shift(@{$alert_logs});
					$match = 0;
				}
			}
			if ($match)
			{
				my $data = (defined $alert->{thresold_time})
					? join("", map { "$_->{log}\n" } @{$alert_logs})
					: $line;
				# The alert has been raised: empty its buffer.
				$alert_logs = undef;
				Octopussy::Alert::Insert_In_DB($device, $alert, $data, $date);
				my ($subject, $body) =
					Octopussy::Alert::Message_Building($alert, $device, $data, $msg);
				if (defined $alert->{action_jabber})
				{
					AAT::XMPP::Send_Message("Octopussy", $subject, @{$alert->{imdest}});
					foreach my $body_line (split(/\n/, $body))
					{
						AAT::XMPP::Send_Message("Octopussy", $body_line,
							@{$alert->{imdest}});
					}
				}
				AAT::SMTP::Send_Message("Octopussy", $subject, $body,
					@{$alert->{maildest}})	if (defined $alert->{action_mail});
				AAT::NSCA::Send("Octopussy",
					(($alert->{level} =~ /Warning/i) ? 1 : 2), "$subject")
					if (defined $alert->{action_nsca});
			}
		}
		$shared_memory_cache->set($alert->{name}, $alert_logs);
	}
}

=head2 Line_Handler($line, $serv, $msg, $known_msgs, $match)

Tries to match log line '$line' against Message '$msg' of Service '$serv'.

On a match, the message alerts (if any) are handled, the line is appended
to '$known_msgs->{$serv}', the taxonomy counter of the message is
incremented, and the scalar referenced by '$match' is set to 1.
A "last message repeated N time(s)" line is also accepted for the service,
without alert handling or taxonomy counting.

=cut
sub Line_Handler($$$$$)
{
	my ($line, $serv, $msg, $known_msgs, $match) = @_;

	if ($line =~ $msg->{re})
	{
		# Alerts are only evaluated for messages that have at least one.
		Alert_Handler($msg, $line)	if (@{$msg->{alerts}});
		push(@{$known_msgs->{$serv}}, $line);
		$taxo{$serv}{$msg->{taxo}}++;
		${$match} = 1;
	}
	elsif ($line =~ /last message repeated \d+ time/)
	{
		push(@{$known_msgs->{$serv}}, $line);
		${$match} = 1;
	}
}

=head2 Service_Handler($serv, $total, $y, $m, $d, $hour, $min, $file_new)

Stores the known logs of Service '$serv' for the file dated
'$y/$m/$d $hour:$min'.

It logs the number of events (out of '$total' parsed lines), appends the
logs to '<service dir>/<device>/<service>/YYYY/MM/DD/$file_new' (spaces in
the service name become '_'), and updates the RRD taxonomy statistics.

=cut
sub Service_Handler($$$$$$$$)
{
	my ($serv, $total, $y, $m, $d, $hour, $min, $file_new) = @_;

	(my $service_dir = $serv) =~ tr/ /_/;
	my $logfile = "$dir_data{$serv}/$device/$service_dir/$y/$m/$d/$file_new";
	my $nb_events = scalar(@{$known_msgs{$serv}});

	AAT::Syslog($PROG_NAME, "Device: $device - Service: $serv "
		. "Date: $d/$m/$y $hour:$min - Events: "
		. $nb_events . " / $total");
	Write_Logfile($logfile, \@{$known_msgs{$serv}});

	# One counter per taxonomy, in @taxo_list order (0 when absent).
	my @taxo_stats = map { $taxo{$serv}{$_} || 0 } @taxo_list;
	Octopussy::RRDTool::Syslog_By_Device_Service_Taxonomy_Update(
		AAT::Datetime::Seconds_Since_1970($y, $m, $d, $hour, $min),
		$device, $serv, \@taxo_stats);
}

=head2 File_Handler($file)

Parses the Incoming log file '$file', which may be plain or '.gz'.

The file path must contain '/YYYY/MM/DD/msg_HHhMM.log'; any other file is
ignored. Each line is matched against the messages of every service, in
order, and the first match wins. Then:
- matched lines are stored per service (Service_Handler);
- unmatched lines go to the Unknown logfile;
- the statistics are written.

The Incoming file is deleted once it has been read. If it cannot be
opened, the error is printed and sent to syslog, and nothing is written.

=cut
sub File_Handler($)
{
	my $file = shift;

	my $time = time();
	my ($y, $m, $d, $file_new, $hour, $min) =
		($file =~ /\/(\d+)\/(\d+)\/(\d+)\/(msg_(\d\d)h(\d\d)\.log)/)
		or return;

	%taxo = ();
	%known_msgs = ();
	@unknown_msgs = ();
	my $total = 0;

	# No shell involved. Plain files are opened directly, so that a missing
	# or unreadable file is detected here.
	my $fh;
	my $opened = ($file =~ /\.gz$/)
		? open($fh, "-|", "zcat", $file)
		: open($fh, "<", $file);
	if (!$opened)
	{
		print "Unable to open file '$file'\n";
		AAT::Syslog("octo_parser", "UNABLE_OPEN_FILE", $file);
		return;
	}

	while (my $line = <$fh>)
	{
		chomp($line);
		my $match = 0;
		SERVICE: foreach my $serv (@services)
		{
			foreach my $msg (@{$service{$serv}{msgs}})
			{
				Line_Handler($line, $serv, $msg, \%known_msgs, \$match);
				last SERVICE	if ($match);
			}
		}
		push(@unknown_msgs, $line)	if (!$match);
		$total++;
	}
	close($fh);
	unlink($file);

	foreach my $serv (keys %known_msgs)
		{ Service_Handler($serv, $total, $y, $m, $d, $hour, $min, $file_new); }
	Write_Unknown_Logfile($file, \@unknown_msgs);
	Write_Stats($y, $m, $d, $hour, $min);
	AAT::Syslog($PROG_NAME, "Device: $device Date: $d/$m/$y $hour:$min"
		. " Time: " . (time() - $time) . " seconds");
}

#
# MAIN
#

$SIG{HUP} = \&Reload;
$SIG{USR1} = \&Exit;

AAT::Syslog($PROG_NAME, "Device: $device Started !");
Init();
push(@files, Octopussy::Logs::Incoming_Files($device));
AAT::Syslog($PROG_NAME, scalar(@files) .
	" files to parse for Device $device");
my $last_logs_time = time();
# Time of the last "no logs" warning. The warning is repeated at most once
# per 'minutes_without_logs' period, not on every 2-second iteration.
my $last_warning_time = 0;
while (!$exit_request)
{
	# Non-blocking read (see blocking(0)): returns at once when nothing happened.
	foreach my $e ($inotify->read)
	{
		if (-d $e->fullname)
			{ Inotify_Watch($e->fullname); }
		else
			{ push(@files, $e->fullname); }
	}
	if (@files)
	{
		AAT::Syslog($PROG_NAME, scalar(@files) .
			" files to parse for Device $device");
		$last_logs_time = time();
	}
	elsif (AAT::NOT_NULL($minutes_without_logs))
	{
		my $now = time();
		my $period = $minutes_without_logs * 60;
		if ((($now - $last_logs_time) > $period)
			&& (($now - $last_warning_time) > $period))
		{
			AAT::Syslog($PROG_NAME, "DEVICE_HASNT_SENT_ANY_LOGS",
				$device, int(($now - $last_logs_time)/60));
			$last_warning_time = $now;
		}
	}
	while (defined($file = shift(@files)))
	{
		chomp($file);
		if (@services)
			{ File_Handler($file); }
		else
			{ Incoming_To_Unknown($file); }
		# Stop between two files on reload or exit request. Files still queued
		# are untouched in Incoming (presumably listed again by
		# Incoming_Files at the next start).
		last	if ($reload_request || $exit_request);
	}
	if ($reload_request)
	{
		$reload_request = 0;
		Init();
	}

	sleep(2)	if (!$exit_request);
}

=head1 AUTHOR

Sebastien Thebert <octo.devel@gmail.com>

=head1 SEE ALSO

octo_dispatcher, octo_extractor, octo_uparser, octo_reporter, octo_scheduler

=cut
