Skip to content
Snippets Groups Projects
Commit 6778ae6a authored by Nigel Kukard's avatar Nigel Kukard
Browse files

Reworked statistics plugin against new API

parent 46d67b77
No related branches found
No related tags found
No related merge requests found
# OpenTrafficShaper Traffic shaping statistics
# Copyright (C) 2007-2013, AllWorldIT
# Copyright (C) 2007-2014, AllWorldIT
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -21,7 +21,6 @@ package opentrafficshaper::plugins::statistics;
use strict;
use warnings;
use DBI;
use POE;
......@@ -30,15 +29,15 @@ use opentrafficshaper::logger;
use opentrafficshaper::utils;
use opentrafficshaper::plugins::configmanager qw(
getLimit
getLimits
getLimitUsername
getPool
getPools
getLimitTxInterface
getLimitRxInterface
getPoolTxInterface
getPoolRxInterface
getPoolTrafficClassID
getTrafficClasses
getAllTrafficClasses
);
# NK: TODO: Maybe we want to remove timing at some stage? maybe not?
......@@ -65,7 +64,7 @@ our (@ISA,@EXPORT,@EXPORT_OK);
);
use constant {
VERSION => '0.1.1',
VERSION => '0.2.2',
# How often our config check ticks
TICK_PERIOD => 5,
......@@ -144,43 +143,37 @@ sub plugin_init
# Setup our environment
$logger = $globals->{'logger'};
$logger->log(LOG_NOTICE,"[STATISTICS] OpenTrafficShaper Statistics v".VERSION." - Copyright (c) 2013, AllWorldIT");
$logger->log(LOG_NOTICE,"[STATISTICS] OpenTrafficShaper Statistics v%s - Copyright (c) 2007-2014, AllWorldIT",VERSION);
# Check our interfaces
if (defined(my $dbdsn = $globals->{'file.config'}->{'plugin.statistics'}->{'db_dsn'})) {
$logger->log(LOG_INFO,"[STATISTICS] Set db_dsn to '$dbdsn'");
$logger->log(LOG_INFO,"[STATISTICS] Set db_dsn to '%s'",$dbdsn);
$config->{'db_dsn'} = $dbdsn;
} else {
$logger->log(LOG_WARN,"[STATISTICS] No db_dsn to specified in configuration file. Stats storage disabled!");
}
if (defined(my $dbuser = $globals->{'file.config'}->{'plugin.statistics'}->{'db_username'})) {
$logger->log(LOG_INFO,"[STATISTICS] Set db_username to '$dbuser'");
$logger->log(LOG_INFO,"[STATISTICS] Set db_username to '%s'",$dbuser);
$config->{'db_username'} = $dbuser;
}
if (defined(my $dbpass = $globals->{'file.config'}->{'plugin.statistics'}->{'db_password'})) {
$logger->log(LOG_INFO,"[STATISTICS] Set db_password to '$dbpass'");
$logger->log(LOG_INFO,"[STATISTICS] Set db_password to '%s'",$dbpass);
$config->{'db_password'} = $dbpass;
}
# This is our main stats session
POE::Session->create(
inline_states => {
_start => \&session_start,
_stop => \&session_stop,
tick => \&session_tick,
db_ready => \&do_db_ready,
db_notready => \&do_db_notready,
db_query_success => \&do_db_query_success,
db_query_failure => \&do_db_query_failure,
_start => \&_session_start,
_stop => \&_session_stop,
_tick => \&_session_tick,
# Stats update event
update => \&do_update,
update => \&_session_update,
# Subscription events
subscribe => \&do_subscribe,
unsubscribe => \&do_unsubscribe,
subscribe => \&_session_subscribe,
unsubscribe => \&_session_unsubscribe,
}
);
......@@ -191,7 +184,7 @@ sub plugin_init
$config->{'db_dsn'}, $config->{'db_username'}, $config->{'db_password'},
{
'AutoCommit' => 1,
'RaiseError' => 1,
'RaiseError' => 0,
'FetchHashKeyName' => 'NAME_lc'
}
);
......@@ -221,8 +214,9 @@ sub plugin_init
SELECT
`IdentifierID`, `Timestamp` - (`Timestamp` % ?) AS TimestampM,
`Direction`,
MAX(`CIR`) AS `CIR`, MAX(`Limit`) AS `Limit`, MAX(`Rate`) AS `Rate`, MAX(`PPS`) AS `PPS`, MAX(`Queue_Len`) AS `Queue_Len`,
AVG(`Total_Bytes`) AS `Total_Bytes`, AVG(`Total_Packets`) AS `Total_Packets`, AVG(`Total_Overlimits`) AS `Total_Overlimits`, AVG(`Total_Dropped`) AS `Total_Dropped`
MAX(`CIR`) AS `CIR`, MAX(`Limit`) AS `Limit`, MAX(`Rate`) AS `Rate`, MAX(`PPS`) AS `PPS`,
MAX(`Queue_Len`) AS `Queue_Len`, AVG(`Total_Bytes`) AS `Total_Bytes`, AVG(`Total_Packets`) AS `Total_Packets`,
AVG(`Total_Overlimits`) AS `Total_Overlimits`, AVG(`Total_Dropped`) AS `Total_Dropped`
FROM
stats
WHERE
......@@ -294,7 +288,7 @@ sub plugin_start
# Initialize this plugins main POE session
sub session_start
sub _session_start
{
my ($kernel,$heap) = @_[KERNEL, HEAP];
......@@ -303,14 +297,14 @@ sub session_start
$kernel->alias_set("statistics");
# Set delay on config updates
$kernel->delay(tick => TICK_PERIOD);
$kernel->delay('_tick' => TICK_PERIOD);
$logger->log(LOG_DEBUG,"[STATISTICS] Initialized");
}
# Stop session
sub session_stop
sub _session_stop
{
my ($kernel,$heap) = @_[KERNEL, HEAP];
......@@ -335,7 +329,7 @@ sub session_stop
# Time ticker for processing changes
sub session_tick
sub _session_tick
{
my ($kernel,$heap) = @_[KERNEL,HEAP];
......@@ -422,7 +416,11 @@ sub session_tick
# We only need stats if we did something, right?
if ($numFlush) {
my $timediff2 = tv_interval($timer1,$timer2);
$logger->log(LOG_INFO,"[STATISTICS] Total stats flush time %s/%s records: %s",$numFlush,$maxFlush,sprintf('%.3fs',$timediff2));
$logger->log(LOG_INFO,"[STATISTICS] Total stats flush time %s/%s records: %s",
$numFlush,
$maxFlush,
sprintf('%.3fs',$timediff2)
);
}
my $res;
......@@ -461,7 +459,9 @@ sub session_tick
}
# If there was an error, make sure we report it
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic consolidation statement: %s",$sthStatsBasicConsolidate->errstr());
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic consolidation statement: %s",
$sthStatsBasicConsolidate->errstr()
);
}
# And the normal stats...
$res = $sthStatsConsolidate->execute($precision,$prevKey,$consolidateUpTo);
......@@ -478,7 +478,9 @@ sub session_tick
}
# If there was an error, make sure we report it
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats consolidation statement: %s",$sthStatsConsolidate->errstr());
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats consolidation statement: %s",
$sthStatsConsolidate->errstr()
);
}
# Set last cleanup to now
......@@ -487,8 +489,14 @@ sub session_tick
my $timerB = [gettimeofday];
my $timediffB = tv_interval($timerA,$timerB);
$logger->log(LOG_INFO,"[STATISTICS] Stats consolidation time for key %s: %s (%s basic, %s normal), up to %s [%s]",$key,
sprintf('%.3fs',$timediffB),$numStatsBasicConsolidated,$numStatsConsolidated,$consolidateUpTo,scalar(localtime($consolidateUpTo)));
$logger->log(LOG_INFO,"[STATISTICS] Stats consolidation time for key %s: %s (%s basic, %s normal), up to %s [%s]",
$key,
sprintf('%.3fs',$timediffB),
$numStatsBasicConsolidated,
$numStatsConsolidated,
$consolidateUpTo,
scalar(localtime($consolidateUpTo))
);
}
# Setup another timer
......@@ -505,22 +513,30 @@ sub session_tick
if ($res = $sthStatsBasicCleanup->execute(0, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed
if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats_basic %s, up to %s [%s]",$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats_basic %s, up to %s [%s]",
$res,
$cleanUpTo,
scalar(localtime($cleanUpTo))
);
}
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement: %s",
$sthStatsBasicCleanup->errstr());
$sthStatsBasicCleanup->errstr()
);
}
# And the normal stats...
if ($res = $sthStatsCleanup->execute(0, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed
if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats %s, up to %s [%s]",$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats %s, up to %s [%s]",
$res,
$cleanUpTo,scalar(localtime($cleanUpTo))
);
}
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement: %s",$sthStatsCleanup->errstr());
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement: %s",
$sthStatsCleanup->errstr()
);
}
# Loop and remove retained stats
......@@ -532,22 +548,35 @@ sub session_tick
if ($res = $sthStatsBasicCleanup->execute($key, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed
if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats_basic %s, up to %s [%s]",$key,$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats_basic %s, up to %s [%s]",
$key,
$res,
$cleanUpTo,
scalar(localtime($cleanUpTo))
);
}
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement for key %s: %s",$key,
$sthStatsBasicCleanup->errstr());
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement for key %s: %s",
$key,
$sthStatsBasicCleanup->errstr()
);
}
# And normal stats...
if ($res = $sthStatsCleanup->execute($key, $cleanUpTo)) {
# We get 0E0 for 0 when none were removed
if ($res ne "0E0") {
$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats %s, up to %s [%s]",$key,$res,$cleanUpTo,
scalar(localtime($cleanUpTo)));
$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats %s, up to %s [%s]",
$key,
$res,
$cleanUpTo,
scalar(localtime($cleanUpTo))
);
}
} else {
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement for key %s: %s",$key,$sthStatsCleanup->errstr());
$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement for key %s: %s",
$key,
$sthStatsCleanup->errstr()
);
}
}
......@@ -556,7 +585,9 @@ sub session_tick
my $timer4 = [gettimeofday];
my $timediff4 = tv_interval($timer3,$timer4);
$logger->log(LOG_INFO,"[STATISTICS] Stats cleanup time: %s",sprintf('%.3fs',$timediff4));
$logger->log(LOG_INFO,"[STATISTICS] Stats cleanup time: %s",
sprintf('%.3fs',$timediff4)
);
}
# Check if we need to pull config manager stats
......@@ -567,7 +598,7 @@ sub session_tick
}
# Set delay on config updates
$kernel->delay(tick => TICK_PERIOD);
$kernel->delay('_tick' => TICK_PERIOD);
}
......@@ -576,7 +607,7 @@ sub session_tick
# main:$iface:all - Interface total stats
# main:$iface:classes - Interface classified traffic
# main:$iface:besteffort - Interface best effort traffic
sub do_update
sub _session_update
{
my ($kernel, $statsData) = @_[KERNEL, ARG0];
......@@ -591,24 +622,24 @@ sub do_update
# Handle subscriptions to updates
sub do_subscribe
sub _session_subscribe
{
my ($kernel, $handler, $handlerEvent, $item) = @_[KERNEL, ARG0, ARG1, ARG2];
$logger->log(LOG_INFO,"[STATISTICS] Got subscription request from '$handler' for '$item' via event '$handlerEvent'");
$logger->log(LOG_INFO,"[STATISTICS] Got subscription request from '%s' for '%s' via event '%s'",$handler,$item,$handlerEvent);
$subscribers->{$item}->{$handler}->{$handlerEvent} = $item;
}
# Handle unsubscribes
sub do_unsubscribe
sub _session_unsubscribe
{
my ($kernel, $handler, $handlerEvent, $item) = @_[KERNEL, ARG0, ARG1, ARG2];
$logger->log(LOG_INFO,"[STATISTICS] Got unsubscription request for '$handler' regarding '$item'");
$logger->log(LOG_INFO,"[STATISTICS] Got unsubscription request for '%s' regarding '%s'",$handler,$item);
delete($subscribers->{$item}->{$handler}->{$handlerEvent});
}
......@@ -650,7 +681,7 @@ sub getStatsBySID
{
my $sid = shift;
return _getStatsBySID($sid);
return _getStatsBySID($sid);
}
......@@ -659,7 +690,7 @@ sub getStatsBasicBySID
{
my $sid = shift;
return _getStatsBasicBySID($sid);
return _getStatsBasicBySID($sid);
}
......@@ -669,7 +700,13 @@ sub getSIDFromCID
my ($iface,$cid) = @_;
my $identifier = "Class:$iface:$cid";
# Grab identifier based on class ID
my $identifier = _getIdentifierFromCID($iface,$cid);
if (!defined($identifier)) {
return undef;
}
# Return the SID fo the identifier
return _getSIDFromIdentifier($identifier);
}
......@@ -680,46 +717,58 @@ sub setSIDFromCID
my ($iface,$cid) = @_;
my $identifier = "Class:$iface:$cid";
my $sid = _getSIDFromIdentifier($identifier);
# See if we can get a SID from the CID
my $sid = getSIDFromCID($iface,$cid);
if (!defined($sid)) {
# If not, grab the identifier
my $identifier = _getIdentifierFromCID($iface,$cid);
if (!defined($identifier)) {
return undef;
}
# And setup a new SID
$sid = _setSIDFromIdentifier($identifier);
}
return $sid;
}
# Get the stats ID from a LID
sub getSIDFromLID
# Get the stats ID from a PID
sub getSIDFromPID
{
my $lid = shift;
my $pid = shift;
if (defined(my $username = getLimitUsername($lid))) {
my $identifier = "Username:$username";
return _getSIDFromIdentifier($identifier);
# Grab identifier from a PID
my $identifier = _getIdentifierFromPID($pid);
if (!defined($identifier)) {
return undef;
}
return undef;
# Return the SID for the PID
return _getSIDFromIdentifier($identifier);
}
# Set the stats ID from a LID
sub setSIDFromLID
# Set the stats ID from a PID
sub setSIDFromPID
{
my $lid = shift;
my $pid = shift;
if (defined(my $username = getLimitUsername($lid))) {
my $identifier = "Username:$username";
my $sid = _getSIDFromIdentifier($identifier);
if (!defined($sid)) {
$sid = _setSIDFromIdentifier($identifier);
# Try grab the SID for the PID
my $sid = getSIDFromPID($pid);
if (!defined($sid)) {
# If we can't, grab the identifier instead
my $identifier = _getIdentifierFromPID($pid);
if (!defined($identifier)) {
return undef;
}
return $sid;
# And setup the SID
$sid = _setSIDFromIdentifier($identifier);
}
return undef;
return $sid;
}
......@@ -729,7 +778,13 @@ sub getSIDFromCounter
my $counter = shift;
my $identifier = "Counter:$counter";
# Grab identifier from a counter
my $identifier = _getIdentifierFromCounter($counter);
if (!defined($identifier)) {
return undef;
}
# Return the SID for the counter
return _getSIDFromIdentifier($identifier);
}
......@@ -740,11 +795,18 @@ sub setSIDFromCounter
my $counter = shift;
my $identifier = "Counter:$counter";
my $sid = _getSIDFromIdentifier($identifier);
# Try grab the SID for the counter
my $sid = getSIDFromCounter($counter);
if (!defined($sid)) {
# If we can't, grab the identifier instead
my $identifier = _getIdentifierFromCounter($counter);
if (!defined($identifier)) {
return undef;
}
# And setup the SID
$sid = _setSIDFromIdentifier($identifier);
}
return $sid;
}
......@@ -752,12 +814,12 @@ sub setSIDFromCounter
# Return traffic direction
sub getTrafficDirection
{
my ($lid,$interface) = @_;
my ($pid,$interface) = @_;
# Grab the interfaces for this limit
my $txInterface = getLimitTxInterface($lid);
my $rxInterface = getLimitRxInterface($lid);
my $txInterface = getPoolTxInterface($pid);
my $rxInterface = getPoolRxInterface($pid);
# Check what it matches...
if ($interface eq $txInterface) {
......@@ -773,22 +835,22 @@ sub getTrafficDirection
# Generate ConfigManager counters
sub getConfigManagerCounters
{
my @limits = getLimits();
my $classes = getTrafficClasses();
my @poolList = getPools();
my $classes = getAllTrafficClasses();
# Grab user count
my %counters;
$counters{"ConfigManager:TotalLimits"} = @limits;
$counters{"ConfigManager:TotalPools"} = @poolList;
# Start off with 0's
# Zero the number of pools in each class to start off with
foreach my $cid (keys %{$classes}) {
$counters{"ConfigManager:ClassLimits:$cid"} = 0;
$counters{"ConfigManager:ClassPools:$cid"} = 0;
}
# Generate ClassID counts
foreach my $lid (@limits) {
my $limit = getLimit($lid);
my $cid = $limit->{'ClassID'};
# Pull in each pool and bump up the class counter
foreach my $pid (@poolList) {
my $cid = getPoolTrafficClassID($pid);
# Bump the class counter
$counters{"ConfigManager:ClassLimits:$cid"}++;
}
......@@ -846,7 +908,7 @@ sub _getConfigManagerStats
my $statsData = { };
# Loop through counters and create stats items
foreach my $item (%{$counters}) {
foreach my $item (keys %{$counters}) {
my $identifierID = setSIDFromCounter($item);
my $stat = {
'identifierid' => $identifierID,
......@@ -860,18 +922,47 @@ sub _getConfigManagerStats
}
# Function to get a SID identifier from a class ID
sub _getIdentifierFromCID
{
my ($iface,$cid) = @_;
return sprintf("Class:%s:%s",$iface,$cid);
}
# Function to get a SID identifier from a pool ID
sub _getIdentifierFromPID
{
my $pid = shift;
my $pool = getPool($pid);
if (!defined($pool)) {
return undef;
}
return sprintf("Pool:%s",$pool->{'Identifier'});
}
# Function to get a SID identifier from a counter
sub _getIdentifierFromCounter
{
my $counter = shift;
return sprintf("Counter:%s",$counter);
}
# Return a cached SID if its cached
sub _getCachedSIDFromIdentifier
{
my $identifier = shift;
# If we don't have a user mapped
if (defined(my $sid = $statsDBIdentifierMap->{$identifier})) {
return $sid;
}
return undef;
return $statsDBIdentifierMap->{$identifier};
}
......@@ -894,8 +985,7 @@ sub _getSIDFromIdentifier
return $statsDBIdentifierMap->{$identifier} = $row->{'id'};
}
} else {
# FIXME
warn "FAILED TO EXECUTE GETUSER: ".$identifierGetSTH->errstr;
$logger->log(LOG_ERR,"[STATISTICS] Failed to get SID from identifier '%s': %s",$identifier,$identifierGetSTH->errstr);
}
return undef;
......@@ -913,8 +1003,7 @@ sub _setSIDFromIdentifier
if (my $res = $identifierAddSTH->execute($identifier)) {
return $statsDBIdentifierMap->{$identifier} = $dbh->last_insert_id("","","","");
} else {
# FIXME
warn "DB ADD IDENTIFIER ERROR: ".$identifierAddSTH->errstr;
$logger->log(LOG_ERR,"[STATISTICS] Failed to get SID from identifier '%s': %s",$identifier,$identifierAddSTH->errstr);
}
return undef;
......@@ -963,8 +1052,8 @@ sub _getStatsBySID
$direction = 'tx';
} elsif ($item->{'direction'} eq STATISTICS_DIR_RX) {
$direction = 'rx';
# FIXME
} else {
$logger->log(LOG_ERR,"[STATISTICS] Unknown direction when getting stats '%s'",$direction);
next;
}
......@@ -1019,6 +1108,5 @@ sub _getStatsBasicBySID
1;
# vim: ts=4
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment