https://issues.apache.org/activemq/browse/AMQ-1337 - Broker should finish accepting connection in an async thread.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@558814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-07-23 18:02:41 +00:00
parent 20364163e9
commit 3a7c673dec
7 changed files with 162 additions and 122 deletions

View File

@ -1443,10 +1443,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Returns the broker name if one is available or null if one is not available yet. * Returns the broker name if one is available or null if one is not available yet.
*/ */
public String getBrokerName() { public String getBrokerName() {
if (brokerInfo == null) { try {
return null; brokerInfoReceived.await(5,TimeUnit.SECONDS);
} if (brokerInfo == null) {
return brokerInfo.getBrokerName(); return null;
}
return brokerInfo.getBrokerName();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
} }
/** /**

View File

@ -143,10 +143,23 @@ public class TransportConnector implements Connector {
this.server = server; this.server = server;
this.brokerInfo.setBrokerURL(server.getConnectURI().toString()); this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
this.server.setAcceptListener(new TransportAcceptListener() { this.server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) { public void onAccept(final Transport transport) {
try { try {
Connection connection = createConnection(transport); // Starting the connection could block due to
connection.start(); // wireformat negociation, so start it in an async thread.
Thread startThread = new Thread("ActiveMQ Transport Initiator: "+transport.getRemoteAddress()) {
public void run() {
try {
Connection connection = createConnection(transport);
connection.start();
} catch (Exception e) {
ServiceSupport.dispose(transport);
onAcceptError(e);
}
}
};
startThread.setPriority(4);
startThread.start();
} }
catch (Exception e) { catch (Exception e) {
String remoteHost = transport.getRemoteAddress(); String remoteHost = transport.getRemoteAddress();

View File

@ -21,8 +21,8 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
@ -52,10 +52,11 @@ public class VMTransport implements Transport,Task{
protected boolean marshal; protected boolean marshal;
protected boolean network; protected boolean network;
protected boolean async=true; protected boolean async=true;
protected AtomicBoolean started=new AtomicBoolean();
protected int asyncQueueDepth=2000; protected int asyncQueueDepth=2000;
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList()); protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
protected LinkedBlockingQueue messageQueue=null; protected LinkedBlockingQueue messageQueue=null;
protected boolean started;
protected final Object startMutex = new Object();
protected final URI location; protected final URI location;
protected final long id; protected final long id;
private TaskRunner taskRunner; private TaskRunner taskRunner;
@ -96,11 +97,15 @@ public class VMTransport implements Transport,Task{
} }
protected void syncOneWay(Object command){ protected void syncOneWay(Object command){
final TransportListener tl=peer.transportListener; TransportListener tl=null;
prePeerSetQueue=peer.prePeerSetQueue; synchronized(peer.startMutex){
if(tl==null){ if( peer.started ) {
prePeerSetQueue.add(command); tl = peer.transportListener;
}else{ } else if(!peer.disposed) {
peer.prePeerSetQueue.add(command);
}
}
if( tl!=null ) {
tl.onCommand(command); tl.onCommand(command);
} }
} }
@ -147,30 +152,33 @@ public class VMTransport implements Transport,Task{
} }
public void start() throws Exception{ public void start() throws Exception{
if(started.compareAndSet(false,true)){ if(transportListener==null)
if(transportListener==null) throw new IOException("TransportListener not set.");
throw new IOException("TransportListener not set."); synchronized(startMutex) {
if(!async){ if( !prePeerSetQueue.isEmpty() ) {
for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
Command command=(Command)iter.next(); Command command=(Command)iter.next();
transportListener.onCommand(command); transportListener.onCommand(command);
iter.remove(); }
} prePeerSetQueue.clear();
}else{ }
peer.wakeup(); started = true;
wakeup(); if( isAsync() ) {
} peer.wakeup();
wakeup();
}
} }
} }
public void stop() throws Exception{ public void stop() throws Exception{
if(started.compareAndSet(true,false)){ synchronized(startMutex) {
if(!disposed){ if(!disposed){
started=false;
disposed=true; disposed=true;
} if(taskRunner!=null){
if(taskRunner!=null){ taskRunner.shutdown(1000);
taskRunner.shutdown(1000); taskRunner=null;
taskRunner=null; }
} }
} }
} }

View File

@ -36,23 +36,34 @@ import org.apache.activemq.broker.TransportConnector;
public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException { private ActiveMQConnection connection;
private BrokerService broker;
public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese");
assertEquals("Cheese", cf.getClientIDPrefix()); assertEquals("Cheese", cf.getClientIDPrefix());
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection = (ActiveMQConnection) cf.createConnection();
try { connection.start();
connection.start();
String clientID = connection.getClientID(); String clientID = connection.getClientID();
log.info("Got client ID: " + clientID); log.info("Got client ID: " + clientID);
assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese")); assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese"));
}
finally {
connection.close();
}
} }
protected void tearDown() throws Exception {
// Try our best to close any previously opend connection.
try {
connection.close();
} catch (Throwable ignore) {
}
// Try our best to stop any previously started broker.
try {
broker.stop();
} catch (Throwable ignore) {
}
}
public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException { public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true");
@ -88,26 +99,27 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
// Make sure the broker is not created until the connection is instantiated. // Make sure the broker is not created until the connection is instantiated.
assertNull( BrokerRegistry.getInstance().lookup("localhost") ); assertNull( BrokerRegistry.getInstance().lookup("localhost") );
Connection connection = cf.createConnection(); connection = (ActiveMQConnection) cf.createConnection();
// This should create the connection. // This should create the connection.
assertNotNull(connection); assertNotNull(connection);
// Verify the broker was created. // Verify the broker was created.
assertNotNull( BrokerRegistry.getInstance().lookup("localhost") ); assertNotNull( BrokerRegistry.getInstance().lookup("localhost") );
connection.close(); connection.close();
// Verify the broker was destroyed. // Verify the broker was destroyed.
assertNull( BrokerRegistry.getInstance().lookup("localhost") ); assertNull( BrokerRegistry.getInstance().lookup("localhost") );
} }
public void testGetBrokerName() throws URISyntaxException, JMSException { public void testGetBrokerName() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection = (ActiveMQConnection) cf.createConnection();
connection.start(); connection.start();
String brokerName = connection.getBrokerName(); String brokerName = connection.getBrokerName();
log.info("Got broker name: " + brokerName); log.info("Got broker name: " + brokerName);
assertNotNull("No broker name available!", brokerName); assertNotNull("No broker name available!", brokerName);
connection.close();
} }
public void testCreateTcpConnectionUsingAllocatedPort() throws Exception { public void testCreateTcpConnectionUsingAllocatedPort() throws Exception {
@ -143,7 +155,7 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
protected void assertCreateConnection(String uri) throws Exception { protected void assertCreateConnection(String uri) throws Exception {
// Start up a broker with a tcp connector. // Start up a broker with a tcp connector.
BrokerService broker = new BrokerService(); broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
TransportConnector connector = broker.addConnector(uri); TransportConnector connector = broker.addConnector(uri);
broker.start(); broker.start();
@ -162,9 +174,8 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
// This should create the connection. // This should create the connection.
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectURI); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectURI);
Connection connection = cf.createConnection(); connection = (ActiveMQConnection) cf.createConnection();
assertNotNull(connection); assertNotNull(connection);
connection.close();
broker.stop(); broker.stop();
} }

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -34,9 +37,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
public class BrokerTest extends BrokerTestSupport { public class BrokerTest extends BrokerTestSupport {
public ActiveMQDestination destination; public ActiveMQDestination destination;
@ -45,6 +45,61 @@ public class BrokerTest extends BrokerTestSupport {
public byte destinationType; public byte destinationType;
public boolean durableConsumer; public boolean durableConsumer;
public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
addCombinationValues( "deliveryMode", new Object[]{
Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)} );
}
public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(1);
connection1.send(consumerInfo1);
// Setup a second connection
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(1);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.send(consumerInfo2);
// Send the messages
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
for( int i=0; i < 2 ; i++ ) {
Message m1 = receiveMessage(connection1);
Message m2 = receiveMessage(connection2);
assertNotNull("m1 is null for index: " + i, m1);
assertNotNull("m2 is null for index: " + i, m2);
assertNotSame(m1.getMessageId(), m2.getMessageId());
connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
}
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
}
public void initCombosForTestQueuBrowserWith2Consumers() { public void initCombosForTestQueuBrowserWith2Consumers() {
addCombinationValues( "deliveryMode", new Object[]{ addCombinationValues( "deliveryMode", new Object[]{
Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.NON_PERSISTENT),
@ -1339,61 +1394,7 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection); assertNoMessagesLeft(connection);
} }
public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
addCombinationValues( "deliveryMode", new Object[]{
Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)} );
}
public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(1);
connection1.send(consumerInfo1);
// Setup a second connection
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(1);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.send(consumerInfo2);
// Send the messages
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
for( int i=0; i < 2 ; i++ ) {
Message m1 = receiveMessage(connection1);
Message m2 = receiveMessage(connection2);
assertNotNull("m1 is null for index: " + i, m1);
assertNotNull("m2 is null for index: " + i, m2);
assertNotSame(m1.getMessageId(), m2.getMessageId());
connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
}
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
}
public void initCombosForTestQueueSendThenAddConsumer() { public void initCombosForTestQueueSendThenAddConsumer() {
addCombinationValues( "deliveryMode", new Object[]{ addCombinationValues( "deliveryMode", new Object[]{

View File

@ -117,6 +117,14 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
connection2.send(consumerInfo); connection2.send(consumerInfo);
// Give demand forwarding bridge a chance to finish forwarding the subscriptions.
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
// Send the message to the local boker. // Send the message to the local boker.
connection1.request(createMessage(producerInfo, destination, deliveryMode)); connection1.request(createMessage(producerInfo, destination, deliveryMode));
// Make sure the message was delivered via the remote. // Make sure the message was delivered via the remote.
@ -129,14 +137,7 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
config.setBrokerName("local"); config.setBrokerName("local");
config.setDispatchAsync(false); config.setDispatchAsync(false);
bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport()); bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport());
bridge.start(); bridge.start();
// PATCH: Give demand forwarding bridge a chance to finish setting up
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {

View File

@ -68,6 +68,13 @@ public class ForwardingBridgeTest extends NetworkTestSupport {
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
connection2.send(consumerInfo); connection2.send(consumerInfo);
Thread.sleep(1000); Thread.sleep(1000);
// Give forwarding bridge a chance to finish setting up
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
// Send the message to the local boker. // Send the message to the local boker.
connection1.send(createMessage(producerInfo, destination, deliveryMode)); connection1.send(createMessage(producerInfo, destination, deliveryMode));
@ -82,14 +89,7 @@ public class ForwardingBridgeTest extends NetworkTestSupport {
bridge = new ForwardingBridge(createTransport(), createRemoteTransport()); bridge = new ForwardingBridge(createTransport(), createRemoteTransport());
bridge.setClientId("local-remote-bridge"); bridge.setClientId("local-remote-bridge");
bridge.setDispatchAsync(false); bridge.setDispatchAsync(false);
bridge.start(); bridge.start();
// PATCH: Give forwarding bridge a chance to finish setting up
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {