mirror of https://github.com/apache/activemq.git
Apply patch for https://issues.apache.org/activemq/browse/AMQ-2636
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@920395 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3634d64747
commit
54fa83da8b
activemq-core/src
main/java/org/apache/activemq/transport/tcp
test/java/org/apache/activemq/transport/tcp
|
@ -0,0 +1,111 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.tcp;
|
||||||
|
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utilities for determining the values for the bits in the headers of the
|
||||||
|
* outgoing TCP/IP packets that indicate Traffic Class for use in Quality of
|
||||||
|
* Service forwarding policies.
|
||||||
|
*/
|
||||||
|
public class QualityOfServiceUtils {
|
||||||
|
|
||||||
|
private static final int MAX_DIFF_SERV = 64;
|
||||||
|
private static final int MIN_DIFF_SERV = 0;
|
||||||
|
private static final Map<String, Integer> DIFF_SERV_NAMES
|
||||||
|
= new HashMap<String, Integer>();
|
||||||
|
// TODO: Find other names used for Differentiated Services values.
|
||||||
|
static {
|
||||||
|
DIFF_SERV_NAMES.put("EF", 46);
|
||||||
|
DIFF_SERV_NAMES.put("AF11", 10);
|
||||||
|
DIFF_SERV_NAMES.put("AF12", 12);
|
||||||
|
DIFF_SERV_NAMES.put("AF13", 14);
|
||||||
|
DIFF_SERV_NAMES.put("AF21", 18);
|
||||||
|
DIFF_SERV_NAMES.put("AF22", 20);
|
||||||
|
DIFF_SERV_NAMES.put("AF23", 22);
|
||||||
|
DIFF_SERV_NAMES.put("AF31", 26);
|
||||||
|
DIFF_SERV_NAMES.put("AF32", 28);
|
||||||
|
DIFF_SERV_NAMES.put("AF33", 30);
|
||||||
|
DIFF_SERV_NAMES.put("AF41", 34);
|
||||||
|
DIFF_SERV_NAMES.put("AF42", 36);
|
||||||
|
DIFF_SERV_NAMES.put("AF43", 38);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param The value to be used for Differentiated Services.
|
||||||
|
* @return The corresponding Differentiated Services Code Point (DSCP).
|
||||||
|
* @throws IllegalArgumentException if the value does not correspond to a
|
||||||
|
* Differentiated Services Code Point or setting the DSCP is not
|
||||||
|
* supported.
|
||||||
|
*/
|
||||||
|
public static int getDSCP(String value) throws IllegalArgumentException {
|
||||||
|
int intValue = -1;
|
||||||
|
|
||||||
|
// Check the names first.
|
||||||
|
if (DIFF_SERV_NAMES.containsKey(value)) {
|
||||||
|
intValue = DIFF_SERV_NAMES.get(value);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
intValue = Integer.parseInt(value);
|
||||||
|
if (intValue >= MAX_DIFF_SERV || intValue < MIN_DIFF_SERV) {
|
||||||
|
throw new IllegalArgumentException("Differentiated Services "
|
||||||
|
+ "value: " + intValue + " must be between "
|
||||||
|
+ MIN_DIFF_SERV + " and " + (MAX_DIFF_SERV - 1) + ".");
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
// value must have been a malformed name.
|
||||||
|
throw new IllegalArgumentException("No such Differentiated "
|
||||||
|
+ "Services name: " + value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return adjustDSCPForECN(intValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Differentiated Services values use only 6 of the 8 bits in the field
|
||||||
|
* in the TCP/IP packet header. Make sure any values the system has set for
|
||||||
|
* the other two bits (the ECN bits) are maintained.
|
||||||
|
*
|
||||||
|
* @param The Differentiated Services Code Point.
|
||||||
|
* @return A Differentiated Services Code Point that respects the ECN bits
|
||||||
|
* set on the system.
|
||||||
|
* @throws IllegalArgumentException if setting Differentiated Services is
|
||||||
|
* not supported.
|
||||||
|
*/
|
||||||
|
private static int adjustDSCPForECN(int value)
|
||||||
|
throws IllegalArgumentException {
|
||||||
|
// The only way to see if there are any values set for the ECN is to
|
||||||
|
// read the traffic class automatically set by the system and isolate
|
||||||
|
// the ECN bits.
|
||||||
|
Socket socket = new Socket();
|
||||||
|
try {
|
||||||
|
int systemTrafficClass = socket.getTrafficClass();
|
||||||
|
// The 7th and 8th bits of the system traffic class are the ECN bits.
|
||||||
|
return value | (systemTrafficClass & 192);
|
||||||
|
} catch (SocketException e) {
|
||||||
|
throw new IllegalArgumentException("Setting Differentiated Services "
|
||||||
|
+ "not supported: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Add getter methods for ToS values.
|
||||||
|
}
|
|
@ -68,6 +68,15 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
protected DataOutputStream dataOut;
|
protected DataOutputStream dataOut;
|
||||||
protected DataInputStream dataIn;
|
protected DataInputStream dataIn;
|
||||||
protected TcpBufferedOutputStream buffOut = null;
|
protected TcpBufferedOutputStream buffOut = null;
|
||||||
|
/**
|
||||||
|
* Differentiated Services Code Point. Determines the Traffic Class to be
|
||||||
|
* set on the socket.
|
||||||
|
*/
|
||||||
|
protected int dscp = 0;
|
||||||
|
/**
|
||||||
|
* Keeps track of attempts to set the Traffic Class.
|
||||||
|
*/
|
||||||
|
private boolean trafficClassSet = false;
|
||||||
/**
|
/**
|
||||||
* trace=true -> the Transport stack where this TcpTransport
|
* trace=true -> the Transport stack where this TcpTransport
|
||||||
* object will be, will have a TransportLogger layer
|
* object will be, will have a TransportLogger layer
|
||||||
|
@ -212,6 +221,18 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
|
|
||||||
// Properties
|
// Properties
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
public String getDiffServ() {
|
||||||
|
// This is the value requested by the user by setting the Tcp Transport
|
||||||
|
// options. If the socket hasn't been created, then this value may not
|
||||||
|
// reflect the value returned by Socket.getTrafficClass().
|
||||||
|
return Integer.toString(dscp);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDiffServ(String diffServ) throws IllegalArgumentException {
|
||||||
|
this.dscp = QualityOfServiceUtils.getDSCP(diffServ);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Add methods for setting and getting a ToS value.
|
||||||
|
|
||||||
public boolean isTrace() {
|
public boolean isTrace() {
|
||||||
return trace;
|
return trace;
|
||||||
|
@ -395,6 +416,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
if (tcpNoDelay != null) {
|
if (tcpNoDelay != null) {
|
||||||
sock.setTcpNoDelay(tcpNoDelay.booleanValue());
|
sock.setTcpNoDelay(tcpNoDelay.booleanValue());
|
||||||
}
|
}
|
||||||
|
if (!trafficClassSet) {
|
||||||
|
trafficClassSet = setTrafficClass(sock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -422,6 +446,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
String host = resolveHostName(remoteLocation.getHost());
|
String host = resolveHostName(remoteLocation.getHost());
|
||||||
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
|
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
|
||||||
}
|
}
|
||||||
|
// Set the traffic class before the socket is connected when possible so
|
||||||
|
// that the connection packets are given the correct traffic class.
|
||||||
|
trafficClassSet = setTrafficClass(socket);
|
||||||
|
|
||||||
if (socket != null) {
|
if (socket != null) {
|
||||||
|
|
||||||
|
@ -579,4 +606,27 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
public int getReceiveCounter() {
|
public int getReceiveCounter() {
|
||||||
return receiveCounter;
|
return receiveCounter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Whether or not the Traffic Class was set on the given socket.
|
||||||
|
*/
|
||||||
|
private boolean setTrafficClass(Socket sock) {
|
||||||
|
// TODO: Add in ToS support.
|
||||||
|
|
||||||
|
if (sock == null)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
boolean success = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
sock.setTrafficClass(this.dscp);
|
||||||
|
success = true;
|
||||||
|
} catch (SocketException e) {
|
||||||
|
// The system does not support setting the traffic class through
|
||||||
|
// setTrafficClass.
|
||||||
|
LOG.error("Unable to set the traffic class: " + e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.tcp;
|
||||||
|
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
public class QualityOfServiceUtilsTest extends TestCase {
|
||||||
|
private int ECN;
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
Socket socket = new Socket();
|
||||||
|
ECN = socket.getTrafficClass();
|
||||||
|
ECN = ECN & Integer.parseInt("11000000", 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidDiffServIntegerValues() {
|
||||||
|
int[] values = {0, 1, 32, 62, 63};
|
||||||
|
for (int val : values) {
|
||||||
|
testValidDiffServIntegerValue(val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInvalidDiffServIntegerValues() {
|
||||||
|
int[] values = {-2, -1, 64, 65};
|
||||||
|
for (int val : values) {
|
||||||
|
testInvalidDiffServIntegerValue(val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidDiffServNames() {
|
||||||
|
Map<String, Integer> namesToExpected = new HashMap<String, Integer>();
|
||||||
|
namesToExpected.put("EF", Integer.valueOf("101110", 2));
|
||||||
|
namesToExpected.put("AF11", Integer.valueOf("001010", 2));
|
||||||
|
namesToExpected.put("AF12", Integer.valueOf("001100", 2));
|
||||||
|
namesToExpected.put("AF13", Integer.valueOf("001110", 2));
|
||||||
|
namesToExpected.put("AF21", Integer.valueOf("010010", 2));
|
||||||
|
namesToExpected.put("AF22", Integer.valueOf("010100", 2));
|
||||||
|
namesToExpected.put("AF23", Integer.valueOf("010110", 2));
|
||||||
|
namesToExpected.put("AF31", Integer.valueOf("011010", 2));
|
||||||
|
namesToExpected.put("AF32", Integer.valueOf("011100", 2));
|
||||||
|
namesToExpected.put("AF33", Integer.valueOf("011110", 2));
|
||||||
|
namesToExpected.put("AF41", Integer.valueOf("100010", 2));
|
||||||
|
namesToExpected.put("AF42", Integer.valueOf("100100", 2));
|
||||||
|
namesToExpected.put("AF43", Integer.valueOf("100110", 2));
|
||||||
|
for (String name : namesToExpected.keySet()) {
|
||||||
|
testValidDiffServName(name, namesToExpected.get(name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInvalidDiffServNames() {
|
||||||
|
String[] names = {"hello_world", "", "abcd"};
|
||||||
|
for (String name : names) {
|
||||||
|
testInvalidDiffServName(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testValidDiffServIntegerValue(int val) {
|
||||||
|
int dscp = -1;
|
||||||
|
try {
|
||||||
|
dscp = QualityOfServiceUtils.getDSCP(Integer.toString(val));
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
fail("IllegalArgumentException thrown for valid Differentiated Services "
|
||||||
|
+ "value: " + val);
|
||||||
|
}
|
||||||
|
// Make sure it adjusted for any system ECN values.
|
||||||
|
assertEquals("Incorrect Differentiated Services Code Point "
|
||||||
|
+ dscp + " returned for value " + val + ".",
|
||||||
|
ECN | val, dscp);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidDiffServIntegerValue(int val) {
|
||||||
|
try {
|
||||||
|
int dscp = QualityOfServiceUtils.getDSCP(Integer.toString(val));
|
||||||
|
fail("No IllegalArgumentException thrown for invalid Differentiated "
|
||||||
|
+ "Services value: " + val + ".");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testValidDiffServName(String name, int expected) {
|
||||||
|
int dscp = -1;
|
||||||
|
try {
|
||||||
|
dscp = QualityOfServiceUtils.getDSCP(name);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
fail("IllegalArgumentException thrown for valid Differentiated "
|
||||||
|
+ " Services name: " + name);
|
||||||
|
}
|
||||||
|
// Make sure it adjusted for any system ECN values.
|
||||||
|
assertEquals("Incorrect Differentiated Services Code Point "
|
||||||
|
+ dscp + " returned for name " + name + ".",
|
||||||
|
ECN | expected, dscp);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidDiffServName(String name) {
|
||||||
|
try {
|
||||||
|
int dscp = QualityOfServiceUtils.getDSCP(name);
|
||||||
|
fail("No IllegalArgumentException thrown for invalid Differentiated "
|
||||||
|
+ "Services value: " + name + ".");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -52,6 +52,70 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
|
||||||
connection.start();
|
connection.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestValidDiffServOptionsWork() {
|
||||||
|
addCombinationValues("prefix", new Object[] {""});
|
||||||
|
// TODO: Add more combinations so that we know it plays nice with other
|
||||||
|
// transport options.
|
||||||
|
addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidDiffServOptionsWork() throws Exception {
|
||||||
|
String[] validIntegerOptions = {"0", "1", "32", "62", "63"};
|
||||||
|
for (String opt : validIntegerOptions) {
|
||||||
|
testValidDiffServOption(opt);
|
||||||
|
}
|
||||||
|
String[] validNameOptions = {"EF", "AF11", "AF12", "AF13", "AF21",
|
||||||
|
"AF22", "AF23", "AF31", "AF32", "AF33",
|
||||||
|
"AF41", "AF42", "AF43"};
|
||||||
|
for (String opt : validNameOptions) {
|
||||||
|
testValidDiffServOption(opt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testValidDiffServOption(String value) {
|
||||||
|
String uri = prefix + bindAddress + postfix + "&diffServ=" + value;
|
||||||
|
LOG.info("Connecting via: " + uri);
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = new ActiveMQConnectionFactory(uri).createConnection();
|
||||||
|
connection.start();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Valid Differentiated Services option: diffServ=" + value
|
||||||
|
+ ", should not have thrown an exception: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestInvalidDiffServOptionDoesNotWork() {
|
||||||
|
addCombinationValues("prefix", new Object[] {""});
|
||||||
|
// TODO: Add more combinations so that we know it plays nice with other
|
||||||
|
// transport options.
|
||||||
|
addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInvalidDiffServOptionsDoesNotWork() throws Exception {
|
||||||
|
String[] invalidIntegerOptions = {"-2", "-1", "64", "65", "100", "255"};
|
||||||
|
for (String opt : invalidIntegerOptions) {
|
||||||
|
testInvalidDiffServOption(opt);
|
||||||
|
}
|
||||||
|
String[] invalidNameOptions = {"hi", "", "A", "AF", "-AF21"};
|
||||||
|
for (String opt : invalidNameOptions) {
|
||||||
|
testInvalidDiffServOption(opt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testInvalidDiffServOption(String value) {
|
||||||
|
String uri = prefix + bindAddress + postfix + "&diffServ=" + value;
|
||||||
|
LOG.info("Connecting via: " + uri);
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = new ActiveMQConnectionFactory(uri).createConnection();
|
||||||
|
connection.start();
|
||||||
|
fail("Invalid Differentiated Services option: diffServ=" + value
|
||||||
|
+ " should have thrown an exception!");
|
||||||
|
} catch (Exception expected) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void initCombosForTestBadVersionNumberDoesNotWork() {
|
public void initCombosForTestBadVersionNumberDoesNotWork() {
|
||||||
addCombinationValues("prefix", new Object[] {""});
|
addCombinationValues("prefix", new Object[] {""});
|
||||||
addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"});
|
addCombinationValues("postfix", new Object[] {"?tcpNoDelay=true&keepAlive=true"});
|
||||||
|
|
Loading…
Reference in New Issue