From 90257a9cba1397753e3298d0bd9a1aec4cac106c Mon Sep 17 00:00:00 2001 From: Nigel Kukard <nkukard@lbsd.net> Date: Sun, 23 Jun 2013 14:35:32 +0000 Subject: [PATCH] Added tc module task scheduler --- opentrafficshaper/plugins/tc/tc.pm | 186 ++++++++++++++++++++++++++++- 1 file changed, 181 insertions(+), 5 deletions(-) diff --git a/opentrafficshaper/plugins/tc/tc.pm b/opentrafficshaper/plugins/tc/tc.pm index 5daec00..d67ab70 100644 --- a/opentrafficshaper/plugins/tc/tc.pm +++ b/opentrafficshaper/plugins/tc/tc.pm @@ -22,7 +22,7 @@ use strict; use warnings; -use POE; +use POE qw( Wheel::Run Filter::Line ); use opentrafficshaper::constants; use opentrafficshaper::logger; @@ -56,6 +56,16 @@ our $pluginInfo = { my $globals; my $logger; +my $classMaps = { + 1 => { + 1 => "Primary Interface", + }, +}; +my $classID = 10; + +my @taskQueue = (); + + # Initialize plugin sub init @@ -67,7 +77,7 @@ sub init $logger = $globals->{'logger'}; - # This is our configuration processing session + # This session is our main session, its alias is "shaper" POE::Session->create( inline_states => { _start => \&session_init, @@ -77,6 +87,20 @@ sub init } ); + # This is our session for communicating directly with tc, its alias is _tc + POE::Session->create( + inline_states => { + _start => \&tc_session_init, + # Public'ish + queue => \&tc_task_add, + # Internal + tc_child_stdout => \&tc_child_stdout, + tc_child_stderr => \&tc_child_stderr, + tc_child_close => \&tc_child_close, + tc_task_run_next => \&tc_task_run_next, + } + ); + $logger->log(LOG_NOTICE,"[TC] OpenTrafficShaper tc Integration v".VERSION." - Copyright (c) 2013, AllWorldIT") } @@ -94,15 +118,42 @@ sub session_init { # Add event for tc sub do_add { - my ($kernel, $uid) = @_[KERNEL, ARG0]; + my ($kernel,$heap,$uid) = @_[KERNEL, HEAP, ARG0]; # Pull in global my $users = $globals->{'users'}; my $user = $users->{$uid}; - $users->{$uid}->{'shaper_live'} = SHAPER_LIVE; + $users->{$uid}->{'shaper.live'} = SHAPER_LIVE; $logger->log(LOG_DEBUG," Add '$user->{'Username'}' [$uid]\n"); + +# tc class add dev eth0 parent 1:1 classid 1:aa htb rate 150kbps ceil 200kbps +# tc filter add dev eth0 parent 1:1 protocol ip prio 1 u32 \ +# match ip dst 10.254.254.235/32 flowid 1:aa + + $classID++; + my $classIDHex = sprintf('%x',$classID); + + $kernel->post("_tc" => "queue" => [ + '/sbin/tc','class','add', + 'dev','eth0', + 'parent','1:1', + 'classid',"1:$classIDHex", + 'htb', + 'rate','150kbps', + 'ceil','200kbps', + ]); + $kernel->post("_tc" => "queue" => [ + '/sbin/tc','filter','add', + 'dev','eth0', + 'parent','1:1', + 'protocol','ip', + 'prio','1', + 'u32', + 'match','ip','dst',$user->{'IP'}, + 'flowid',"1:$classIDHex", + ]); } # Change event for tc @@ -126,11 +177,136 @@ sub do_remove { my $users = $globals->{'users'}; my $user = $users->{$uid}; - $users->{$uid}->{'shaper_live'} = 0; + $users->{$uid}->{'shaper.live'} = SHAPER_NOTLIVE; $logger->log(LOG_DEBUG," Remove '$user->{'Username'}' [$uid]\n"); } + +# +# Task/child communication & handling stuff +# + +# Initialize our tc session +sub tc_session_init { + my $kernel = $_[KERNEL]; + # Set our alias + $kernel->alias_set("_tc"); +} + +# Run a task +sub tc_task_add +{ + my ($kernel,$heap,$cmd) = @_[KERNEL,HEAP,ARG0]; + + + # Build commandline string + my $cmdStr = join(' ',@{$cmd}); + # Shove task on list + $logger->log(LOG_DEBUG,"[TC] TASK: Queue '$cmdStr'"); + push(@taskQueue,$cmd); + + # Trigger a run if list is empty + if (@taskQueue < 2) { + $kernel->yield("tc_task_run_next"); + } +} + + +# Fire up the session starter +sub tc_task_run_next +{ + my ($kernel,$heap) = @_[KERNEL,HEAP]; + + + # Check if we have a task coming off the top of the task queue + if (my $cmd = shift(@taskQueue)) { + + # Create task + my $task = POE::Wheel::Run->new( + Program => $cmd, + # We get full lines back + StdioFilter => POE::Filter::Line->new(), + StderrFilter => POE::Filter::Line->new(), + StdoutEvent => 'tc_child_stdout', + StderrEvent => 'tc_child_stderr', + CloseEvent => 'tc_child_close', + ) or $logger->log(LOG_ERR,"[TC] TASK: Unable to start task"); + + + # Intercept SIGCHLD + $kernel->sig_child($task->PID, "sig_child"); + + # Wheel events include the wheel's ID. + $heap->{task_by_wid}->{$task->ID} = $task; + # Signal events include the process ID. + $heap->{task_by_pid}->{$task->PID} = $task; + + # Build commandline string + my $cmdStr = join(' ',@{$cmd}); + $logger->log(LOG_DEBUG,"[TC] TASK/".$task->ID.": Starting '$cmdStr' as ".$task->ID." with PID ".$task->PID); + } +} + + +# Child writes to STDOUT +sub tc_child_stdout +{ + my ($kernel,$heap,$stdout,$task_id) = @_[KERNEL,HEAP,ARG0,ARG1]; + my $child = $heap->{task_by_wid}->{$task_id}; + + $logger->log(LOG_DEBUG,"[TC] TASK/$task_id: STDOUT => ".$stdout); +} + + +# Child writes to STDERR +sub tc_child_stderr +{ + my ($kernel,$heap,$stdout,$task_id) = @_[KERNEL,HEAP,ARG0,ARG1]; + my $child = $heap->{task_by_wid}->{$task_id}; + + $logger->log(LOG_NOTICE,"[TC] TASK/$task_id: STDERR => ".$stdout); +} + + +# Child closed its handles, it won't communicate with us, so remove it +sub tc_child_close +{ + my ($kernel,$heap,$task_id) = @_[KERNEL,HEAP,ARG0]; + my $child = delete($heap->{task_by_wid}->{$task_id}); + + # May have been reaped by tc_sigchld() + if (!defined($child)) { + $logger->log(LOG_DEBUG,"[TC] TASK/$task_id: Closed dead child"); + return; + } + + $logger->log(LOG_DEBUG,"[TC] TASK/$task_id: Closed PID ".$child->PID); + delete($heap->{task_by_pid}->{$child->PID}); + + # Start next one, if there is a next one + if (@taskQueue > 0) { + $kernel->yield("tc_task_run_next"); + } +} + + +# Reap the dead child +sub tc_sigchld +{ + my ($kernel,$heap,$pid,$status) = @_[KERNEL,HEAP,ARG1,ARG2]; + my $child = delete($heap->{task_by_pid}->{$pid}); + + + $logger->log(LOG_DEBUG,"[TC] TASK: Task with PID $pid exited with status $status"); + + # May have been reaped by tc_child_close() + return if (!defined($child)); + + delete($heap->{task_by_wid}{$child->ID}); +} + + 1; # vim: ts=4 -- GitLab