diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/QualityOfServiceUtils.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/QualityOfServiceUtils.java new file mode 100644 index 0000000000..21675b945b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/QualityOfServiceUtils.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.net.Socket; +import java.net.SocketException; +import java.util.HashMap; +import java.util.Map; + +/** + * Utilities for determining the values for the bits in the headers of the + * outgoing TCP/IP packets that indicate Traffic Class for use in Quality of + * Service forwarding policies. + */ +public class QualityOfServiceUtils { + + private static final int MAX_DIFF_SERV = 64; + private static final int MIN_DIFF_SERV = 0; + private static final Map DIFF_SERV_NAMES + = new HashMap(); + // TODO: Find other names used for Differentiated Services values. + static { + DIFF_SERV_NAMES.put("EF", 46); + DIFF_SERV_NAMES.put("AF11", 10); + DIFF_SERV_NAMES.put("AF12", 12); + DIFF_SERV_NAMES.put("AF13", 14); + DIFF_SERV_NAMES.put("AF21", 18); + DIFF_SERV_NAMES.put("AF22", 20); + DIFF_SERV_NAMES.put("AF23", 22); + DIFF_SERV_NAMES.put("AF31", 26); + DIFF_SERV_NAMES.put("AF32", 28); + DIFF_SERV_NAMES.put("AF33", 30); + DIFF_SERV_NAMES.put("AF41", 34); + DIFF_SERV_NAMES.put("AF42", 36); + DIFF_SERV_NAMES.put("AF43", 38); + } + + /** + * @param The value to be used for Differentiated Services. + * @return The corresponding Differentiated Services Code Point (DSCP). + * @throws IllegalArgumentException if the value does not correspond to a + * Differentiated Services Code Point or setting the DSCP is not + * supported. + */ + public static int getDSCP(String value) throws IllegalArgumentException { + int intValue = -1; + + // Check the names first. + if (DIFF_SERV_NAMES.containsKey(value)) { + intValue = DIFF_SERV_NAMES.get(value); + } else { + try { + intValue = Integer.parseInt(value); + if (intValue >= MAX_DIFF_SERV || intValue < MIN_DIFF_SERV) { + throw new IllegalArgumentException("Differentiated Services " + + "value: " + intValue + " must be between " + + MIN_DIFF_SERV + " and " + (MAX_DIFF_SERV - 1) + "."); + } + } catch (NumberFormatException e) { + // value must have been a malformed name. + throw new IllegalArgumentException("No such Differentiated " + + "Services name: " + value); + } + } + + return adjustDSCPForECN(intValue); + } + + /** + * The Differentiated Services values use only 6 of the 8 bits in the field + * in the TCP/IP packet header. Make sure any values the system has set for + * the other two bits (the ECN bits) are maintained. + * + * @param The Differentiated Services Code Point. + * @return A Differentiated Services Code Point that respects the ECN bits + * set on the system. + * @throws IllegalArgumentException if setting Differentiated Services is + * not supported. + */ + private static int adjustDSCPForECN(int value) + throws IllegalArgumentException { + // The only way to see if there are any values set for the ECN is to + // read the traffic class automatically set by the system and isolate + // the ECN bits. + Socket socket = new Socket(); + try { + int systemTrafficClass = socket.getTrafficClass(); + // The 7th and 8th bits of the system traffic class are the ECN bits. + return value | (systemTrafficClass & 192); + } catch (SocketException e) { + throw new IllegalArgumentException("Setting Differentiated Services " + + "not supported: " + e); + } + } + + // TODO: Add getter methods for ToS values. +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 79056d511a..74704743b3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -68,6 +68,15 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S protected DataOutputStream dataOut; protected DataInputStream dataIn; protected TcpBufferedOutputStream buffOut = null; + /** + * Differentiated Services Code Point. Determines the Traffic Class to be + * set on the socket. + */ + protected int dscp = 0; + /** + * Keeps track of attempts to set the Traffic Class. + */ + private boolean trafficClassSet = false; /** * trace=true -> the Transport stack where this TcpTransport * object will be, will have a TransportLogger layer @@ -212,6 +221,18 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S // Properties // ------------------------------------------------------------------------- + public String getDiffServ() { + // This is the value requested by the user by setting the Tcp Transport + // options. If the socket hasn't been created, then this value may not + // reflect the value returned by Socket.getTrafficClass(). + return Integer.toString(dscp); + } + + public void setDiffServ(String diffServ) throws IllegalArgumentException { + this.dscp = QualityOfServiceUtils.getDSCP(diffServ); + } + + // TODO: Add methods for setting and getting a ToS value. public boolean isTrace() { return trace; @@ -395,6 +416,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S if (tcpNoDelay != null) { sock.setTcpNoDelay(tcpNoDelay.booleanValue()); } + if (!trafficClassSet) { + trafficClassSet = setTrafficClass(sock); + } } @Override @@ -422,6 +446,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S String host = resolveHostName(remoteLocation.getHost()); remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); } + // Set the traffic class before the socket is connected when possible so + // that the connection packets are given the correct traffic class. + trafficClassSet = setTrafficClass(socket); if (socket != null) { @@ -579,4 +606,27 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public int getReceiveCounter() { return receiveCounter; } + + /** + * @return Whether or not the Traffic Class was set on the given socket. + */ + private boolean setTrafficClass(Socket sock) { + // TODO: Add in ToS support. + + if (sock == null) + return false; + + boolean success = false; + + try { + sock.setTrafficClass(this.dscp); + success = true; + } catch (SocketException e) { + // The system does not support setting the traffic class through + // setTrafficClass. + LOG.error("Unable to set the traffic class: " + e); + } + + return success; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/QualityOfServiceUtilsTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/QualityOfServiceUtilsTest.java new file mode 100644 index 0000000000..9653dd59ca --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/QualityOfServiceUtilsTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import junit.framework.TestCase; + +public class QualityOfServiceUtilsTest extends TestCase { + private int ECN; + + protected void setUp() throws Exception { + Socket socket = new Socket(); + ECN = socket.getTrafficClass(); + ECN = ECN & Integer.parseInt("11000000", 2); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testValidDiffServIntegerValues() { + int[] values = {0, 1, 32, 62, 63}; + for (int val : values) { + testValidDiffServIntegerValue(val); + } + } + + public void testInvalidDiffServIntegerValues() { + int[] values = {-2, -1, 64, 65}; + for (int val : values) { + testInvalidDiffServIntegerValue(val); + } + } + + public void testValidDiffServNames() { + Map namesToExpected = new HashMap(); + namesToExpected.put("EF", Integer.valueOf("101110", 2)); + namesToExpected.put("AF11", Integer.valueOf("001010", 2)); + namesToExpected.put("AF12", Integer.valueOf("001100", 2)); + namesToExpected.put("AF13", Integer.valueOf("001110", 2)); + namesToExpected.put("AF21", Integer.valueOf("010010", 2)); + namesToExpected.put("AF22", Integer.valueOf("010100", 2)); + namesToExpected.put("AF23", Integer.valueOf("010110", 2)); + namesToExpected.put("AF31", Integer.valueOf("011010", 2)); + namesToExpected.put("AF32", Integer.valueOf("011100", 2)); + namesToExpected.put("AF33", Integer.valueOf("011110", 2)); + namesToExpected.put("AF41", Integer.valueOf("100010", 2)); + namesToExpected.put("AF42", Integer.valueOf("100100", 2)); + namesToExpected.put("AF43", Integer.valueOf("100110", 2)); + for (String name : namesToExpected.keySet()) { + testValidDiffServName(name, namesToExpected.get(name)); + } + } + + public void testInvalidDiffServNames() { + String[] names = {"hello_world", "", "abcd"}; + for (String name : names) { + testInvalidDiffServName(name); + } + } + + private void testValidDiffServIntegerValue(int val) { + int dscp = -1; + try { + dscp = QualityOfServiceUtils.getDSCP(Integer.toString(val)); + } catch (IllegalArgumentException e) { + fail("IllegalArgumentException thrown for valid Differentiated Services " + + "value: " + val); + } + // Make sure it adjusted for any system ECN values. + assertEquals("Incorrect Differentiated Services Code Point " + + dscp + " returned for value " + val + ".", + ECN | val, dscp); + } + + private void testInvalidDiffServIntegerValue(int val) { + try { + int dscp = QualityOfServiceUtils.getDSCP(Integer.toString(val)); + fail("No IllegalArgumentException thrown for invalid Differentiated " + + "Services value: " + val + "."); + } catch (IllegalArgumentException e) { + } + } + + private void testValidDiffServName(String name, int expected) { + int dscp = -1; + try { + dscp = QualityOfServiceUtils.getDSCP(name); + } catch (IllegalArgumentException e) { + fail("IllegalArgumentException thrown for valid Differentiated " + + " Services name: " + name); + } + // Make sure it adjusted for any system ECN values. + assertEquals("Incorrect Differentiated Services Code Point " + + dscp + " returned for name " + name + ".", + ECN | expected, dscp); + } + + private void testInvalidDiffServName(String name) { + try { + int dscp = QualityOfServiceUtils.getDSCP(name); + fail("No IllegalArgumentException thrown for invalid Differentiated " + + "Services value: " + name + "."); + } catch (IllegalArgumentException e) { + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java index fac8a7fd97..8b9d51aef7 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java @@ -51,7 +51,71 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport { connection = new ActiveMQConnectionFactory(uri).createConnection(); connection.start(); } - + + public void initCombosForTestValidDiffServOptionsWork() { + addCombinationValues("prefix", new Object[] {""}); + // TODO: Add more combinations so that we know it plays nice with other + // transport options. + addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"}); + } + + public void testValidDiffServOptionsWork() throws Exception { + String[] validIntegerOptions = {"0", "1", "32", "62", "63"}; + for (String opt : validIntegerOptions) { + testValidDiffServOption(opt); + } + String[] validNameOptions = {"EF", "AF11", "AF12", "AF13", "AF21", + "AF22", "AF23", "AF31", "AF32", "AF33", + "AF41", "AF42", "AF43"}; + for (String opt : validNameOptions) { + testValidDiffServOption(opt); + } + } + + private void testValidDiffServOption(String value) { + String uri = prefix + bindAddress + postfix + "&diffServ=" + value; + LOG.info("Connecting via: " + uri); + + try { + connection = new ActiveMQConnectionFactory(uri).createConnection(); + connection.start(); + } catch (Exception e) { + fail("Valid Differentiated Services option: diffServ=" + value + + ", should not have thrown an exception: " + e); + } + } + + public void initCombosForTestInvalidDiffServOptionDoesNotWork() { + addCombinationValues("prefix", new Object[] {""}); + // TODO: Add more combinations so that we know it plays nice with other + // transport options. + addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"}); + } + + public void testInvalidDiffServOptionsDoesNotWork() throws Exception { + String[] invalidIntegerOptions = {"-2", "-1", "64", "65", "100", "255"}; + for (String opt : invalidIntegerOptions) { + testInvalidDiffServOption(opt); + } + String[] invalidNameOptions = {"hi", "", "A", "AF", "-AF21"}; + for (String opt : invalidNameOptions) { + testInvalidDiffServOption(opt); + } + } + + private void testInvalidDiffServOption(String value) { + String uri = prefix + bindAddress + postfix + "&diffServ=" + value; + LOG.info("Connecting via: " + uri); + + try { + connection = new ActiveMQConnectionFactory(uri).createConnection(); + connection.start(); + fail("Invalid Differentiated Services option: diffServ=" + value + + " should have thrown an exception!"); + } catch (Exception expected) { + } + } + public void initCombosForTestBadVersionNumberDoesNotWork() { addCombinationValues("prefix", new Object[] {""}); addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"});