From f6d4f9f1480745c2aaae2dd9b6738b983dbed865 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 22 Sep 2006 17:58:37 +0000 Subject: [PATCH] tidied up the wire format negotiation to ensure that we don't try verison 0 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@449023 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/pom.xml | 3 ++ .../activemq/openwire/OpenWireFormat.java | 9 +++- .../transport/WireFormatNegotiator.java | 21 ++++++++- .../transport/tcp/TransportUriTest.java | 47 ++++++++++++++++++- 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 159b26bcad..bdfcc84be9 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -297,6 +297,9 @@ **/LDAPAuthorizationMapTest.* + + **/SslTransportFactoryTest.* + diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index d09a70014b..d4f3b394d2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -565,7 +565,7 @@ final public class OpenWireFormat implements WireFormat { if( preferedWireFormatInfo==null ) throw new IllegalStateException("Wireformat cannot not be renegotiated."); - this.setVersion(Math.min(preferedWireFormatInfo.getVersion(), info.getVersion()) ); + this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) ); this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); @@ -573,4 +573,11 @@ final public class OpenWireFormat implements WireFormat { this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled(); } + + protected int min(int version1, int version2) { + if (version1 < version2 && version1 > 0 || version2 <= 0) { + return version1; + } + return version2; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index 92a813f6c9..3d8d281133 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -22,7 +22,9 @@ import java.io.InterruptedIOException; import org.apache.activemq.command.Command; import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +33,9 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; +/** + * Negotiates the wire format with a new connection + */ public class WireFormatNegotiator extends TransportFilter { private static final Log log = LogFactory.getLog(WireFormatNegotiator.class); @@ -47,11 +52,13 @@ public class WireFormatNegotiator extends TransportFilter { * Negotiator * * @param next - * @param preferedFormat */ public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) { super(next); this.wireFormat = wireFormat; + if (minimumVersion <= 0) { + minimumVersion = 1; + } this.minimumVersion = minimumVersion; } @@ -117,6 +124,8 @@ public class WireFormatNegotiator extends TransportFilter { onException(e); } catch (InterruptedException e) { onException((IOException) new InterruptedIOException().initCause(e)); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); } readyCountDownLatch.countDown(); onWireFormatNegotiated(info); @@ -127,7 +136,15 @@ public class WireFormatNegotiator extends TransportFilter { public void onException(IOException error) { readyCountDownLatch.countDown(); - super.onException(error); + /* + try { + super.oneway(new ExceptionResponse(error)); + } + catch (IOException e) { + // ignore as we are already throwing an exception + } + */ + super.onException(error); } public String toString() { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java index 5b15b854bf..9adf0266af 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java @@ -18,11 +18,12 @@ package org.apache.activemq.transport.tcp; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; import javax.jms.Connection; +import javax.jms.JMSException; /** - * * @version $Revision$ */ public class TransportUriTest extends EmbeddedBrokerTestSupport { @@ -38,6 +39,36 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport { connection.start(); } + public void testBadVersionNumberDoesNotWork() throws Exception { + String uri = bindAddress + postfix + "&minmumWireFormatVersion=65535"; + System.out.println("Connecting via: " + uri); + + try { + connection = new ActiveMQConnectionFactory(uri).createConnection(); + connection.start(); + fail("Should have thrown an exception!"); + } + catch (Exception e) { + System.out.println("Caught expected exception: " + e); + } + } + + + public void testBadPropertyNameFails() throws Exception { + String uri = bindAddress + postfix + "&cheese=abc"; + System.out.println("Connecting via: " + uri); + + try { + connection = new ActiveMQConnectionFactory(uri).createConnection(); + connection.start(); + fail("Should have thrown an exception!"); + } + catch (Exception e) { + System.out.println("Caught expected exception: " + e); + } + } + + protected void setUp() throws Exception { bindAddress = "tcp://localhost:6161"; super.setUp(); @@ -45,9 +76,21 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport { protected void tearDown() throws Exception { if (connection != null) { - connection.close(); + try { + connection.close(); + } + catch (JMSException e) { + e.printStackTrace(); + } } super.tearDown(); } + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseJmx(false); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + return answer; + } }