Thanks to the insights of all of you who have responded (special mention to
Phil D.) I have finally managed to resolve my "FollowTail talks to
Client::TCP" problem and make it work as desired. Sorry for the delay in
answering due to a vacation.
The result is that I have now deployed the script on multiple hosts, where
it watches for some pattern on log files and send matches to a central log
server. Before that, investigations required to parse huge log files on each
host. Now, all relevant events are conveniently concatenated on a single
file on a single server.
Please find below the final code that we have put in production.
Cheers
Emmanuel
#!/usr/local/bin/perl
use warnings;
use strict;
use FileHandle;
use POE qw(Wheel::FollowTail Component::Client::TCP);
my $logfile = "/home/namp/code/client.log";
my $mmsfile = "/namp_log/log/TRmms.log";
my (%wap_logs, %poe_records);
my $PATH = '/namp_log/log';
my $pattern = "mms";
my $MMSC = "10.131.26.132";
my $logPort = "31008";
my $tcp_client_session_id;
sub InitWapLog
# Define POE records for log watcher
{
$poe_records{_start} = \&begin_watchers;
$poe_records{current_record} = \¤t_got_record;
$poe_records{current_log_reset} = \¤t_log_reset;
$poe_records{log_error} = \&generic_log_error;
&Log("POE init complete");
}
sub begin_watchers
# Start log watcher POE session
{
my $heap = $_[HEAP];
my $logid;
my $log_watcher = POE::Wheel::FollowTail->new
(
Filename => $mmsfile,
InputEvent => "current_record",
ResetEvent => "current_log_reset",
ErrorEvent => "log_error",
);
$logid = $log_watcher->ID;
$heap->{services}->{$logid} = "current";
$heap->{watchers}->{$logid} = $log_watcher;
&Log("Started watch $logid on file $mmsfile");
}
sub CreateClient
# Create the TCP client session
{
$tcp_client_session_id = POE::Component::Client::TCP->new
(
RemoteAddress => $MMSC,
RemotePort => $logPort,
ServerInput => \&server_input,
Disconnected => \&handle_disconnect,
ConnectError => \&connect_error,
InlineStates => { clientSend => \&client_send }
);
}
sub current_got_record
# Handle log events
{
my ($record, $wheel_id, $heap) = @_[ARG0, ARG1, HEAP];
$poe_kernel->post($tcp_client_session_id, 'clientSend', $record) if
$record =~ /$pattern/;
}
sub client_send
# Send to server
{
my ($heap, $stuff) = @_[HEAP, ARG0];
$heap->{server}->put($stuff);
}
sub server_input
# Server spoke
{
my $input = $_[ARG0];
&Log("Client: received $input from server");
}
sub handle_disconnect
# Automatic reconnect
{
$_[KERNEL]->delay(reconnect=>1);
&Log("Reconnecting to server");
}
sub connect_error
# Failure to connect to server
{
&Log("Cannot connect to $MMSC on port $logPort");
}
sub current_log_reset
# Handles log resets for current log, basically do nothing
{
my ($heap, $wheel_id) = @_[HEAP, ARG0];
my $service = $heap->{services}->{$wheel_id};
&Log("POE service $service log reset");
}
sub generic_log_error
# Handles log errors
{
my ($heap, $operation, $errno, $error_string, $wheel_id) = @_[HEAP,
ARG0, ARG1, ARG2, ARG3];
my $service = $heap->{services}->{$wheel_id};
&Log("$service log $operation error $errno: $error_string\n");
&Log("Shutting down $service log watcher.\n");
delete $heap->{services}->{$wheel_id};
delete $heap->{watchers}->{$wheel_id};
}
sub Log
# Write log entries
{
my $text = shift;
my $log = new FileHandle(">>$logfile");
my $now = `date +"%Y%m%d %H:%M:%S"`;
chomp($now);
print $log "[watcher]\t$now\t$text\n";
close($log);
}
# MAIN
&Log("Starting WAP log watch");
&InitWapLog;
&CreateClient;
POE::Session->create(inline_states => {%poe_records});
$poe_kernel->run();
&Log("All watchers died, exiting");