From e88b30c7705b3625a41982a5a299a6c789a9ad9b Mon Sep 17 00:00:00 2001
From: Nigel Kukard <nkukard@lbsd.net>
Date: Tue, 26 Nov 2013 21:35:37 +0000
Subject: [PATCH] Added basic stats support

---
 .../plugins/statistics/statistics.pm          | 338 +++++++++++++++---
 1 file changed, 281 insertions(+), 57 deletions(-)

diff --git a/opentrafficshaper/plugins/statistics/statistics.pm b/opentrafficshaper/plugins/statistics/statistics.pm
index d4688cd..c6e9752 100644
--- a/opentrafficshaper/plugins/statistics/statistics.pm
+++ b/opentrafficshaper/plugins/statistics/statistics.pm
@@ -51,13 +51,16 @@ our (@ISA,@EXPORT,@EXPORT_OK);
 );
 @EXPORT_OK = qw(
 	getLastStats
+
 	getStatsByClass
+	getStatsByCounter
+
 	getSIDFromCID
 	getSIDFromLID
 );
 
 use constant {
-	VERSION => '0.0.1',
+	VERSION => '0.1.1',
 	# How often our config check ticks
 	TICK_PERIOD => 5,
 
@@ -123,6 +126,8 @@ my $subscribers;
 my $statsPreparedStatements = { };
 # Last cleanup time
 my $lastCleanup = { };
+# Last config manager stats pull
+my $lastConfigManagerStats = 0;
 
 
 # Initialize plugin
@@ -228,8 +233,26 @@ sub plugin_init
 			$dbh->disconnect();
 			$dbh = undef;
 		}
+		if ($dbh && (my $res = $dbh->prepare('
+			SELECT
+				`IdentifierID`, `Timestamp` - (`Timestamp` % ?) AS TimestampM,
+				MAX(`Counter`) AS `Counter`
+			FROM
+				stats_basic
+			WHERE
+				`Key` = ?
+				AND `Timestamp` < ?
+			GROUP BY
+				`IdentifierID`, `TimestampM`
+		'))) {
+			$statsPreparedStatements->{'stats_basic_consolidate'} = $res;
+		} else {
+			$logger->log(LOG_ERR,"[STATISTICS] Failed to prepare statement 'stats_basic_consolidate': %s",$DBI::errstr);
+			$dbh->disconnect();
+			$dbh = undef;
+		}
 
-		# Prepare stats cleanup statement
+		# Prepare stats cleanup statements
 		if ($dbh && (my $res = $dbh->prepare('DELETE FROM stats WHERE `Key` = ? AND `Timestamp` < ?'))) {
 			$statsPreparedStatements->{'stats_cleanup'} = $res;
 		} else {
@@ -237,13 +260,20 @@ sub plugin_init
 			$dbh->disconnect();
 			$dbh = undef;
 		}
+		if ($dbh && (my $res = $dbh->prepare('DELETE FROM stats_basic WHERE `Key` = ? AND `Timestamp` < ?'))) {
+			$statsPreparedStatements->{'stats_basic_cleanup'} = $res;
+		} else {
+			$logger->log(LOG_ERR,"[STATISTICS] Failed to prepare statement 'stats_basic_cleanup': %s",$DBI::errstr);
+			$dbh->disconnect();
+			$dbh = undef;
+		}
 
 		# Set last cleanup to now
-		if ($dbh) {
-			foreach my $key (keys %{$statsConfig}) {
-				$lastCleanup->{$key} = time();
-			}
+		my $now = time();
+		foreach my $key (keys %{$statsConfig}) {
+			$lastCleanup->{$key} = $now;
 		}
+		$lastConfigManagerStats = $now;
 	}
 
 	return 1;
@@ -287,9 +317,11 @@ sub session_stop
 	$globals = undef;
 	$dbh = undef;
 	$statsDBIdentifierMap = { };
+	$statsQueue = [ ];
 	$subscribers = undef;
 	$statsPreparedStatements = { };
-	$lastCleanup = undef;
+	$lastCleanup = { };
+	$lastConfigManagerStats = 0;
 
 	$logger->log(LOG_DEBUG,"[STATISTICS] Shutdown");
 
@@ -308,32 +340,61 @@ sub session_tick
 		return;
 	}
 
+	my $now = time();
+	my $timer1 = [gettimeofday];
+
 	# Pull in statements
 	my $sthStatsConsolidate = $statsPreparedStatements->{'stats_consolidate'};
 	my $sthStatsCleanup = $statsPreparedStatements->{'stats_cleanup'};
-
-	my $now = time();
-	my $timer1 = [gettimeofday];
+	my $sthStatsBasicConsolidate = $statsPreparedStatements->{'stats_basic_consolidate'};
+	my $sthStatsBasicCleanup = $statsPreparedStatements->{'stats_basic_cleanup'};
 
 	# Even out flushing over 10s to absorb spikes
 	my $maxFlush = int(@{$statsQueue} / 10) + 100;
 	my $numFlush = 0;
 
 	# Loop and build the data to create our multi-insert
-	my @insertHolders;
-	my @insertData;
+	my (@insertHolders,@insertBasicHolders);
+	my (@insertData,@insertBasicData);
 	while (defined(my $stat = shift(@{$statsQueue})) && $numFlush < $maxFlush) {
-		push(@insertData,
-			$stat->{'identifierid'}, $stat->{'key'}, $stat->{'timestamp'},
-			$stat->{'direction'},
-			$stat->{'cir'}, $stat->{'limit'}, $stat->{'rate'}, $stat->{'pps'}, $stat->{'queue_len'},
-			$stat->{'total_bytes'}, $stat->{'total_packets'}, $stat->{'total_overlimits'}, $stat->{'total_dropped'}
-		);
-		push(@insertHolders,"(?,?,?,?,?,?,?,?,?,?,?,?,?)");
+		# This is a basic counter
+		if (defined($stat->{'counter'})) {
+			push(@insertBasicHolders,"(?,?,?,?)");
+			push(@insertBasicData,
+				$stat->{'identifierid'}, $stat->{'key'}, $stat->{'timestamp'},
+				$stat->{'counter'}
+			);
+		# Full stats counter
+		} else {
+			push(@insertHolders,"(?,?,?,?,?,?,?,?,?,?,?,?,?)");
+			push(@insertData,
+				$stat->{'identifierid'}, $stat->{'key'}, $stat->{'timestamp'},
+				$stat->{'direction'},
+				$stat->{'cir'}, $stat->{'limit'}, $stat->{'rate'}, $stat->{'pps'}, $stat->{'queue_len'},
+				$stat->{'total_bytes'}, $stat->{'total_packets'}, $stat->{'total_overlimits'}, $stat->{'total_dropped'}
+			);
+		}
+
 		$numFlush++;
 	}
 
 	# If we got things to insert, do it
+	if (@insertBasicHolders > 0) {
+		my $res = $dbh->do('
+			INSERT DELAYED INTO stats_basic
+				(
+					`IdentifierID`, `Key`, `Timestamp`,
+					`Counter`
+				)
+			VALUES
+				'.join(',',@insertBasicHolders),undef,@insertBasicData
+		);
+		# Check for error
+		if (!defined($res)) {
+			$logger->log(LOG_ERR,"[STATISTICS] Failed to execute delayed stats_basic insert: %s",$DBI::errstr);
+		}
+	}
+	# And normal stats...
 	if (@insertHolders > 0) {
 		my $res = $dbh->do('
 			INSERT DELAYED INTO stats
@@ -356,29 +417,49 @@ sub session_tick
 	# We only need stats if we did something, right?
 	if ($numFlush) {
 		my $timediff2 = tv_interval($timer1,$timer2);
-		$logger->log(LOG_INFO,"[STATISTICS] Stats flush time %s/%s records: %s",$numFlush,$maxFlush,sprintf('%.3fs',$timediff2));
+		$logger->log(LOG_INFO,"[STATISTICS] Total stats flush time %s/%s records: %s",$numFlush,$maxFlush,sprintf('%.3fs',$timediff2));
 	}
 
 	my $res;
 
 	# Loop with our stats consolidation configuration
-	foreach my $key (keys %{$statsConfig}) {
+	foreach my $key (sort keys %{$statsConfig}) {
 		my $timerA = [gettimeofday];
 
 		my $precision = $statsConfig->{$key}->{'precision'};
 		my $thisPeriod = _getAlignedTime($now,$precision);
 		my $lastPeriod = $thisPeriod - $precision;
 		my $prevKey = $key - 1;
-
 		# If we havn't exited the last period, then skip
 		if ($lastCleanup->{$key} > $lastPeriod) {
 			next;
 		}
-		# Set last cleanup to now
-		$lastCleanup->{$key} = $now;
+
+		# Stats
+		my $numStatsBasicConsolidated = 0;
+		my $numStatsConsolidated = 0;
+
+		my $consolidateUpTo = $lastPeriod - $precision;
 
 		# Execute and pull in consolidated stats
-		$res = $sthStatsConsolidate->execute($prevKey,$precision,$lastPeriod - $precision);
+		$res = $sthStatsBasicConsolidate->execute($precision,$prevKey,$consolidateUpTo);
+		if ($res) {
+			# Loop with items returned
+			while (my $item = $sthStatsBasicConsolidate->fetchrow_hashref()) {
+				$item->{'key'} = $key;
+				$item->{'timestamp'} = $item->{'timestampm'};
+
+				# Queue for insert
+				push(@{$statsQueue},$item);
+
+				$numStatsBasicConsolidated++;
+			}
+		# If there was an error, make sure we report it
+		} else {
+			$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic consolidation statement: %s",$sthStatsBasicConsolidate->errstr());
+		}
+		# And the normal stats...
+		$res = $sthStatsConsolidate->execute($precision,$prevKey,$consolidateUpTo);
 		if ($res) {
 			# Loop with items returned
 			while (my $item = $sthStatsConsolidate->fetchrow_hashref()) {
@@ -387,28 +468,51 @@ sub session_tick
 
 				# Queue for insert
 				push(@{$statsQueue},$item);
+
+				$numStatsConsolidated++;
 			}
 		# If there was an error, make sure we report it
 		} else {
 			$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats consolidation statement: %s",$sthStatsConsolidate->errstr());
 		}
 
+		# Set last cleanup to now
+		$lastCleanup->{$key} = $now;
+
 		my $timerB = [gettimeofday];
 		my $timediffB = tv_interval($timerA,$timerB);
-		$logger->log(LOG_INFO,"[STATISTICS] Stats consolidation time for key %s: %s",$key,sprintf('%.3fs',$timediffB));
+
+		$logger->log(LOG_INFO,"[STATISTICS] Stats consolidation time for key %s: %s (%s basic, %s normal), up to %s [%s]",$key,
+				sprintf('%.3fs',$timediffB),$numStatsBasicConsolidated,$numStatsConsolidated,$consolidateUpTo,scalar(localtime($consolidateUpTo)));
 	}
 
 	# Setup another timer
 	my $timer3 = [gettimeofday];
 
 	# We only need to run as often as the first precision
+	# - If cleanup has not yet run?
+	# - or if the 0 cleanup plus precision of the first key is in the past (data is now stale?)
 	if (!defined($lastCleanup->{'0'}) || $lastCleanup->{'0'} + $statsConfig->{1}->{'precision'} < $now) {
+		# We're going to clean up for the first stats precision * 3, which should be enough
+		my $cleanUpTo = $now - ($statsConfig->{1}->{'precision'} * 3);
 
 		# Streamed stats is removed 3 time periods past the first precision
-		if ($res = $sthStatsCleanup->execute(0, $now - ($statsConfig->{1}->{'precision'} * 3))) {
+		if ($res = $sthStatsBasicCleanup->execute(0, $cleanUpTo)) {
 			# We get 0E0 for 0 when none were removed
 			if ($res ne "0E0") {
-				$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats %s",$res);
+				$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats_basic %s, up to %s [%s]",$res,$cleanUpTo,
+						scalar(localtime($cleanUpTo)));
+			}
+		} else {
+			$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement: %s",
+					$sthStatsBasicCleanup->errstr());
+		}
+		# And the normal stats...
+		if ($res = $sthStatsCleanup->execute(0, $cleanUpTo)) {
+			# We get 0E0 for 0 when none were removed
+			if ($res ne "0E0") {
+				$logger->log(LOG_INFO,"[STATISTICS] Cleanup streamed stats %s, up to %s [%s]",$res,$cleanUpTo,
+						scalar(localtime($cleanUpTo)));
 			}
 		} else {
 			$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement: %s",$sthStatsCleanup->errstr());
@@ -416,11 +520,26 @@ sub session_tick
 
 		# Loop and remove retained stats
 		foreach my $key (keys %{$statsConfig}) {
+			# Work out timestamp to clean up to by multiplying the retention period by days
+			$cleanUpTo = $now - ($statsConfig->{$key}->{'retention'} * 86400);
+
 			# Retention period is in # days
-			if ($res = $sthStatsCleanup->execute($key, $now - ($statsConfig->{$key}->{'retention'} * 86400))) {
+			if ($res = $sthStatsBasicCleanup->execute($key, $cleanUpTo)) {
+				# We get 0E0 for 0 when none were removed
+				if ($res ne "0E0") {
+					$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats_basic %s, up to %s [%s]",$key,$res,$cleanUpTo,
+							scalar(localtime($cleanUpTo)));
+				}
+			} else {
+				$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats_basic cleanup statement for key %s: %s",$key,
+						$sthStatsBasicCleanup->errstr());
+			}
+			# And normal stats...
+			if ($res = $sthStatsCleanup->execute($key, $cleanUpTo)) {
 				# We get 0E0 for 0 when none were removed
 				if ($res ne "0E0") {
-					$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats %s",$key,$res);
+					$logger->log(LOG_INFO,"[STATISTICS] Cleanup key %s stats %s, up to %s [%s]",$key,$res,$cleanUpTo,
+							scalar(localtime($cleanUpTo)));
 				}
 			} else {
 				$logger->log(LOG_ERR,"[STATISTICS] Failed to execute stats cleanup statement for key %s: %s",$key,$sthStatsCleanup->errstr());
@@ -435,6 +554,13 @@ sub session_tick
 		$logger->log(LOG_INFO,"[STATISTICS] Stats cleanup time: %s",sprintf('%.3fs',$timediff4));
 	}
 
+	# Check if we need to pull config manager stats
+	if ($now - $lastConfigManagerStats > STATISTICS_PERIOD) {
+		my $configManagerStats = _getConfigManagerStats();
+		_processStatistics($kernel,$configManagerStats);
+		$lastConfigManagerStats = $now;
+	}
+
 	# Set delay on config updates
 	$kernel->delay(tick => TICK_PERIOD);
 }
@@ -455,33 +581,7 @@ sub do_update
 		return;
 	}
 
-	# Loop through stats data we got
-	while ((my $sid, my $stat) = each(%{$statsData})) {
-
-		$stat->{'identifierid'} = $sid;
-		$stat->{'key'} = 0;
-
-		push(@{$statsQueue},$stat);
-
-#		# Check if we have an event handler subscriber for this item
-#		if (defined($subscribers->{$statsItem}) && %{$subscribers->{$statsItem}}) {
-#			# If we do, loop with them
-#			foreach my $handler (keys %{$subscribers->{$statsItem}}) {
-#
-#				# If no events are linked to this handler, continue
-#				if (!(keys %{$subscribers->{$statsItem}->{$handler}})) {
-#					next;
-#				}
-#
-#				# Or ... If we have events, process them
-#				foreach my $event (keys %{$subscribers->{$statsItem}->{$handler}}) {
-#
-#					$kernel->post($handler => $event => $statsItem => $stat);
-#				}
-#			}
-#		}
-	}
-
+	_processStatistics($kernel,$statsData);
 }
 
 
@@ -619,10 +719,97 @@ sub getTrafficDirection
 }
 
 
+# Generate ConfigManager counters
+sub getConfigManagerCounters
+{
+	my @limits = getLimits();
+	my $classes = getTrafficClasses();
+
+	# Grab user count
+	my %counters;
+
+	$counters{"ConfigManager:TotalUsers"} = @limits;
+
+	# Start off with 0's
+	foreach my $cid (keys %{$classes}) {
+		$counters{"ConfigManager:ClassUsers:$cid"} = 0;
+	}
+	# Generate ClassID counts
+	foreach my $lid (@limits) {
+		my $limit = getLimit($lid);
+		my $cid = $limit->{'ClassID'};
+		# Bump the class counter
+		$counters{"ConfigManager:ClassUsers:$cid"}++;
+	}
+
+	return \%counters;
+}
+
+
 #
 # Internal Functions
 #
 
+# Function to process a bunch of statistics
+sub _processStatistics
+{
+	my ($kernel,$statsData) = @_;
+
+
+	# Loop through stats data we got
+	while ((my $sid, my $stat) = each(%{$statsData})) {
+
+		$stat->{'identifierid'} = $sid;
+		$stat->{'key'} = 0;
+
+		push(@{$statsQueue},$stat);
+#		# Check if we have an event handler subscriber for this item
+#		if (defined($subscribers->{$statsItem}) && %{$subscribers->{$statsItem}}) {
+#			# If we do, loop with them
+#			foreach my $handler (keys %{$subscribers->{$statsItem}}) {
+#
+#				# If no events are linked to this handler, continue
+#				if (!(keys %{$subscribers->{$statsItem}->{$handler}})) {
+#					next;
+#				}
+#
+#				# Or ... If we have events, process them
+#				foreach my $event (keys %{$subscribers->{$statsItem}->{$handler}}) {
+#
+#					$kernel->post($handler => $event => $statsItem => $stat);
+#				}
+#			}
+#		}
+
+	}
+}
+
+
+# Generate ConfigManager stats
+sub _getConfigManagerStats
+{
+	my $counters = getConfigManagerCounters();
+
+
+	my $now = time();
+	my $statsData = { };
+
+	# Loop through counters and create stats items
+	foreach my $item (%{$counters}) {
+		my $identifierID = getSIDFromCounter($item);
+		my $stat = {
+			'identifierid' => $identifierID,
+			'timestamp' => $now,
+			'counter' => $counters->{$item}
+		};
+		$statsData->{$identifierID} = $stat;
+	}
+
+	return $statsData;
+}
+
+
+# Return a cached SID if its cached
 sub _getCachedSIDFromIdentifier
 {
 	my $identifier = shift;
@@ -729,6 +916,43 @@ sub _getStatsBySID
 }
 
 
+# Internal function to get basic stats by SID
+sub _getStatsBasicBySID
+{
+	my $sid = shift;
+
+
+	my $now = time();
+
+	# Prepare query
+	my $sth = $dbh->prepare('
+		SELECT
+			`Timestamp`, `Counter`
+		FROM
+			stats_basic
+		WHERE
+			`IdentifierID` = ?
+			AND `Key` = ?
+			AND `Timestamp` > ?
+			AND `Timestamp` < ?
+		ORDER BY
+			`Timestamp` DESC
+		LIMIT 100
+	');
+	# Grab last 60 mins of data
+	$sth->execute($sid,0,$now - 3600, $now);
+
+	my $statistics;
+	while (my $item = $sth->fetchrow_hashref()) {
+		$statistics->{$item->{'timestamp'}} = {
+			'counter' => $item->{'counter'},
+		}
+	}
+
+	return $statistics;
+}
+
+
 
 
 1;
-- 
GitLab