From 5a868627e3d62b082a39523c16cf126a3ed42c00 Mon Sep 17 00:00:00 2001 From: David Jencks Date: Mon, 24 Mar 2008 07:22:51 +0000 Subject: [PATCH] AMQ-1632 Fix mdb stopping problem and improve some thread-safety git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@640336 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQSessionExecutor.java | 83 +++++------ .../broker/region/PrefetchSubscription.java | 5 +- .../activemq/thread/PooledTaskRunner.java | 5 +- .../activemq/ra/ActiveMQEndpointWorker.java | 131 ++++++++++-------- .../activemq/ra/ServerSessionPoolImpl.java | 4 +- 5 files changed, 125 insertions(+), 103 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index 7f298b1f34..16f369d46b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -36,11 +36,12 @@ import org.apache.activemq.util.JMSExceptionSupport; * @see javax.jms.Session */ public class ActiveMQSessionExecutor implements Task { - - private ActiveMQSession session; - private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); - private boolean dispatchedBySessionPool; - private TaskRunner taskRunner; + + private final ActiveMQSession session; + private final MessageDispatchChannel messageQueue = new MessageDispatchChannel(); + private volatile boolean dispatchedBySessionPool; + //volatile required to avoid double-checked locking problem. + private volatile TaskRunner taskRunner; ActiveMQSessionExecutor(ActiveMQSession session) { this.session = session; @@ -50,30 +51,34 @@ public class ActiveMQSessionExecutor implements Task { dispatchedBySessionPool = value; wakeup(); } - + void execute(MessageDispatch message) throws InterruptedException { - if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){ + if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) { dispatch(message); - }else { + } else { messageQueue.enqueue(message); wakeup(); } } public void wakeup() { - if( !dispatchedBySessionPool ) { - if( session.isSessionAsyncDispatch() ) { + if (!dispatchedBySessionPool) { + if (session.isSessionAsyncDispatch()) { try { - if( taskRunner == null ) { - taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); - } + if (taskRunner == null) { + synchronized (this) { + if (taskRunner == null) { + taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + session.getSessionId()); + } + } + } taskRunner.wakeup(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { - while( iterate() ) + while (iterate()) ; } } @@ -88,34 +93,34 @@ public class ActiveMQSessionExecutor implements Task { return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); } - void dispatch(MessageDispatch message){ + void dispatch(MessageDispatch message) { // TODO - we should use a Map for this indexed by consumerId - + for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); ConsumerId consumerId = message.getConsumerId(); - if( consumerId.equals(consumer.getConsumerId()) ) { + if (consumerId.equals(consumer.getConsumerId())) { consumer.dispatch(message); } } } - + synchronized void start() { - if( !messageQueue.isRunning() ) { + if (!messageQueue.isRunning()) { messageQueue.start(); - if( hasUncomsumedMessages() ) - wakeup(); + if (hasUncomsumedMessages()) + wakeup(); } } void stop() throws JMSException { try { - if( messageQueue.isRunning() ) { + if (messageQueue.isRunning()) { messageQueue.stop(); - if( taskRunner!=null ) { + if (taskRunner != null) { taskRunner.shutdown(); - taskRunner=null; + taskRunner = null; } } } catch (InterruptedException e) { @@ -123,7 +128,7 @@ public class ActiveMQSessionExecutor implements Task { throw JMSExceptionSupport.create(e); } } - + boolean isRunning() { return messageQueue.isRunning(); } @@ -139,8 +144,8 @@ public class ActiveMQSessionExecutor implements Task { MessageDispatch dequeueNoWait() { return (MessageDispatch) messageQueue.dequeueNoWait(); } - - protected void clearMessagesInProgress(){ + + protected void clearMessagesInProgress() { messageQueue.clear(); } @@ -150,17 +155,17 @@ public class ActiveMQSessionExecutor implements Task { public boolean iterate() { - // Deliver any messages queued on the consumer to their listeners. - for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { + // Deliver any messages queued on the consumer to their listeners. + for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); - if( consumer.iterate() ) { - return true; - } + if (consumer.iterate()) { + return true; + } } - - // No messages left queued on the listeners.. so now dispatch messages queued on the session + + // No messages left queued on the listeners.. so now dispatch messages queued on the session MessageDispatch message = messageQueue.dequeueNoWait(); - if( message==null ) { + if (message == null) { return false; } else { dispatch(message); @@ -168,8 +173,8 @@ public class ActiveMQSessionExecutor implements Task { } } - List getUnconsumedMessages() { - return messageQueue.removeAll(); - } - + List getUnconsumedMessages() { + return messageQueue.removeAll(); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 3f78289b5a..7722f59b27 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -177,10 +177,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ synchronized(PrefetchSubscription.this){ - dequeueCounter++; + dequeueCounter++; dispatched.remove(node); node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); - prefetchExtension--; + prefetchExtension=Math.max(0,prefetchExtension-1); + dispatchMatched(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java index 5d2460b155..62212dbab6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java @@ -29,10 +29,13 @@ class PooledTaskRunner implements TaskRunner { private final Executor executor; private final Task task; private final Runnable runable; + //guarded by runable private boolean queued; private boolean shutdown; private boolean iterating; - private Thread runningThread; + + //setting is not guarded by runable; + private volatile Thread runningThread; public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) { this.executor = executor; diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java index a101ed004e..279394dbfe 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java @@ -61,25 +61,24 @@ public class ActiveMQEndpointWorker { } } - protected ActiveMQResourceAdapter adapter; - protected ActiveMQEndpointActivationKey endpointActivationKey; - protected MessageEndpointFactory endpointFactory; - protected WorkManager workManager; - protected boolean transacted; - - + protected final ActiveMQEndpointActivationKey endpointActivationKey; + protected final MessageEndpointFactory endpointFactory; + protected final WorkManager workManager; + protected final boolean transacted; + + private final ActiveMQDestination dest; + private final Work connectWork; + + //access to all non-final variables guarded by connectWork private ConnectionConsumer consumer; private ServerSessionPoolImpl serverSessionPool; - private ActiveMQDestination dest; private boolean running; - private Work connectWork; - protected ActiveMQConnection connection; - + private ActiveMQConnection connection; private long reconnectDelay=INITIAL_RECONNECT_DELAY; /** - * @param s + * @param s session to close */ public static void safeClose(Session s) { try { @@ -88,38 +87,40 @@ public class ActiveMQEndpointWorker { } } catch (JMSException e) { + //ignore } } /** - * @param c + * @param c connection to close */ - public static void safeClose(Connection c) { + private static void safeClose(Connection c) { try { if (c != null) { c.close(); } } catch (JMSException e) { + //ignore } } /** - * @param cc + * @param cc ConnectionConsumer to close */ - public static void safeClose(ConnectionConsumer cc) { + private static void safeClose(ConnectionConsumer cc) { try { if (cc != null) { cc.close(); } } catch (JMSException e) { + //ignore } } public ActiveMQEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { this.endpointActivationKey = key; - this.adapter = adapter; this.endpointFactory = endpointActivationKey.getMessageEndpointFactory(); this.workManager = adapter.getBootstrapContext().getWorkManager(); try { @@ -135,7 +136,7 @@ public class ActiveMQEndpointWorker { } synchronized public void run() { - if( !isRunning() ) + if( !running) return; if( connection!=null ) return; @@ -187,33 +188,37 @@ public class ActiveMQEndpointWorker { } - synchronized public void start() throws WorkException, ResourceException { - if (running) - return; - running = true; + public void start() throws ResourceException { + synchronized (connectWork) { + if (running) + return; + running = true; - log.debug("Starting"); - serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue()); - connect(); + log.debug("Starting"); + serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue()); + connect(); + } log.debug("Started"); } /** - * + * + * @throws InterruptedException */ - synchronized public void stop() throws InterruptedException { - if (!running) - return; - running = false; - serverSessionPool.close(); - disconnect(); + public void stop() throws InterruptedException { + synchronized (connectWork) { + if (!running) + return; + running = false; + serverSessionPool.close(); + disconnect(); + } } - private boolean isRunning() { - return running; - } - - synchronized private void connect() { + /** + * Calls must be synchronized on connectWork + */ + private void connect() { if (!running) return; @@ -226,38 +231,40 @@ public class ActiveMQEndpointWorker { } /** - * + * Calls must be synchronized on connectWork */ - synchronized private void disconnect() { + private void disconnect() { safeClose(consumer); consumer=null; safeClose(connection); connection=null; } - private void reconnect(JMSException error){ - log.debug("Reconnect cause: ",error); + private void reconnect(JMSException error) { + log.debug("Reconnect cause: ", error); long reconnectDelay; - synchronized(this) { - reconnectDelay = this.reconnectDelay; - // Only log errors if the server is really down.. And not a temp failure. - if (reconnectDelay == MAX_RECONNECT_DELAY) { - log.error("Endpoint connection to JMS broker failed: " + error.getMessage()); - log.error("Endpoint will try to reconnect to the JMS broker in "+(MAX_RECONNECT_DELAY/1000)+" seconds"); - } - } try { - disconnect(); - Thread.sleep(reconnectDelay); - - synchronized(this) { - // Use exponential rollback. - this.reconnectDelay*=2; - if (this.reconnectDelay > MAX_RECONNECT_DELAY) - this.reconnectDelay=MAX_RECONNECT_DELAY; + synchronized (connectWork) { + reconnectDelay = this.reconnectDelay; + // Only log errors if the server is really down.. And not a temp failure. + if (reconnectDelay == MAX_RECONNECT_DELAY) { + log.error("Endpoint connection to JMS broker failed: " + error.getMessage()); + log.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds"); + } + disconnect(); } - connect(); - } catch(InterruptedException e) {} + Thread.sleep(reconnectDelay); + + synchronized (connectWork) { + // Use exponential rollback. + this.reconnectDelay *= 2; + if (this.reconnectDelay > MAX_RECONNECT_DELAY) + this.reconnectDelay = MAX_RECONNECT_DELAY; + connect(); + } + } catch (InterruptedException e) { + Thread.interrupted(); + } } protected void registerThreadSession(Session session) { @@ -268,6 +275,12 @@ public class ActiveMQEndpointWorker { threadLocal.set(null); } + ActiveMQConnection getConnection() { + synchronized (connectWork) { + return connection; + } + } + private String emptyToNull(String value) { if (value == null || value.length() == 0) { return null; diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index 40c6e239a4..a2500e26dd 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -60,7 +60,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { private ServerSessionImpl createServerSessionImpl() throws JMSException { ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); - final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge); + final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge); MessageEndpoint endpoint; try { int batchSize = 0; @@ -227,7 +227,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } else if (s instanceof ActiveMQTopicSession) { session = (ActiveMQSession) s; } else { - activeMQAsfEndpointWorker.connection + activeMQAsfEndpointWorker.getConnection() .onAsyncException(new JMSException( "Session pool provided an invalid session type: " + s.getClass()));