git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1379433 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-08-31 12:53:53 +00:00
parent 1e57750dd6
commit ee2069f85c
2 changed files with 54 additions and 34 deletions

View File

@ -21,6 +21,7 @@ import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -97,67 +98,76 @@ public class ActiveMQEndpointWorker {
public void run() { public void run() {
currentReconnectDelay = INITIAL_RECONNECT_DELAY; currentReconnectDelay = INITIAL_RECONNECT_DELAY;
MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
if ( LOG.isInfoEnabled() ) { if (LOG.isInfoEnabled()) {
LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]"); LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
} }
while ( connecting.get() && running ) { while (connecting.get() && running) {
try { try {
connection = adapter.makeConnection(activationSpec); connection = adapter.makeConnection(activationSpec);
connection.setExceptionListener(new ExceptionListener() { connection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException error) { public void onException(JMSException error) {
if (!serverSessionPool.isClosing()) { if (!serverSessionPool.isClosing()) {
// initiate reconnection only once, i.e. on initial exception // initiate reconnection only once, i.e. on initial exception
// and only if not already trying to connect // and only if not already trying to connect
LOG.error("Connection to broker failed: " + error.getMessage(), error); LOG.error("Connection to broker failed: " + error.getMessage(), error);
if ( connecting.compareAndSet(false, true) ) { if (connecting.compareAndSet(false, true)) {
synchronized ( connectWork ) { synchronized (connectWork) {
disconnect(); disconnect();
serverSessionPool.closeIdleSessions(); serverSessionPool.closeIdleSessions();
connect(); connect();
} }
} else { } else {
// connection attempt has already been initiated // connection attempt has already been initiated
LOG.info("Connection attempt already in progress, ignoring connection exception"); LOG.info("Connection attempt already in progress, ignoring connection exception");
} }
} }
} }
}); });
connection.start(); connection.start();
int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue(); if (activationSpec.isDurableSubscription()) {
if (activationSpec.isDurableSubscription()) {
consumer = connection.createDurableConnectionConsumer( consumer = connection.createDurableConnectionConsumer(
(Topic) dest, (Topic) dest,
activationSpec.getSubscriptionName(), activationSpec.getSubscriptionName(),
emptyToNull(activationSpec.getMessageSelector()), emptyToNull(activationSpec.getMessageSelector()),
serverSessionPool, serverSessionPool,
prefetchSize, connection.getPrefetchPolicy().getDurableTopicPrefetch(),
activationSpec.getNoLocalBooleanValue()); activationSpec.getNoLocalBooleanValue());
} else { } else {
consumer = connection.createConnectionConsumer( consumer = connection.createConnectionConsumer(
dest, dest,
emptyToNull(activationSpec.getMessageSelector()), emptyToNull(activationSpec.getMessageSelector()),
serverSessionPool, serverSessionPool,
prefetchSize, getPrefetch(activationSpec, connection, dest),
activationSpec.getNoLocalBooleanValue()); activationSpec.getNoLocalBooleanValue());
} }
if ( connecting.compareAndSet(true, false) ) { if (connecting.compareAndSet(true, false)) {
if ( LOG.isInfoEnabled() ) { if (LOG.isInfoEnabled()) {
LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]"); LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
} }
} else { } else {
LOG.error("Could not release connection lock"); LOG.error("Could not release connection lock");
} }
} catch (JMSException error) { } catch (JMSException error) {
if ( LOG.isDebugEnabled() ) { if (LOG.isDebugEnabled()) {
LOG.debug("Failed to connect: " + error.getMessage(), error); LOG.debug("Failed to connect: " + error.getMessage(), error);
} }
disconnect(); disconnect();
pause(error); pause(error);
}
}
} }
private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
if (destination.isTopic()) {
return connection.getPrefetchPolicy().getTopicPrefetch();
} else if (destination.isQueue()) {
return connection.getPrefetchPolicy().getQueuePrefetch();
} else {
return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
} }
} }

View File

@ -48,7 +48,12 @@ import javax.transaction.xa.Xid;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
public class MDBTest extends TestCase { public class MDBTest extends TestCase {
@ -133,10 +138,14 @@ public class MDBTest extends TestCase {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection(); Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST")));
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false"); adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.setQueuePrefetch(1);
adapter.start(new StubBootstrapContext()); adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(1); final CountDownLatch messageDelivered = new CountDownLatch(1);
@ -168,16 +177,17 @@ public class MDBTest extends TestCase {
// Activate an Endpoint // Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec); adapter.endpointActivation(messageEndpointFactory, activationSpec);
// Give endpoint a chance to setup and register its listeners ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
try { if (msg != null) {
Thread.sleep(1000); assertEquals("Prefetch size hasn't been set", 1, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
} catch (Exception e) { } else {
fail("Consumer hasn't been created");
} }
// Send the broker a message to that endpoint // Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!")); producer.send(session.createTextMessage("Hello!"));
connection.close(); connection.close();
// Wait for the message to be delivered. // Wait for the message to be delivered.