Skip to content
Snippets Groups Projects
Commit 40ff47be authored by Nigel Kukard's avatar Nigel Kukard
Browse files

Added a few functions & made streaming tc possible

parent 7cbf12db
No related branches found
No related tags found
No related merge requests found
......@@ -41,6 +41,9 @@ our (@ISA,@EXPORT,@EXPORT_OK);
@EXPORT = qw(
);
@EXPORT_OK = qw(
getInterfaces
getConfigTxIface
getConfigRxIface
);
use constant {
......@@ -161,6 +164,7 @@ sub plugin_init
task_child_stdout => \&task_child_stdout,
task_child_stderr => \&task_child_stderr,
task_child_close => \&task_child_close,
task_child_stdin => \&task_child_stdin,
task_run_next => \&task_run_next,
}
);
......@@ -668,6 +672,7 @@ sub getTcClass
return $id;
}
# Function to dispose of a TC class
sub disposeTcClass
{
......@@ -678,6 +683,7 @@ sub disposeTcClass
# Blank the value
$tcClasses->{'track'}->{$id} = undef;
}
# Grab user from TC class
sub getUIDFromTcClass
{
......@@ -685,11 +691,19 @@ sub getUIDFromTcClass
return $tcClasses->{'track'}->{$id};
}
# Get interfaces we manage
sub getInterfaces
{
return ($config->{'txiface'},$config->{'rxiface'});
}
# Get TX iface
sub getConfigTxIface
{
return $config->{'txiface'};
}
# Get RX iface
sub getConfigRxIface
{
......@@ -1216,9 +1230,36 @@ sub _task_add_to_queue
my $cmdStr = join(' ',@{$cmd});
# Shove task on list
$logger->log(LOG_DEBUG,"[TC] TASK: Queue '$cmdStr'");
push(@taskQueue,$cmd);
}
# Send the next command in the task direction
sub _task_put_next
{
my ($heap,$task) = @_;
# Task was busy, this signifies its done, so lets take the next command
if (my $cmd = shift(@taskQueue)) {
# Remove off idle task list if its there
delete($heap->{'idle_tasks'}->{$task->ID});
# Chop off /sbin/tc
shift(@{$cmd});
# Build commandline string
my $cmdStr = join(' ',@{$cmd});
$task->put($cmdStr);
$logger->log(LOG_DEBUG,"[TC] TASK/".$task->ID.": Starting '$cmdStr' as ".$task->ID." with PID ".$task->PID);
# If there is no commands in the queue, set it to idle
} else {
# Set task to idle
$heap->{'idle_tasks'}->{$task->ID} = $task;
}
}
# Run a task
sub task_add
......@@ -1230,44 +1271,59 @@ sub task_add
_task_add_to_queue($cmd);
# Trigger a run if list is empty
if (@taskQueue < 2) {
#if (@taskQueue < 2) {
if (@taskQueue) {
$kernel->yield("task_run_next");
}
}
# Fire up the session starter
# Run next task
sub task_run_next
{
my ($kernel,$heap) = @_[KERNEL,HEAP];
# If we already have children processing tasks, don't create another
if (keys %{$heap->{'task_by_wid'}}) {
# Loop with idle tasks ... return if we found one
foreach my $task_id (keys %{$heap->{'idle_tasks'}}) {
_task_put_next($heap,$heap->{'idle_tasks'}->{$task_id});
# XXX: Limit concurrency to 1
last;
}
# XXX: Limit concurrency to 1
return;
}
# Check if we have a task coming off the top of the task queue
if (my $cmd = shift(@taskQueue)) {
if (@taskQueue) {
# Create task
my $task = POE::Wheel::Run->new(
Program => $cmd,
# We get full lines back
Program => [ '/sbin/tc', '-batch' ],
StdioFilter => POE::Filter::Line->new(),
StderrFilter => POE::Filter::Line->new(),
StdoutEvent => 'task_child_stdout',
StderrEvent => 'task_child_stderr',
StdoutEvent => 'task_child_stdout',
StderrEvent => 'task_child_stderr',
CloseEvent => 'task_child_close',
StdinEvent => 'task_child_stdin',
ErrorEvent => 'task_child_error',
) or $logger->log(LOG_ERR,"[TC] TASK: Unable to start task");
# Set task ID
my $task_id = $task->ID;
# Intercept SIGCHLD
$kernel->sig_child($task->PID, "sig_child");
$kernel->sig_child($task_id, "sig_child");
# Wheel events include the wheel's ID.
$heap->{task_by_wid}->{$task->ID} = $task;
$heap->{'task_by_wid'}->{$task_id} = $task;
# Signal events include the process ID.
$heap->{task_by_pid}->{$task->PID} = $task;
$heap->{'task_by_pid'}->{$task_id} = $task;
# Build commandline string
my $cmdStr = join(' ',@{$cmd});
$logger->log(LOG_DEBUG,"[TC] TASK/".$task->ID.": Starting '$cmdStr' as ".$task->ID." with PID ".$task->PID);
_task_put_next($heap,$task);
}
}
......@@ -1275,8 +1331,8 @@ sub task_run_next
# Child writes to STDOUT
sub task_child_stdout
{
my ($kernel,$heap,$stdout,$task_id) = @_[KERNEL,HEAP,ARG0,ARG1];
my $child = $heap->{task_by_wid}->{$task_id};
my ($kernel,$heap,$stdout,$task_id) = @_[KERNEL,HEAP,ARG0,ARG1];
my $task = $heap->{'task_by_wid'}->{$task_id};
$logger->log(LOG_INFO,"[TC] TASK/$task_id: STDOUT => ".$stdout);
}
......@@ -1285,30 +1341,54 @@ sub task_child_stdout
# Child writes to STDERR
sub task_child_stderr
{
my ($kernel,$heap,$stdout,$task_id) = @_[KERNEL,HEAP,ARG0,ARG1];
my $child = $heap->{task_by_wid}->{$task_id};
my ($kernel,$heap,$stdout,$task_id) = @_[KERNEL,HEAP,ARG0,ARG1];
my $task = $heap->{'task_by_wid'}->{$task_id};
$logger->log(LOG_WARN,"[TC] TASK/$task_id: STDERR => ".$stdout);
}
# Child flushed to STDIN
sub task_child_stdin
{
my ($kernel,$heap,$task_id) = @_[KERNEL,HEAP,ARG0];
my $task = $heap->{'task_by_wid'}->{$task_id};
$logger->log(LOG_DEBUG,"[TC] TASK/$task_id is READY");
# And shove another queued command its direction
_task_put_next($heap,$task);
}
# Child encountered an error
sub task_child_error
{
my ($operation, $errnum, $errstr, $task_id) = @_[ARG0..ARG3];
$errstr = "remote end closed" if $operation eq "read" and !$errnum;
$logger->log(LOG_ERR,"[TC] Task $task_id generated $operation error $errnum: $errstr");
}
# Child closed its handles, it won't communicate with us, so remove it
sub task_child_close
{
my ($kernel,$heap,$task_id) = @_[KERNEL,HEAP,ARG0];
my $child = delete($heap->{task_by_wid}->{$task_id});
my ($kernel,$heap,$task_id) = @_[KERNEL,HEAP,ARG0];
my $task = delete($heap->{'task_by_wid'}->{$task_id});
# May have been reaped by task_sigchld()
if (!defined($child)) {
# May have been reaped by task_sigchld()
if (!defined($task)) {
$logger->log(LOG_DEBUG,"[TC] TASK/$task_id: Closed dead child");
return;
}
}
$logger->log(LOG_DEBUG,"[TC] TASK/$task_id: Closed PID ".$task->PID);
$logger->log(LOG_DEBUG,"[TC] TASK/$task_id: Closed PID ".$child->PID);
delete($heap->{task_by_pid}->{$child->PID});
# Remove other references
delete($heap->{'task_by_pid'}->{$task->PID});
delete($heap->{'idle_tasks'}->{$task->PID});
# Start next one, if there is a next one
if (@taskQueue > 0) {
if (@taskQueue) {
$kernel->yield("task_run_next");
}
}
......@@ -1318,15 +1398,17 @@ sub task_child_close
sub task_sigchld
{
my ($kernel,$heap,$pid,$status) = @_[KERNEL,HEAP,ARG1,ARG2];
my $child = delete($heap->{task_by_pid}->{$pid});
my $task = delete($heap->{'task_by_pid'}->{$pid});
$logger->log(LOG_DEBUG,"[TC] TASK: Task with PID $pid exited with status $status");
# May have been reaped by task_child_close()
return if (!defined($child));
# May have been reaped by task_child_close()
return if (!defined($task));
delete($heap->{task_by_wid}{$child->ID});
# Remove other references
delete($heap->{'task_by_wid'}{$task->ID});
delete($heap->{'idle_tasks'}->{$task->PID});
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment