diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index bb1723fc29..c4f07517cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -25,6 +25,15 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionConsumer; @@ -45,6 +54,7 @@ import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.XAConnection; +import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempDestination; @@ -52,6 +62,7 @@ import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; @@ -70,6 +81,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.management.JMSConnectionStatsImpl; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; @@ -82,19 +94,9 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceSupport; -import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener { @@ -166,7 +168,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); private BrokerInfo brokerInfo; private IOException firstFailureError; - + + // Assume that protocol is the latest. Change to the actual protocol + // version when a WireFormatInfo is received. + private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); /** * Construct an ActiveMQConnection @@ -1562,6 +1567,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon onConnectionControl((ConnectionControl) command); }else if (command instanceof ConsumerControl){ onConsumerControl((ConsumerControl) command); + }else if ( command.isWireFormatInfo() ) { + onWireFormatInfo((WireFormatInfo)command); } } for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { @@ -1570,7 +1577,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } - /** + protected void onWireFormatInfo(WireFormatInfo info) { + protocolVersion.set(info.getVersion()); + } + + + /** * Used for handling async exceptions * * @param error @@ -1989,4 +2001,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected BlobTransferPolicy createBlobTransferPolicy() { return new BlobTransferPolicy(); } + + + public int getProtocolVersion() { + return protocolVersion.get(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 3d03e2a854..7865a9c0e9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.Service; @@ -34,6 +35,7 @@ import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; @@ -123,6 +125,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); protected AtomicBoolean dispatchStopped=new AtomicBoolean(false); private boolean networkConnection; + private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); static class ConnectionState extends org.apache.activemq.state.ConnectionState{ @@ -326,6 +329,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public Response processWireFormat(WireFormatInfo info) throws Exception{ wireFormatInfo=info; + protocolVersion.set(info.getVersion()); return null; } @@ -1158,5 +1162,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } } } + + public int getProtocolVersion() { + return protocolVersion.get(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java index 42f4376eb8..967c14da41 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java @@ -23,6 +23,9 @@ package org.apache.activemq.command; * @version $Revision: 1.21 $ */ public interface CommandTypes { + + // What is the latest version of the openwire protocol + byte PROTOCOL_VERSION = 3; // A marshaling layer can use this type to specify a null object. byte NULL = 0; diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index 9d7c0e83bc..2c202433fe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -17,26 +17,31 @@ */ package org.apache.activemq.openwire; -import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.MarshallAware; -import org.apache.activemq.command.WireFormatInfo; -import org.apache.activemq.util.*; -import org.apache.activemq.wireformat.WireFormat; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.MarshallAware; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.ByteSequenceData; +import org.apache.activemq.util.ClassLoading; +import org.apache.activemq.util.DataByteArrayInputStream; +import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.wireformat.WireFormat; + /** * * @version $Revision$ */ final public class OpenWireFormat implements WireFormat { - public static final int DEFAULT_VERSION = 3; + public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION; static final byte NULL_TYPE = CommandTypes.NULL; private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2; @@ -561,15 +566,28 @@ final public class OpenWireFormat implements WireFormat { throw new IllegalStateException("Wireformat cannot not be renegotiated."); this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) ); + info.setVersion(this.getVersion()); + this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); + info.setStackTraceEnabled(this.stackTraceEnabled); + this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); + info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); + this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); + info.setCacheEnabled(this.cacheEnabled); + this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled(); + info.setTightEncodingEnabled(this.tightEncodingEnabled); + this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled(); + info.setSizePrefixDisabled(this.sizePrefixDisabled); if( cacheEnabled ) { int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); + info.setCacheSize(size); + if( size == 0 ) { size = MARSHAL_CACHE_SIZE; } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java new file mode 100644 index 0000000000..f789b4fa1c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; + +import javax.net.SocketFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + + +public class WireformatNegociationTest extends CombinationTestSupport { + + private TransportServer server; + private Transport clientTransport; + private Transport serverTransport; + + private final AtomicReference clientWF = new AtomicReference(); + private final AtomicReference serverWF = new AtomicReference(); + private final AtomicReference asyncError = new AtomicReference(); + private final AtomicBoolean ignoreAsycError = new AtomicBoolean(); + + private final CountDownLatch negociationCounter = new CountDownLatch(2); + + protected void setUp() throws Exception { + super.setUp(); + } + + /** + * @throws Exception + * @throws URISyntaxException + */ + private void startClient(String uri) throws Exception, URISyntaxException { + clientTransport = TransportFactory.connect(new URI(uri)); + clientTransport.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + if( command instanceof WireFormatInfo ) { + clientWF.set((WireFormatInfo) command); + negociationCounter.countDown(); + } + } + public void onException(IOException error) { + if( !ignoreAsycError.get() ) { + log.info("Client transport error: ", error); + asyncError.set(error); + negociationCounter.countDown(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + clientTransport.start(); + } + + /** + * @throws IOException + * @throws URISyntaxException + * @throws Exception + */ + private void startServer(String uri ) throws IOException, URISyntaxException, Exception { + server = TransportFactory.bind("localhost", new URI(uri)); + server.setAcceptListener(new TransportAcceptListener(){ + public void onAccept(Transport transport) { + try { + log.info("["+getName()+"] Server Accepted a Connection"); + serverTransport = transport; + serverTransport.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + if( command instanceof WireFormatInfo ) { + serverWF.set((WireFormatInfo) command); + negociationCounter.countDown(); + } + } + public void onException(IOException error) { + if( !ignoreAsycError.get() ) { + log.info("Server transport error: ", error); + asyncError.set(error); + negociationCounter.countDown(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + serverTransport.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void onAcceptError(Exception error) { + error.printStackTrace(); + } + }); + server.start(); + } + + protected void tearDown() throws Exception { + ignoreAsycError.set(true); + try { + if( clientTransport!=null ) + clientTransport.stop(); + if( serverTransport!=null ) + serverTransport.stop(); + if( server!=null ) + server.stop(); + } catch (Throwable e) { + e.printStackTrace(); + } + super.tearDown(); + } + + + /** + * @throws Exception + */ + public void testWireFomatInfoSeverVersion1() throws Exception { + + startServer("tcp://localhost:61616?wireFormat.version=1"); + startClient("tcp://localhost:61616"); + + assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); + assertNull("Async error: "+asyncError, asyncError.get()); + + assertNotNull(clientWF.get()); + assertEquals(1, clientWF.get().getVersion()); + + assertNotNull(serverWF.get()); + assertEquals(1, serverWF.get().getVersion()); + } + + /** + * @throws Exception + */ + public void testWireFomatInfoClientVersion1() throws Exception { + + startServer("tcp://localhost:61616"); + startClient("tcp://localhost:61616?wireFormat.version=1"); + + assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); + assertNull("Async error: "+asyncError, asyncError.get()); + + assertNotNull(clientWF.get()); + assertEquals(1, clientWF.get().getVersion()); + + assertNotNull(serverWF.get()); + assertEquals(1, serverWF.get().getVersion()); + } + + /** + * @throws Exception + */ + public void testWireFomatInfoCurrentVersion() throws Exception { + + startServer("tcp://localhost:61616"); + startClient("tcp://localhost:61616"); + + assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); + assertNull("Async error: "+asyncError, asyncError.get()); + + assertNotNull(clientWF.get()); + assertEquals(CommandTypes.PROTOCOL_VERSION, clientWF.get().getVersion()); + + assertNotNull(serverWF.get()); + assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion()); + } + +}