diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java index 7bd463b7ad..cea97d5861 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java @@ -18,6 +18,7 @@ package org.apache.activemq.ra; import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ExceptionListener; @@ -61,24 +62,23 @@ public class ActiveMQEndpointWorker { } } - protected MessageResourceAdapter adapter; - protected ActiveMQEndpointActivationKey endpointActivationKey; - protected MessageEndpointFactory endpointFactory; - protected WorkManager workManager; - protected boolean transacted; - protected ActiveMQConnection connection; + protected final ActiveMQEndpointActivationKey endpointActivationKey; + protected final MessageEndpointFactory endpointFactory; + protected final WorkManager workManager; + protected final boolean transacted; + private final ActiveMQDestination dest; + private final Work connectWork; + private final AtomicBoolean connecting = new AtomicBoolean(false); + private final String shutdownMutex = "shutdownMutex"; + + private ActiveMQConnection connection; private ConnectionConsumer consumer; private ServerSessionPoolImpl serverSessionPool; - private ActiveMQDestination dest; private boolean running; - private Work connectWork; - private long reconnectDelay = INITIAL_RECONNECT_DELAY; - - public ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { + protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { this.endpointActivationKey = key; - this.adapter = adapter; this.endpointFactory = endpointActivationKey.getMessageEndpointFactory(); this.workManager = adapter.getBootstrapContext().getWorkManager(); try { @@ -88,43 +88,98 @@ public class ActiveMQEndpointWorker { } connectWork = new Work() { + long currentReconnectDelay = INITIAL_RECONNECT_DELAY; public void release() { // } public synchronized void run() { - if (!isRunning()) { - return; - } - if (connection != null) { - return; + currentReconnectDelay = INITIAL_RECONNECT_DELAY; + MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); + if ( LOG.isInfoEnabled() ) { + LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]"); } - MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); + while ( connecting.get() && running ) { try { connection = adapter.makeConnection(activationSpec); - connection.start(); connection.setExceptionListener(new ExceptionListener() { public void onException(JMSException error) { if (!serverSessionPool.isClosing()) { - reconnect(error); + // initiate reconnection only once, i.e. on initial exception + // and only if not already trying to connect + LOG.error("Connection to broker failed: " + error.getMessage(), error); + if ( connecting.compareAndSet(false, true) ) { + synchronized ( connectWork ) { + disconnect(); + serverSessionPool.closeIdleSessions(); + connect(); } + } else { + // connection attempt has already been initiated + LOG.info("Connection attempt already in progress, ignoring connection exception"); } + } + } }); + connection.start(); + int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue(); if (activationSpec.isDurableSubscription()) { - consumer = connection.createDurableConnectionConsumer((Topic)dest, activationSpec.getSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, - activationSpec.getMaxMessagesPerSessionsIntValue(), activationSpec.getNoLocalBooleanValue()); + consumer = connection.createDurableConnectionConsumer( + (Topic) dest, + activationSpec.getSubscriptionName(), + emptyToNull(activationSpec.getMessageSelector()), + serverSessionPool, + prefetchSize, + activationSpec.getNoLocalBooleanValue()); } else { - consumer = connection.createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, activationSpec.getMaxMessagesPerSessionsIntValue(), + consumer = connection.createConnectionConsumer( + dest, + emptyToNull(activationSpec.getMessageSelector()), + serverSessionPool, + prefetchSize, activationSpec.getNoLocalBooleanValue()); } + + if ( connecting.compareAndSet(true, false) ) { + if ( LOG.isInfoEnabled() ) { + LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]"); + } + } else { + LOG.error("Could not release connection lock"); + } } catch (JMSException error) { - LOG.debug("Fail to to connect: " + error, error); - reconnect(error); + if ( LOG.isDebugEnabled() ) { + LOG.debug("Failed to connect: " + error.getMessage(), error); } + disconnect(); + pause(error); + } + } + } + + private void pause(JMSException error) { + if (currentReconnectDelay == MAX_RECONNECT_DELAY) { + LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " + + error.getMessage(), error); + LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds"); + } + try { + synchronized ( shutdownMutex ) { + // shutdownMutex will be notified by stop() method in + // order to accelerate shutdown of endpoint + shutdownMutex.wait(currentReconnectDelay); + } + } catch ( InterruptedException e ) { + Thread.interrupted(); + } + currentReconnectDelay *= 2; + if (currentReconnectDelay > MAX_RECONNECT_DELAY) { + currentReconnectDelay = MAX_RECONNECT_DELAY; + } } }; @@ -139,25 +194,13 @@ public class ActiveMQEndpointWorker { } - /** - * @param s - */ - public static void safeClose(Session s) { - try { - if (s != null) { - s.close(); - } - } catch (JMSException e) { - // - } - } - /** * @param c */ public static void safeClose(Connection c) { try { if (c != null) { + LOG.debug("Closing connection to broker"); c.close(); } } catch (JMSException e) { @@ -171,6 +214,7 @@ public class ActiveMQEndpointWorker { public static void safeClose(ConnectionConsumer cc) { try { if (cc != null) { + LOG.debug("Closing ConnectionConsumer"); cc.close(); } } catch (JMSException e) { @@ -181,35 +225,44 @@ public class ActiveMQEndpointWorker { /** * */ - public synchronized void start() throws WorkException, ResourceException { - if (running) { + public void start() throws ResourceException { + synchronized (connectWork) { + if (running) return; - } running = true; - LOG.debug("Starting"); + if ( connecting.compareAndSet(false, true) ) { + LOG.info("Starting"); serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue()); connect(); - LOG.debug("Started"); + } else { + LOG.warn("Ignoring start command, EndpointWorker is already trying to connect"); + } + } } /** * */ - public synchronized void stop() throws InterruptedException { - if (!running) { + public void stop() throws InterruptedException { + synchronized (shutdownMutex) { + if (!running) return; - } running = false; + LOG.info("Stopping"); + // wake up pausing reconnect attempt + shutdownMutex.notifyAll(); serverSessionPool.close(); disconnect(); } + } private boolean isRunning() { return running; } - private synchronized void connect() { + private void connect() { + synchronized ( connectWork ) { if (!running) { return; } @@ -221,45 +274,19 @@ public class ActiveMQEndpointWorker { LOG.error("Work Manager did not accept work: ", e); } } + } /** * */ - private synchronized void disconnect() { + private void disconnect() { + synchronized ( connectWork ) { safeClose(consumer); consumer = null; safeClose(connection); connection = null; } - - private void reconnect(JMSException error) { - LOG.debug("Reconnect cause: ", error); - long reconnectDelay; - synchronized (this) { - reconnectDelay = this.reconnectDelay; - // Only log errors if the server is really down.. And not a temp - // failure. - if (reconnectDelay == MAX_RECONNECT_DELAY) { - LOG.error("Endpoint connection to JMS broker failed: " + error.getMessage()); - LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds"); } - } - try { - disconnect(); - Thread.sleep(reconnectDelay); - - synchronized (this) { - // Use exponential rollback. - this.reconnectDelay *= 2; - if (this.reconnectDelay > MAX_RECONNECT_DELAY) { - this.reconnectDelay = MAX_RECONNECT_DELAY; - } - } - connect(); - } catch (InterruptedException e) { - // - } - } protected void registerThreadSession(Session session) { THREAD_LOCAL.set(session); @@ -269,6 +296,16 @@ public class ActiveMQEndpointWorker { THREAD_LOCAL.set(null); } + protected ActiveMQConnection getConnection() { + // make sure we only return a working connection + // in particular make sure that we do not return null + // after the resource adapter got disconnected from + // the broker via the disconnect() method + synchronized ( connectWork ) { + return connection; + } + } + private String emptyToNull(String value) { if (value == null || value.length() == 0) { return null; diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 971ea05473..ab6558beae 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -71,6 +71,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement this.bootstrapContext = bootstrapContext; if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) { brokerStartThread = new Thread("Starting ActiveMQ Broker") { + @Override public void run () { try { synchronized( ActiveMQResourceAdapter.this ) { @@ -110,21 +111,21 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement * @param activationSpec */ public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { - ActiveMQConnectionFactory connectionFactory = this.connectionFactory; - if (connectionFactory == null) { - connectionFactory = createConnectionFactory(getInfo()); + ActiveMQConnectionFactory cf = getConnectionFactory(); + if (cf == null) { + cf = createConnectionFactory(getInfo()); } String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName()); String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword()); String clientId = activationSpec.getClientId(); if (clientId != null) { - connectionFactory.setClientID(clientId); + cf.setClientID(clientId); } else { if (activationSpec.isDurableSubscription()) { log.warn("No clientID specified for durable subscription: " + activationSpec); } } - ActiveMQConnection physicalConnection = (ActiveMQConnection)connectionFactory.createConnection(userName, password); + ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password); // have we configured a redelivery policy RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy(); @@ -318,8 +319,8 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement return connectionFactory; } - public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; + public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) { + this.connectionFactory = aConnectionFactory; } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java index a6bf96801b..575e7cae39 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageEndpointProxy.java @@ -22,6 +22,8 @@ import javax.jms.Message; import javax.jms.MessageListener; import javax.resource.ResourceException; import javax.resource.spi.endpoint.MessageEndpoint; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @author Michael Gaffney @@ -30,6 +32,7 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint { private static final MessageEndpointState ALIVE = new MessageEndpointAlive(); private static final MessageEndpointState DEAD = new MessageEndpointDead(); + private static final Log LOG = LogFactory.getLog(MessageEndpointProxy.class); private static int proxyCount; private final int proxyID; @@ -52,18 +55,22 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint { } public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException { + LOG.trace("Invoking MessageEndpoint.beforeDelivery()"); state.beforeDelivery(this, method); } public void onMessage(Message message) { + LOG.trace("Invoking MessageEndpoint.onMethod()"); state.onMessage(this, message); } public void afterDelivery() throws ResourceException { + LOG.trace("Invoking MessageEndpoint.afterDelivery()"); state.afterDelivery(this); } public void release() { + LOG.trace("Invoking MessageEndpoint.release()"); state.release(this); } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java index 248d7ce1ef..4baa7a827a 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageResourceAdapter.java @@ -30,7 +30,7 @@ import org.apache.activemq.ActiveMQConnection; * * @version $Revision$ */ -interface MessageResourceAdapter extends ResourceAdapter { +public interface MessageResourceAdapter extends ResourceAdapter { /** */ diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java index 47184e7624..3d0f8d145a 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java @@ -104,6 +104,10 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D return session; } + protected boolean isStale() { + return stale || !session.isRunning(); + } + public MessageProducer getMessageProducer() throws JMSException { if (messageProducer == null) { messageProducer = getSession().createProducer(null); @@ -156,12 +160,12 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D */ public void run() { log.debug("Running"); + currentBatchSize = 0; while (true) { log.debug("run loop start"); try { - if ( session.isRunning() ) { InboundContextSupport.register(this); - currentBatchSize = 0; + if ( session.isRunning() ) { session.run(); } else { log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale"); @@ -169,8 +173,11 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D } } catch (Throwable e) { stale = true; + if ( log.isInfoEnabled() ) { + log.info("Endpoint failed to process message. Reason: " + e.getMessage()); + } else if ( log.isDebugEnabled() ) { log.debug("Endpoint failed to process message.", e); - log.info("Endpoint failed to process message. Reason: " + e); + } } finally { InboundContextSupport.unregister(this); log.debug("run loop end"); @@ -224,7 +231,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D // Sanitiy Check: If the local transaction has not been // commited.. // Commit it now. - log.warn("Local transaction had not been commited. Commiting now."); + log.warn("Local transaction had not been commited. Commiting now."); } try { session.commit(); @@ -246,6 +253,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D /** * @see java.lang.Object#toString() */ + @Override public String toString() { return "ServerSessionImpl:" + serverSessionId; } @@ -254,12 +262,12 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D try { endpoint.release(); } catch (Throwable e) { - log.debug("Endpoint did not release properly: " + e, e); + log.debug("Endpoint did not release properly: " + e.getMessage(), e); } try { session.close(); } catch (Throwable e) { - log.debug("Session did not close properly: " + e, e); + log.debug("Session did not close properly: " + e.getMessage(), e); } } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index dae5af9d2e..53f5ccfe8d 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -60,7 +60,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { private ServerSessionImpl createServerSessionImpl() throws JMSException { MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); - final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge); + final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge); MessageEndpoint endpoint; try { int batchSize = 0; @@ -188,13 +188,21 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } public void returnToPool(ServerSessionImpl ss) { - if (LOG.isDebugEnabled()) { - LOG.debug("Session returned to pool: " + ss); - } sessionLock.lock(); - try { activeSessions.remove(ss); + try { + // make sure we only return non-stale sessions to the pool + if ( ss.isStale() ) { + if ( LOG.isDebugEnabled() ) { + LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss); + } + ss.close(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("ServerSession returned to pool: " + ss); + } idleSessions.add(ss); + } } finally { sessionLock.unlock(); } @@ -243,7 +251,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } else if (s instanceof ActiveMQTopicSession) { session = (ActiveMQSession) s; } else { - activeMQAsfEndpointWorker.connection + activeMQAsfEndpointWorker.getConnection() .onAsyncException(new JMSException( "Session pool provided an invalid session type: " + s.getClass())); @@ -275,7 +283,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } - private int closeIdleSessions() { + protected int closeIdleSessions() { sessionLock.lock(); try { for (ServerSessionImpl ss : idleSessions) {