Skip to content
Snippets Groups Projects
Commit e88b30c7 authored by Nigel Kukard's avatar Nigel Kukard
Browse files

Added basic stats support

parent 2f8d5cf2
No related branches found
No related tags found
No related merge requests found
...@@ -51,13 +51,16 @@ our (@ISA,@EXPORT,@EXPORT_OK); ...@@ -51,13 +51,16 @@ our (@ISA,@EXPORT,@EXPORT_OK);
); );
@EXPORT_OK = qw( @EXPORT_OK = qw(
getLastStats getLastStats
getStatsByClass getStatsByClass
getStatsByCounter
getSIDFromCID getSIDFromCID
getSIDFromLID getSIDFromLID
); );
use constant { use constant {
VERSION => '0.0.1', VERSION => '0.1.1',
# How often our config check ticks # How often our config check ticks
TICK_PERIOD => 5, TICK_PERIOD => 5,
...@@ -123,6 +126,8 @@ my $subscribers; ...@@ -123,6 +126,8 @@ my $subscribers;
my $statsPreparedStatements = { }; my $statsPreparedStatements = { };
# Last cleanup time # Last cleanup time
my $lastCleanup = { }; my $lastCleanup = { };
# Last config manager stats pull
my $lastConfigManagerStats = 0;
# Initialize plugin # Initialize plugin
...@@ -228,8 +233,26 @@ sub plugin_init ...@@ -228,8 +233,26 @@ sub plugin_init
$dbh->disconnect(); $dbh->disconnect();
$dbh = undef; $dbh = undef;
} }
if ($dbh && (my $res = $dbh->prepare('
SELECT
`IdentifierID`, `Timestamp` - (`Timestamp` % ?) AS TimestampM,
MAX(`Counter`) AS `Counter`
FROM
stats_basic
WHERE
`Key` = ?
AND `Timestamp` < ?
GROUP BY
`IdentifierID`, `TimestampM`
'))) {
$statsPreparedStatements->{'stats_basic_consolidate'} = $res;
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to prepare statement 'stats_basic_consolidate': %s",$DBI::errstr);
$dbh->disconnect();
$dbh = undef;
}
# Prepare stats cleanup statement # Prepare stats cleanup statements
if ($dbh && (my $res = $dbh->prepare('DELETE FROM stats WHERE `Key` = ? AND `Timestamp` < ?'))) { if ($dbh && (my $res = $dbh->prepare('DELETE FROM stats WHERE `Key` = ? AND `Timestamp` < ?'))) {
$statsPreparedStatements->{'stats_cleanup'} = $res; $statsPreparedStatements->{'stats_cleanup'} = $res;
} else { } else {
...@@ -237,13 +260,20 @@ sub plugin_init ...@@ -237,13 +260,20 @@ sub plugin_init
$dbh->disconnect(); $dbh->disconnect();
$dbh = undef; $dbh = undef;
} }
if ($dbh && (my $res = $dbh->prepare('DELETE FROM stats_basic WHERE `Key` = ? AND `Timestamp` < ?'))) {
$statsPreparedStatements->{'stats_basic_cleanup'} = $res;
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to prepare statement 'stats_basic_cleanup': %s",$DBI::errstr);
$dbh->disconnect();
$dbh = undef;
}
# Set last cleanup to now # Set last cleanup to now
if ($dbh) { my $now = time();
foreach my $key (keys %{$statsConfig}) { foreach my $key (keys %{$statsConfig}) {
$lastCleanup->{$key} = time(); $lastCleanup->{$key} = $now;
}
} }
$lastConfigManagerStats = $now;
} }
return 1; return 1;
...@@ -287,9 +317,11 @@ sub session_stop ...@@ -287,9 +317,11 @@ sub session_stop
$globals = undef; $globals = undef;
$dbh = undef; $dbh = undef;
$statsDBIdentifierMap = { }; $statsDBIdentifierMap = { };
$statsQueue = [ ];
$subscribers = undef; $subscribers = undef;
$statsPreparedStatements = { }; $statsPreparedStatements = { };
$lastCleanup = undef; $lastCleanup = { };
$lastConfigManagerStats = 0;
$logger->log(LOG_DEBUG,"[STATISTICS] Shutdown"); $logger->log(LOG_DEBUG,"[STATISTICS] Shutdown");
...@@ -308,32 +340,61 @@ sub session_tick ...@@ -308,32 +340,61 @@ sub session_tick
return; return;
} }
my $now = time();
my $timer1 = [gettimeofday];
# Pull in statements # Pull in statements
my $sthStatsConsolidate = $statsPreparedStatements->{'stats_consolidate'}; my $sthStatsConsolidate = $statsPreparedStatements->{'stats_consolidate'};
my $sthStatsCleanup = $statsPreparedStatements->{'stats_cleanup'}; my $sthStatsCleanup = $statsPreparedStatements->{'stats_cleanup'};
my $sthStatsBasicConsolidate = $statsPreparedStatements->{'stats_basic_consolidate'};
my $now = time(); my $sthStatsBasicCleanup = $statsPreparedStatements->{'stats_basic_cleanup'};
my $timer1 = [gettimeofday];
# Even out flushing over 10s to absorb spikes # Even out flushing over 10s to absorb spikes
my $maxFlush = int(@{$statsQueue} / 10) + 100; my $maxFlush = int(@{$statsQueue} / 10) + 100;
my $numFlush = 0; my $numFlush = 0;
# Loop and build the data to create our multi-insert # Loop and build the data to create our multi-insert
my @insertHolders; my (@insertHolders,@insertBasicHolders);
my @insertData; my (@insertData,@insertBasicData);
while (defined(my $stat = shift(@{$statsQueue})) && $numFlush < $maxFlush) { while (defined(my $stat = shift(@{$statsQueue})) && $numFlush < $maxFlush) {
push(@insertData, # This is a basic counter
$stat->{'identifierid'}, $stat->{'key'}, $stat->{'timestamp'}, if (defined($stat->{'counter'})) {
$stat->{'direction'}, push(@insertBasicHolders,"(?,?,?,?)");
$stat->{'cir'}, $stat->{'limit'}, $stat->{'rate'}, $stat->{'pps'}, $stat->{'queue_len'}, push(@insertBasicData,
$stat->{'total_bytes'}, $stat->{'total_packets'}, $stat->{'total_overlimits'}, $stat->{'total_dropped'} $stat->{'identifierid'}, $stat->{'key'}, $stat->{'timestamp'},
); $stat->{'counter'}
push(@insertHolders,"(?,?,?,?,?,?,?,?,?,?,?,?,?)"); );
# Full stats counter
} else {
push(@insertHolders,"(?,?,?,?,?,?,?,?,?,?,?,?,?)");
push(@insertData,
$stat->{'identifierid'}, $stat->{'key'}, $stat->{'timestamp'},
$stat->{'direction'},
$stat->{'cir'}, $stat->{'limit'}, $stat->{'rate'}, $stat->{'pps'}, $stat->{'queue_len'},
$stat->{'total_bytes'}, $stat->{'total_packets'}, $stat->{'total_overlimits'}, $stat->{'total_dropped'}
);
}
$numFlush++; $numFlush++;
} }
# If we got things to insert, do it # If we got things to insert, do it
if (@insertBasicHolders > 0) {
my $res = $dbh->do('
INSERT DELAYED INTO stats_basic
(
`IdentifierID`, `Key`, `Timestamp`,
`Counter`
)
VALUES
'.join(',',@insertBasicHolders),undef,@insertBasicData
);
# Check for error
if (!defined($res)) {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute delayed stats_basic insert: %s",$DBI::errstr);
}
}
# And normal stats...
if (@insertHolders > 0) { if (@insertHolders > 0) {
my $res = $dbh->do(' my $res = $dbh->do('
INSERT DELAYED INTO stats INSERT DELAYED INTO stats
...@@ -356,29 +417,49 @@ sub session_tick ...@@ -356,29 +417,49 @@ sub session_tick
# We only need stats if we did something, right? # We only need stats if we did something, right?
if ($numFlush) { if ($numFlush) {
my $timediff2 = tv_interval($timer1,$timer2); my $timediff2 = tv_interval($timer1,$timer2);
$logger->log(LOG_INFO,"[STATISTICS] Stats flush time %s/%s records: %s",$numFlush,$maxFlush,sprintf('%.3fs',$timediff2)); $logger->log(LOG_INFO,"[STATISTICS] Total stats flush time %s/%s records: %s",$numFlush,$maxFlush,sprintf('%.3fs',$timediff2));
} }
my $res; my $res;
# Loop with our stats consolidation configuration # Loop with our stats consolidation configuration
foreach my $key (keys %{$statsConfig}) { foreach my $key (sort keys %{$statsConfig}) {
my $timerA = [gettimeofday]; my $timerA = [gettimeofday];
my $precision = $statsConfig->{$key}->{'precision'}; my $precision = $statsConfig->{$key}->{'precision'};
my $thisPeriod = _getAlignedTime($now,$precision); my $thisPeriod = _getAlignedTime($now,$precision);
my $lastPeriod = $thisPeriod - $precision; my $lastPeriod = $thisPeriod - $precision;
my $prevKey = $key - 1; my $prevKey = $key - 1;
# If we havn't exited the last period, then skip # If we havn't exited the last period, then skip
if ($lastCleanup->{$key} > $lastPeriod) { if ($lastCleanup->{$key} > $lastPeriod) {
next; next;
} }
# Set last cleanup to now
$lastCleanup->{$key} = $now; # Stats
my $numStatsBasicConsolidated = 0;
my $numStatsConsolidated = 0;
my $consolidateUpTo = $lastPeriod - $precision;
# Execute and pull in consolidated stats # Execute and pull in consolidated stats
$res = $sthStatsConsolidate->execute($prevKey,$precision,$lastPeriod - $precision); $res = $sthStatsBasicConsolidate->execute($precision,$prevKey,$consolidateUpTo);
if ($res) {
# Loop with items returned
while (my $item = $sthStatsBasicConsolidate->fetchrow_hashref()) {
$item->{'key'} = $key;
$item->{'timestamp'} = $item->{'timestampm'};
# Queue for insert
push(@{$statsQueue},$item);
$numStatsBasicConsolidated++;
}
# If there was an error, make sure we report it
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic consolidation statement: %s",$sthStatsBasicConsolidate->errstr());
}
# And the normal stats...
$res = $sthStatsConsolidate->execute($precision,$prevKey,$consolidateUpTo);
if ($res) { if ($res) {
# Loop with items returned # Loop with items returned
while (my $item = $sthStatsConsolidate->fetchrow_hashref()) { while (my $item = $sthStatsConsolidate->fetchrow_hashref()) {
...@@ -387,28 +468,51 @@ sub session_tick ...@@ -387,28 +468,51 @@ sub session_tick
# Queue for insert # Queue for insert
push(@{$statsQueue},$item); push(@{$statsQueue},$item);
$numStatsConsolidated++;
} }
# If there was an error, make sure we report it # If there was an error, make sure we report it
} else { } else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats consolidation statement: %s",$sthStatsConsolidate->errstr()); $logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats consolidation statement: %s",$sthStatsConsolidate->errstr());
} }
# Set last cleanup to now
$lastCleanup->{$key} = $now;
my $timerB = [gettimeofday]; my $timerB = [gettimeofday];
my $timediffB = tv_interval($timerA,$timerB); my $timediffB = tv_interval($timerA,$timerB);
$logger->log(LOG_INFO,"[STATISTICS] Stats consolidation time for key %s: %s",$key,sprintf('%.3fs',$timediffB));
$logger->log(LOG_INFO,"[STATISTICS] Stats consolidation time for key %s: %s (%s basic, %s normal), up to %s [%s]",$key,
sprintf('%.3fs',$timediffB),$numStatsBasicConsolidated,$numStatsConsolidated,$consolidateUpTo,scalar(localtime($consolidateUpTo)));
} }
# Setup another timer # Setup another timer
my $timer3 = [gettimeofday]; my $timer3 = [gettimeofday];
# We only need to run as often as the first precision # We only need to run as often as the first precision
# - If cleanup has not yet run?
# - or if the 0 cleanup plus precision of the first key is in the past (data is now stale?)
if (!defined($lastCleanup->{'0'}) || $lastCleanup->{'0'} + $statsConfig->{1}->{'precision'} < $now) { if (!defined($lastCleanup->{'0'}) || $lastCleanup->{'0'} + $statsConfig->{1}->{'precision'} < $now) {
# We're going to clean up for the first stats precision * 3, which should be enough
my $cleanUpTo = $now - ($statsConfig->{1}->{'precision'} * 3);
# Streamed stats is removed 3 time periods past the first precision # Streamed stats is removed 3 time periods past the first precision
if ($res = $sthStatsCleanup->execute(0, $now - ($statsConfig->{1}->{'precision'} * 3))) { if ($res = $sthStatsBasicCleanup->execute(0, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed # We get 0E0 for 0 when none were removed
if ($res ne "0E0") { if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats %s",$res); $logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats_basic %s, up to %s [%s]",$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
}
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement: %s",
$sthStatsBasicCleanup->errstr());
}
# And the normal stats...
if ($res = $sthStatsCleanup->execute(0, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed
if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats %s, up to %s [%s]",$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
} }
} else { } else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement: %s",$sthStatsCleanup->errstr()); $logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement: %s",$sthStatsCleanup->errstr());
...@@ -416,11 +520,26 @@ sub session_tick ...@@ -416,11 +520,26 @@ sub session_tick
# Loop and remove retained stats # Loop and remove retained stats
foreach my $key (keys %{$statsConfig}) { foreach my $key (keys %{$statsConfig}) {
# Work out timestamp to clean up to by multiplying the retention period by days
$cleanUpTo = $now - ($statsConfig->{$key}->{'retention'} * 86400);
# Retention period is in # days # Retention period is in # days
if ($res = $sthStatsCleanup->execute($key, $now - ($statsConfig->{$key}->{'retention'} * 86400))) { if ($res = $sthStatsBasicCleanup->execute($key, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed
if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats_basic %s, up to %s [%s]",$key,$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
}
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement for key %s: %s",$key,
$sthStatsBasicCleanup->errstr());
}
# And normal stats...
if ($res = $sthStatsCleanup->execute($key, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed # We get 0E0 for 0 when none were removed
if ($res ne "0E0") { if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats %s",$key,$res); $logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats %s, up to %s [%s]",$key,$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
} }
} else { } else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement for key %s: %s",$key,$sthStatsCleanup->errstr()); $logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement for key %s: %s",$key,$sthStatsCleanup->errstr());
...@@ -435,6 +554,13 @@ sub session_tick ...@@ -435,6 +554,13 @@ sub session_tick
$logger->log(LOG_INFO,"[STATISTICS] Stats cleanup time: %s",sprintf('%.3fs',$timediff4)); $logger->log(LOG_INFO,"[STATISTICS] Stats cleanup time: %s",sprintf('%.3fs',$timediff4));
} }
# Check if we need to pull config manager stats
if ($now - $lastConfigManagerStats > STATISTICS_PERIOD) {
my $configManagerStats = _getConfigManagerStats();
_processStatistics($kernel,$configManagerStats);
$lastConfigManagerStats = $now;
}
# Set delay on config updates # Set delay on config updates
$kernel->delay(tick => TICK_PERIOD); $kernel->delay(tick => TICK_PERIOD);
} }
...@@ -455,33 +581,7 @@ sub do_update ...@@ -455,33 +581,7 @@ sub do_update
return; return;
} }
# Loop through stats data we got _processStatistics($kernel,$statsData);
while ((my $sid, my $stat) = each(%{$statsData})) {
$stat->{'identifierid'} = $sid;
$stat->{'key'} = 0;
push(@{$statsQueue},$stat);
# # Check if we have an event handler subscriber for this item
# if (defined($subscribers->{$statsItem}) && %{$subscribers->{$statsItem}}) {
# # If we do, loop with them
# foreach my $handler (keys %{$subscribers->{$statsItem}}) {
#
# # If no events are linked to this handler, continue
# if (!(keys %{$subscribers->{$statsItem}->{$handler}})) {
# next;
# }
#
# # Or ... If we have events, process them
# foreach my $event (keys %{$subscribers->{$statsItem}->{$handler}}) {
#
# $kernel->post($handler => $event => $statsItem => $stat);
# }
# }
# }
}
} }
...@@ -619,10 +719,97 @@ sub getTrafficDirection ...@@ -619,10 +719,97 @@ sub getTrafficDirection
} }
# Generate ConfigManager counters
sub getConfigManagerCounters
{
my @limits = getLimits();
my $classes = getTrafficClasses();
# Grab user count
my %counters;
$counters{"ConfigManager:TotalUsers"} = @limits;
# Start off with 0's
foreach my $cid (keys %{$classes}) {
$counters{"ConfigManager:ClassUsers:$cid"} = 0;
}
# Generate ClassID counts
foreach my $lid (@limits) {
my $limit = getLimit($lid);
my $cid = $limit->{'ClassID'};
# Bump the class counter
$counters{"ConfigManager:ClassUsers:$cid"}++;
}
return \%counters;
}
# #
# Internal Functions # Internal Functions
# #
# Function to process a bunch of statistics
sub _processStatistics
{
my ($kernel,$statsData) = @_;
# Loop through stats data we got
while ((my $sid, my $stat) = each(%{$statsData})) {
$stat->{'identifierid'} = $sid;
$stat->{'key'} = 0;
push(@{$statsQueue},$stat);
# # Check if we have an event handler subscriber for this item
# if (defined($subscribers->{$statsItem}) && %{$subscribers->{$statsItem}}) {
# # If we do, loop with them
# foreach my $handler (keys %{$subscribers->{$statsItem}}) {
#
# # If no events are linked to this handler, continue
# if (!(keys %{$subscribers->{$statsItem}->{$handler}})) {
# next;
# }
#
# # Or ... If we have events, process them
# foreach my $event (keys %{$subscribers->{$statsItem}->{$handler}}) {
#
# $kernel->post($handler => $event => $statsItem => $stat);
# }
# }
# }
}
}
# Generate ConfigManager stats
sub _getConfigManagerStats
{
my $counters = getConfigManagerCounters();
my $now = time();
my $statsData = { };
# Loop through counters and create stats items
foreach my $item (%{$counters}) {
my $identifierID = getSIDFromCounter($item);
my $stat = {
'identifierid' => $identifierID,
'timestamp' => $now,
'counter' => $counters->{$item}
};
$statsData->{$identifierID} = $stat;
}
return $statsData;
}
# Return a cached SID if its cached
sub _getCachedSIDFromIdentifier sub _getCachedSIDFromIdentifier
{ {
my $identifier = shift; my $identifier = shift;
...@@ -729,6 +916,43 @@ sub _getStatsBySID ...@@ -729,6 +916,43 @@ sub _getStatsBySID
} }
# Internal function to get basic stats by SID
sub _getStatsBasicBySID
{
my $sid = shift;
my $now = time();
# Prepare query
my $sth = $dbh->prepare('
SELECT
`Timestamp`, `Counter`
FROM
stats_basic
WHERE
`IdentifierID` = ?
AND `Key` = ?
AND `Timestamp` > ?
AND `Timestamp` < ?
ORDER BY
`Timestamp` DESC
LIMIT 100
');
# Grab last 60 mins of data
$sth->execute($sid,0,$now - 3600, $now);
my $statistics;
while (my $item = $sth->fetchrow_hashref()) {
$statistics->{$item->{'timestamp'}} = {
'counter' => $item->{'counter'},
}
}
return $statistics;
}
1; 1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment