AMQ-1632 Fix mdb stopping problem and improve some thread-safety

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@640336 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2008-03-24 07:22:51 +00:00
parent 1ea5ffa913
commit 5a868627e3
5 changed files with 125 additions and 103 deletions

View File

@ -37,10 +37,11 @@ import org.apache.activemq.util.JMSExceptionSupport;
*/ */
public class ActiveMQSessionExecutor implements Task { public class ActiveMQSessionExecutor implements Task {
private ActiveMQSession session; private final ActiveMQSession session;
private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); private final MessageDispatchChannel messageQueue = new MessageDispatchChannel();
private boolean dispatchedBySessionPool; private volatile boolean dispatchedBySessionPool;
private TaskRunner taskRunner; //volatile required to avoid double-checked locking problem.
private volatile TaskRunner taskRunner;
ActiveMQSessionExecutor(ActiveMQSession session) { ActiveMQSessionExecutor(ActiveMQSession session) {
this.session = session; this.session = session;
@ -53,27 +54,31 @@ public class ActiveMQSessionExecutor implements Task {
void execute(MessageDispatch message) throws InterruptedException { void execute(MessageDispatch message) throws InterruptedException {
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){ if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
dispatch(message); dispatch(message);
}else { } else {
messageQueue.enqueue(message); messageQueue.enqueue(message);
wakeup(); wakeup();
} }
} }
public void wakeup() { public void wakeup() {
if( !dispatchedBySessionPool ) { if (!dispatchedBySessionPool) {
if( session.isSessionAsyncDispatch() ) { if (session.isSessionAsyncDispatch()) {
try { try {
if( taskRunner == null ) { if (taskRunner == null) {
taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); synchronized (this) {
} if (taskRunner == null) {
taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + session.getSessionId());
}
}
}
taskRunner.wakeup(); taskRunner.wakeup();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} else { } else {
while( iterate() ) while (iterate())
; ;
} }
} }
@ -88,34 +93,34 @@ public class ActiveMQSessionExecutor implements Task {
return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
} }
void dispatch(MessageDispatch message){ void dispatch(MessageDispatch message) {
// TODO - we should use a Map for this indexed by consumerId // TODO - we should use a Map for this indexed by consumerId
for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
ConsumerId consumerId = message.getConsumerId(); ConsumerId consumerId = message.getConsumerId();
if( consumerId.equals(consumer.getConsumerId()) ) { if (consumerId.equals(consumer.getConsumerId())) {
consumer.dispatch(message); consumer.dispatch(message);
} }
} }
} }
synchronized void start() { synchronized void start() {
if( !messageQueue.isRunning() ) { if (!messageQueue.isRunning()) {
messageQueue.start(); messageQueue.start();
if( hasUncomsumedMessages() ) if (hasUncomsumedMessages())
wakeup(); wakeup();
} }
} }
void stop() throws JMSException { void stop() throws JMSException {
try { try {
if( messageQueue.isRunning() ) { if (messageQueue.isRunning()) {
messageQueue.stop(); messageQueue.stop();
if( taskRunner!=null ) { if (taskRunner != null) {
taskRunner.shutdown(); taskRunner.shutdown();
taskRunner=null; taskRunner = null;
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -140,7 +145,7 @@ public class ActiveMQSessionExecutor implements Task {
return (MessageDispatch) messageQueue.dequeueNoWait(); return (MessageDispatch) messageQueue.dequeueNoWait();
} }
protected void clearMessagesInProgress(){ protected void clearMessagesInProgress() {
messageQueue.clear(); messageQueue.clear();
} }
@ -150,17 +155,17 @@ public class ActiveMQSessionExecutor implements Task {
public boolean iterate() { public boolean iterate() {
// Deliver any messages queued on the consumer to their listeners. // Deliver any messages queued on the consumer to their listeners.
for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
if( consumer.iterate() ) { if (consumer.iterate()) {
return true; return true;
} }
} }
// No messages left queued on the listeners.. so now dispatch messages queued on the session // No messages left queued on the listeners.. so now dispatch messages queued on the session
MessageDispatch message = messageQueue.dequeueNoWait(); MessageDispatch message = messageQueue.dequeueNoWait();
if( message==null ) { if (message == null) {
return false; return false;
} else { } else {
dispatch(message); dispatch(message);
@ -168,8 +173,8 @@ public class ActiveMQSessionExecutor implements Task {
} }
} }
List getUnconsumedMessages() { List getUnconsumedMessages() {
return messageQueue.removeAll(); return messageQueue.removeAll();
} }
} }

View File

@ -177,10 +177,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
context.getTransaction().addSynchronization(new Synchronization(){ context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{ public void afterCommit() throws Exception{
synchronized(PrefetchSubscription.this){ synchronized(PrefetchSubscription.this){
dequeueCounter++; dequeueCounter++;
dispatched.remove(node); dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
prefetchExtension--; prefetchExtension=Math.max(0,prefetchExtension-1);
dispatchMatched();
} }
} }

View File

@ -29,10 +29,13 @@ class PooledTaskRunner implements TaskRunner {
private final Executor executor; private final Executor executor;
private final Task task; private final Task task;
private final Runnable runable; private final Runnable runable;
//guarded by runable
private boolean queued; private boolean queued;
private boolean shutdown; private boolean shutdown;
private boolean iterating; private boolean iterating;
private Thread runningThread;
//setting is not guarded by runable;
private volatile Thread runningThread;
public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) { public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
this.executor = executor; this.executor = executor;

View File

@ -61,25 +61,24 @@ public class ActiveMQEndpointWorker {
} }
} }
protected ActiveMQResourceAdapter adapter; protected final ActiveMQEndpointActivationKey endpointActivationKey;
protected ActiveMQEndpointActivationKey endpointActivationKey; protected final MessageEndpointFactory endpointFactory;
protected MessageEndpointFactory endpointFactory; protected final WorkManager workManager;
protected WorkManager workManager; protected final boolean transacted;
protected boolean transacted;
private final ActiveMQDestination dest;
private final Work connectWork;
//access to all non-final variables guarded by connectWork
private ConnectionConsumer consumer; private ConnectionConsumer consumer;
private ServerSessionPoolImpl serverSessionPool; private ServerSessionPoolImpl serverSessionPool;
private ActiveMQDestination dest;
private boolean running; private boolean running;
private Work connectWork; private ActiveMQConnection connection;
protected ActiveMQConnection connection;
private long reconnectDelay=INITIAL_RECONNECT_DELAY; private long reconnectDelay=INITIAL_RECONNECT_DELAY;
/** /**
* @param s * @param s session to close
*/ */
public static void safeClose(Session s) { public static void safeClose(Session s) {
try { try {
@ -88,38 +87,40 @@ public class ActiveMQEndpointWorker {
} }
} }
catch (JMSException e) { catch (JMSException e) {
//ignore
} }
} }
/** /**
* @param c * @param c connection to close
*/ */
public static void safeClose(Connection c) { private static void safeClose(Connection c) {
try { try {
if (c != null) { if (c != null) {
c.close(); c.close();
} }
} }
catch (JMSException e) { catch (JMSException e) {
//ignore
} }
} }
/** /**
* @param cc * @param cc ConnectionConsumer to close
*/ */
public static void safeClose(ConnectionConsumer cc) { private static void safeClose(ConnectionConsumer cc) {
try { try {
if (cc != null) { if (cc != null) {
cc.close(); cc.close();
} }
} }
catch (JMSException e) { catch (JMSException e) {
//ignore
} }
} }
public ActiveMQEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException { public ActiveMQEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
this.endpointActivationKey = key; this.endpointActivationKey = key;
this.adapter = adapter;
this.endpointFactory = endpointActivationKey.getMessageEndpointFactory(); this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
this.workManager = adapter.getBootstrapContext().getWorkManager(); this.workManager = adapter.getBootstrapContext().getWorkManager();
try { try {
@ -135,7 +136,7 @@ public class ActiveMQEndpointWorker {
} }
synchronized public void run() { synchronized public void run() {
if( !isRunning() ) if( !running)
return; return;
if( connection!=null ) if( connection!=null )
return; return;
@ -187,33 +188,37 @@ public class ActiveMQEndpointWorker {
} }
synchronized public void start() throws WorkException, ResourceException { public void start() throws ResourceException {
if (running) synchronized (connectWork) {
return; if (running)
running = true; return;
running = true;
log.debug("Starting"); log.debug("Starting");
serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue()); serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
connect(); connect();
}
log.debug("Started"); log.debug("Started");
} }
/** /**
* *
* @throws InterruptedException
*/ */
synchronized public void stop() throws InterruptedException { public void stop() throws InterruptedException {
if (!running) synchronized (connectWork) {
return; if (!running)
running = false; return;
serverSessionPool.close(); running = false;
disconnect(); serverSessionPool.close();
disconnect();
}
} }
private boolean isRunning() { /**
return running; * Calls must be synchronized on connectWork
} */
private void connect() {
synchronized private void connect() {
if (!running) if (!running)
return; return;
@ -226,38 +231,40 @@ public class ActiveMQEndpointWorker {
} }
/** /**
* * Calls must be synchronized on connectWork
*/ */
synchronized private void disconnect() { private void disconnect() {
safeClose(consumer); safeClose(consumer);
consumer=null; consumer=null;
safeClose(connection); safeClose(connection);
connection=null; connection=null;
} }
private void reconnect(JMSException error){ private void reconnect(JMSException error) {
log.debug("Reconnect cause: ",error); log.debug("Reconnect cause: ", error);
long reconnectDelay; long reconnectDelay;
synchronized(this) {
reconnectDelay = this.reconnectDelay;
// Only log errors if the server is really down.. And not a temp failure.
if (reconnectDelay == MAX_RECONNECT_DELAY) {
log.error("Endpoint connection to JMS broker failed: " + error.getMessage());
log.error("Endpoint will try to reconnect to the JMS broker in "+(MAX_RECONNECT_DELAY/1000)+" seconds");
}
}
try { try {
disconnect(); synchronized (connectWork) {
reconnectDelay = this.reconnectDelay;
// Only log errors if the server is really down.. And not a temp failure.
if (reconnectDelay == MAX_RECONNECT_DELAY) {
log.error("Endpoint connection to JMS broker failed: " + error.getMessage());
log.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
}
disconnect();
}
Thread.sleep(reconnectDelay); Thread.sleep(reconnectDelay);
synchronized(this) { synchronized (connectWork) {
// Use exponential rollback. // Use exponential rollback.
this.reconnectDelay*=2; this.reconnectDelay *= 2;
if (this.reconnectDelay > MAX_RECONNECT_DELAY) if (this.reconnectDelay > MAX_RECONNECT_DELAY)
this.reconnectDelay=MAX_RECONNECT_DELAY; this.reconnectDelay = MAX_RECONNECT_DELAY;
connect();
} }
connect(); } catch (InterruptedException e) {
} catch(InterruptedException e) {} Thread.interrupted();
}
} }
protected void registerThreadSession(Session session) { protected void registerThreadSession(Session session) {
@ -268,6 +275,12 @@ public class ActiveMQEndpointWorker {
threadLocal.set(null); threadLocal.set(null);
} }
ActiveMQConnection getConnection() {
synchronized (connectWork) {
return connection;
}
}
private String emptyToNull(String value) { private String emptyToNull(String value) {
if (value == null || value.length() == 0) { if (value == null || value.length() == 0) {
return null; return null;

View File

@ -60,7 +60,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
private ServerSessionImpl createServerSessionImpl() throws JMSException { private ServerSessionImpl createServerSessionImpl() throws JMSException {
ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge); final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
MessageEndpoint endpoint; MessageEndpoint endpoint;
try { try {
int batchSize = 0; int batchSize = 0;
@ -227,7 +227,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
} else if (s instanceof ActiveMQTopicSession) { } else if (s instanceof ActiveMQTopicSession) {
session = (ActiveMQSession) s; session = (ActiveMQSession) s;
} else { } else {
activeMQAsfEndpointWorker.connection activeMQAsfEndpointWorker.getConnection()
.onAsyncException(new JMSException( .onAsyncException(new JMSException(
"Session pool provided an invalid session type: " "Session pool provided an invalid session type: "
+ s.getClass())); + s.getClass()));