Expose the wire format protocol to the ActiveMQConnection and TransportConnection objects

so that they know when then can use more advanced protocol options.  This will be needed
to implement producer flow control acking.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515863 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-08 00:22:18 +00:00
parent 10185cadd7
commit 1a3f54c93f
5 changed files with 265 additions and 20 deletions

View File

@ -25,6 +25,15 @@ import java.net.URISyntaxException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
@ -45,6 +54,7 @@ import javax.jms.TopicConnection;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.XAConnection; import javax.jms.XAConnection;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTempDestination;
@ -52,6 +62,7 @@ import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
@ -70,6 +81,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.management.JMSConnectionStatsImpl; import org.apache.activemq.management.JMSConnectionStatsImpl;
import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
@ -82,19 +94,9 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener { public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
@ -167,6 +169,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private BrokerInfo brokerInfo; private BrokerInfo brokerInfo;
private IOException firstFailureError; private IOException firstFailureError;
// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
/** /**
* Construct an <code>ActiveMQConnection</code> * Construct an <code>ActiveMQConnection</code>
@ -1562,6 +1567,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
onConnectionControl((ConnectionControl) command); onConnectionControl((ConnectionControl) command);
}else if (command instanceof ConsumerControl){ }else if (command instanceof ConsumerControl){
onConsumerControl((ConsumerControl) command); onConsumerControl((ConsumerControl) command);
}else if ( command.isWireFormatInfo() ) {
onWireFormatInfo((WireFormatInfo)command);
} }
} }
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
@ -1570,6 +1577,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} }
protected void onWireFormatInfo(WireFormatInfo info) {
protocolVersion.set(info.getVersion());
}
/** /**
* Used for handling async exceptions * Used for handling async exceptions
* *
@ -1989,4 +2001,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected BlobTransferPolicy createBlobTransferPolicy() { protected BlobTransferPolicy createBlobTransferPolicy() {
return new BlobTransferPolicy(); return new BlobTransferPolicy();
} }
public int getProtocolVersion() {
return protocolVersion.get();
}
} }

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.Service; import org.apache.activemq.Service;
@ -34,6 +35,7 @@ import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
@ -123,6 +125,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false); protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
private boolean networkConnection; private boolean networkConnection;
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
static class ConnectionState extends org.apache.activemq.state.ConnectionState{ static class ConnectionState extends org.apache.activemq.state.ConnectionState{
@ -326,6 +329,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
public Response processWireFormat(WireFormatInfo info) throws Exception{ public Response processWireFormat(WireFormatInfo info) throws Exception{
wireFormatInfo=info; wireFormatInfo=info;
protocolVersion.set(info.getVersion());
return null; return null;
} }
@ -1159,4 +1163,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
} }
} }
public int getProtocolVersion() {
return protocolVersion.get();
}
} }

View File

@ -24,6 +24,9 @@ package org.apache.activemq.command;
*/ */
public interface CommandTypes { public interface CommandTypes {
// What is the latest version of the openwire protocol
byte PROTOCOL_VERSION = 3;
// A marshaling layer can use this type to specify a null object. // A marshaling layer can use this type to specify a null object.
byte NULL = 0; byte NULL = 0;

View File

@ -17,26 +17,31 @@
*/ */
package org.apache.activemq.openwire; package org.apache.activemq.openwire;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.MarshallAware;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.util.*;
import org.apache.activemq.wireformat.WireFormat;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.MarshallAware;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.ClassLoading;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.wireformat.WireFormat;
/** /**
* *
* @version $Revision$ * @version $Revision$
*/ */
final public class OpenWireFormat implements WireFormat { final public class OpenWireFormat implements WireFormat {
public static final int DEFAULT_VERSION = 3; public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
static final byte NULL_TYPE = CommandTypes.NULL; static final byte NULL_TYPE = CommandTypes.NULL;
private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2; private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2;
@ -561,15 +566,28 @@ final public class OpenWireFormat implements WireFormat {
throw new IllegalStateException("Wireformat cannot not be renegotiated."); throw new IllegalStateException("Wireformat cannot not be renegotiated.");
this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) ); this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) );
info.setVersion(this.getVersion());
this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
info.setStackTraceEnabled(this.stackTraceEnabled);
this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
info.setCacheEnabled(this.cacheEnabled);
this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled(); this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
info.setTightEncodingEnabled(this.tightEncodingEnabled);
this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled(); this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
info.setSizePrefixDisabled(this.sizePrefixDisabled);
if( cacheEnabled ) { if( cacheEnabled ) {
int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
info.setCacheSize(size);
if( size == 0 ) { if( size == 0 ) {
size = MARSHAL_CACHE_SIZE; size = MARSHAL_CACHE_SIZE;
} }

View File

@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import javax.net.SocketFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class WireformatNegociationTest extends CombinationTestSupport {
private TransportServer server;
private Transport clientTransport;
private Transport serverTransport;
private final AtomicReference<WireFormatInfo> clientWF = new AtomicReference<WireFormatInfo>();
private final AtomicReference<WireFormatInfo> serverWF = new AtomicReference<WireFormatInfo>();
private final AtomicReference<Exception> asyncError = new AtomicReference<Exception>();
private final AtomicBoolean ignoreAsycError = new AtomicBoolean();
private final CountDownLatch negociationCounter = new CountDownLatch(2);
protected void setUp() throws Exception {
super.setUp();
}
/**
* @throws Exception
* @throws URISyntaxException
*/
private void startClient(String uri) throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI(uri));
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {
if( command instanceof WireFormatInfo ) {
clientWF.set((WireFormatInfo) command);
negociationCounter.countDown();
}
}
public void onException(IOException error) {
if( !ignoreAsycError.get() ) {
log.info("Client transport error: ", error);
asyncError.set(error);
negociationCounter.countDown();
}
}
public void transportInterupted() {
}
public void transportResumed() {
}});
clientTransport.start();
}
/**
* @throws IOException
* @throws URISyntaxException
* @throws Exception
*/
private void startServer(String uri ) throws IOException, URISyntaxException, Exception {
server = TransportFactory.bind("localhost", new URI(uri));
server.setAcceptListener(new TransportAcceptListener(){
public void onAccept(Transport transport) {
try {
log.info("["+getName()+"] Server Accepted a Connection");
serverTransport = transport;
serverTransport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {
if( command instanceof WireFormatInfo ) {
serverWF.set((WireFormatInfo) command);
negociationCounter.countDown();
}
}
public void onException(IOException error) {
if( !ignoreAsycError.get() ) {
log.info("Server transport error: ", error);
asyncError.set(error);
negociationCounter.countDown();
}
}
public void transportInterupted() {
}
public void transportResumed() {
}});
serverTransport.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void onAcceptError(Exception error) {
error.printStackTrace();
}
});
server.start();
}
protected void tearDown() throws Exception {
ignoreAsycError.set(true);
try {
if( clientTransport!=null )
clientTransport.stop();
if( serverTransport!=null )
serverTransport.stop();
if( server!=null )
server.stop();
} catch (Throwable e) {
e.printStackTrace();
}
super.tearDown();
}
/**
* @throws Exception
*/
public void testWireFomatInfoSeverVersion1() throws Exception {
startServer("tcp://localhost:61616?wireFormat.version=1");
startClient("tcp://localhost:61616");
assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
assertNull("Async error: "+asyncError, asyncError.get());
assertNotNull(clientWF.get());
assertEquals(1, clientWF.get().getVersion());
assertNotNull(serverWF.get());
assertEquals(1, serverWF.get().getVersion());
}
/**
* @throws Exception
*/
public void testWireFomatInfoClientVersion1() throws Exception {
startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616?wireFormat.version=1");
assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
assertNull("Async error: "+asyncError, asyncError.get());
assertNotNull(clientWF.get());
assertEquals(1, clientWF.get().getVersion());
assertNotNull(serverWF.get());
assertEquals(1, serverWF.get().getVersion());
}
/**
* @throws Exception
*/
public void testWireFomatInfoCurrentVersion() throws Exception {
startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616");
assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
assertNull("Async error: "+asyncError, asyncError.get());
assertNotNull(clientWF.get());
assertEquals(CommandTypes.PROTOCOL_VERSION, clientWF.get().getVersion());
assertNotNull(serverWF.get());
assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
}
}