From 1a3f54c93f54f946342cc307848bb29722bfe8ae Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 8 Mar 2007 00:22:18 +0000 Subject: [PATCH] Expose the wire format protocol to the ActiveMQConnection and TransportConnection objects so that they know when then can use more advanced protocol options. This will be needed to implement producer flow control acking. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515863 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 41 ++-- .../activemq/broker/TransportConnection.java | 8 + .../apache/activemq/command/CommandTypes.java | 3 + .../activemq/openwire/OpenWireFormat.java | 34 ++- .../tcp/WireformatNegociationTest.java | 199 ++++++++++++++++++ 5 files changed, 265 insertions(+), 20 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index bb1723fc29..c4f07517cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -25,6 +25,15 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionConsumer; @@ -45,6 +54,7 @@ import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.XAConnection; +import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempDestination; @@ -52,6 +62,7 @@ import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; @@ -70,6 +81,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.management.JMSConnectionStatsImpl; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; @@ -82,19 +94,9 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceSupport; -import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener { @@ -166,7 +168,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); private BrokerInfo brokerInfo; private IOException firstFailureError; - + + // Assume that protocol is the latest. Change to the actual protocol + // version when a WireFormatInfo is received. + private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); /** * Construct an ActiveMQConnection @@ -1562,6 +1567,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon onConnectionControl((ConnectionControl) command); }else if (command instanceof ConsumerControl){ onConsumerControl((ConsumerControl) command); + }else if ( command.isWireFormatInfo() ) { + onWireFormatInfo((WireFormatInfo)command); } } for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { @@ -1570,7 +1577,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } - /** + protected void onWireFormatInfo(WireFormatInfo info) { + protocolVersion.set(info.getVersion()); + } + + + /** * Used for handling async exceptions * * @param error @@ -1989,4 +2001,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected BlobTransferPolicy createBlobTransferPolicy() { return new BlobTransferPolicy(); } + + + public int getProtocolVersion() { + return protocolVersion.get(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 3d03e2a854..7865a9c0e9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.Service; @@ -34,6 +35,7 @@ import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; @@ -123,6 +125,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); protected AtomicBoolean dispatchStopped=new AtomicBoolean(false); private boolean networkConnection; + private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); static class ConnectionState extends org.apache.activemq.state.ConnectionState{ @@ -326,6 +329,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public Response processWireFormat(WireFormatInfo info) throws Exception{ wireFormatInfo=info; + protocolVersion.set(info.getVersion()); return null; } @@ -1158,5 +1162,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } } } + + public int getProtocolVersion() { + return protocolVersion.get(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java index 42f4376eb8..967c14da41 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java @@ -23,6 +23,9 @@ package org.apache.activemq.command; * @version $Revision: 1.21 $ */ public interface CommandTypes { + + // What is the latest version of the openwire protocol + byte PROTOCOL_VERSION = 3; // A marshaling layer can use this type to specify a null object. byte NULL = 0; diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index 9d7c0e83bc..2c202433fe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -17,26 +17,31 @@ */ package org.apache.activemq.openwire; -import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.MarshallAware; -import org.apache.activemq.command.WireFormatInfo; -import org.apache.activemq.util.*; -import org.apache.activemq.wireformat.WireFormat; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.MarshallAware; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.ByteSequenceData; +import org.apache.activemq.util.ClassLoading; +import org.apache.activemq.util.DataByteArrayInputStream; +import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.wireformat.WireFormat; + /** * * @version $Revision$ */ final public class OpenWireFormat implements WireFormat { - public static final int DEFAULT_VERSION = 3; + public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION; static final byte NULL_TYPE = CommandTypes.NULL; private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2; @@ -561,15 +566,28 @@ final public class OpenWireFormat implements WireFormat { throw new IllegalStateException("Wireformat cannot not be renegotiated."); this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) ); + info.setVersion(this.getVersion()); + this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); + info.setStackTraceEnabled(this.stackTraceEnabled); + this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); + info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); + this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); + info.setCacheEnabled(this.cacheEnabled); + this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled(); + info.setTightEncodingEnabled(this.tightEncodingEnabled); + this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled(); + info.setSizePrefixDisabled(this.sizePrefixDisabled); if( cacheEnabled ) { int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); + info.setCacheSize(size); + if( size == 0 ) { size = MARSHAL_CACHE_SIZE; } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java new file mode 100644 index 0000000000..f789b4fa1c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; + +import javax.net.SocketFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + + +public class WireformatNegociationTest extends CombinationTestSupport { + + private TransportServer server; + private Transport clientTransport; + private Transport serverTransport; + + private final AtomicReference clientWF = new AtomicReference(); + private final AtomicReference serverWF = new AtomicReference(); + private final AtomicReference asyncError = new AtomicReference(); + private final AtomicBoolean ignoreAsycError = new AtomicBoolean(); + + private final CountDownLatch negociationCounter = new CountDownLatch(2); + + protected void setUp() throws Exception { + super.setUp(); + } + + /** + * @throws Exception + * @throws URISyntaxException + */ + private void startClient(String uri) throws Exception, URISyntaxException { + clientTransport = TransportFactory.connect(new URI(uri)); + clientTransport.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + if( command instanceof WireFormatInfo ) { + clientWF.set((WireFormatInfo) command); + negociationCounter.countDown(); + } + } + public void onException(IOException error) { + if( !ignoreAsycError.get() ) { + log.info("Client transport error: ", error); + asyncError.set(error); + negociationCounter.countDown(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + clientTransport.start(); + } + + /** + * @throws IOException + * @throws URISyntaxException + * @throws Exception + */ + private void startServer(String uri ) throws IOException, URISyntaxException, Exception { + server = TransportFactory.bind("localhost", new URI(uri)); + server.setAcceptListener(new TransportAcceptListener(){ + public void onAccept(Transport transport) { + try { + log.info("["+getName()+"] Server Accepted a Connection"); + serverTransport = transport; + serverTransport.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + if( command instanceof WireFormatInfo ) { + serverWF.set((WireFormatInfo) command); + negociationCounter.countDown(); + } + } + public void onException(IOException error) { + if( !ignoreAsycError.get() ) { + log.info("Server transport error: ", error); + asyncError.set(error); + negociationCounter.countDown(); + } + } + public void transportInterupted() { + } + public void transportResumed() { + }}); + serverTransport.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void onAcceptError(Exception error) { + error.printStackTrace(); + } + }); + server.start(); + } + + protected void tearDown() throws Exception { + ignoreAsycError.set(true); + try { + if( clientTransport!=null ) + clientTransport.stop(); + if( serverTransport!=null ) + serverTransport.stop(); + if( server!=null ) + server.stop(); + } catch (Throwable e) { + e.printStackTrace(); + } + super.tearDown(); + } + + + /** + * @throws Exception + */ + public void testWireFomatInfoSeverVersion1() throws Exception { + + startServer("tcp://localhost:61616?wireFormat.version=1"); + startClient("tcp://localhost:61616"); + + assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); + assertNull("Async error: "+asyncError, asyncError.get()); + + assertNotNull(clientWF.get()); + assertEquals(1, clientWF.get().getVersion()); + + assertNotNull(serverWF.get()); + assertEquals(1, serverWF.get().getVersion()); + } + + /** + * @throws Exception + */ + public void testWireFomatInfoClientVersion1() throws Exception { + + startServer("tcp://localhost:61616"); + startClient("tcp://localhost:61616?wireFormat.version=1"); + + assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); + assertNull("Async error: "+asyncError, asyncError.get()); + + assertNotNull(clientWF.get()); + assertEquals(1, clientWF.get().getVersion()); + + assertNotNull(serverWF.get()); + assertEquals(1, serverWF.get().getVersion()); + } + + /** + * @throws Exception + */ + public void testWireFomatInfoCurrentVersion() throws Exception { + + startServer("tcp://localhost:61616"); + startClient("tcp://localhost:61616"); + + assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS)); + assertNull("Async error: "+asyncError, asyncError.get()); + + assertNotNull(clientWF.get()); + assertEquals(CommandTypes.PROTOCOL_VERSION, clientWF.get().getVersion()); + + assertNotNull(serverWF.get()); + assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion()); + } + +}