diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml
index 159b26bcad..bdfcc84be9 100755
--- a/activemq-core/pom.xml
+++ b/activemq-core/pom.xml
@@ -297,6 +297,9 @@
**/LDAPAuthorizationMapTest.*
+
+ **/SslTransportFactoryTest.*
+
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index d09a70014b..d4f3b394d2 100755
--- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -565,7 +565,7 @@ final public class OpenWireFormat implements WireFormat {
if( preferedWireFormatInfo==null )
throw new IllegalStateException("Wireformat cannot not be renegotiated.");
- this.setVersion(Math.min(preferedWireFormatInfo.getVersion(), info.getVersion()) );
+ this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) );
this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
@@ -573,4 +573,11 @@ final public class OpenWireFormat implements WireFormat {
this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
}
+
+ protected int min(int version1, int version2) {
+ if (version1 < version2 && version1 > 0 || version2 <= 0) {
+ return version1;
+ }
+ return version2;
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
index 92a813f6c9..3d8d281133 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
@@ -22,7 +22,9 @@ import java.io.InterruptedIOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,9 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * Negotiates the wire format with a new connection
+ */
public class WireFormatNegotiator extends TransportFilter {
private static final Log log = LogFactory.getLog(WireFormatNegotiator.class);
@@ -47,11 +52,13 @@ public class WireFormatNegotiator extends TransportFilter {
* Negotiator
*
* @param next
- * @param preferedFormat
*/
public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
super(next);
this.wireFormat = wireFormat;
+ if (minimumVersion <= 0) {
+ minimumVersion = 1;
+ }
this.minimumVersion = minimumVersion;
}
@@ -117,6 +124,8 @@ public class WireFormatNegotiator extends TransportFilter {
onException(e);
} catch (InterruptedException e) {
onException((IOException) new InterruptedIOException().initCause(e));
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
}
readyCountDownLatch.countDown();
onWireFormatNegotiated(info);
@@ -127,7 +136,15 @@ public class WireFormatNegotiator extends TransportFilter {
public void onException(IOException error) {
readyCountDownLatch.countDown();
- super.onException(error);
+ /*
+ try {
+ super.oneway(new ExceptionResponse(error));
+ }
+ catch (IOException e) {
+ // ignore as we are already throwing an exception
+ }
+ */
+ super.onException(error);
}
public String toString() {
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
index 5b15b854bf..9adf0266af 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
@@ -18,11 +18,12 @@ package org.apache.activemq.transport.tcp;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
import javax.jms.Connection;
+import javax.jms.JMSException;
/**
- *
* @version $Revision$
*/
public class TransportUriTest extends EmbeddedBrokerTestSupport {
@@ -38,6 +39,36 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
connection.start();
}
+ public void testBadVersionNumberDoesNotWork() throws Exception {
+ String uri = bindAddress + postfix + "&minmumWireFormatVersion=65535";
+ System.out.println("Connecting via: " + uri);
+
+ try {
+ connection = new ActiveMQConnectionFactory(uri).createConnection();
+ connection.start();
+ fail("Should have thrown an exception!");
+ }
+ catch (Exception e) {
+ System.out.println("Caught expected exception: " + e);
+ }
+ }
+
+
+ public void testBadPropertyNameFails() throws Exception {
+ String uri = bindAddress + postfix + "&cheese=abc";
+ System.out.println("Connecting via: " + uri);
+
+ try {
+ connection = new ActiveMQConnectionFactory(uri).createConnection();
+ connection.start();
+ fail("Should have thrown an exception!");
+ }
+ catch (Exception e) {
+ System.out.println("Caught expected exception: " + e);
+ }
+ }
+
+
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:6161";
super.setUp();
@@ -45,9 +76,21 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
protected void tearDown() throws Exception {
if (connection != null) {
- connection.close();
+ try {
+ connection.close();
+ }
+ catch (JMSException e) {
+ e.printStackTrace();
+ }
}
super.tearDown();
}
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ answer.setUseJmx(false);
+ answer.setPersistent(isPersistent());
+ answer.addConnector(bindAddress);
+ return answer;
+ }
}