AMQ-7000 - ensure server sessions with closed active sessions get removed from the pool on stop, fix and test

This commit is contained in:
gtully 2018-06-26 11:06:38 +01:00
parent 9abe2c6f97
commit 5ebee0ace7
5 changed files with 254 additions and 31 deletions

View File

@ -1043,9 +1043,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
if (!isClosed()) {
LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
}
if (getTransactionContext().isInXATransaction()) {
if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
getTransactionContext().setRollbackOnly(true);
}
@ -2168,7 +2170,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
@Override
public String toString() {
return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex;
return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex;
}
public void checkMessageListener() throws JMSException {

View File

@ -283,7 +283,11 @@ public class ActiveMQEndpointWorker {
LOG.info("Stopping");
// wake up pausing reconnect attempt
shutdownMutex.notifyAll();
serverSessionPool.close();
try {
serverSessionPool.close();
} catch (Throwable ignored) {
LOG.debug("Unexpected error on server session pool close", ignored);
}
}
disconnect();
}

View File

@ -105,7 +105,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
}
protected boolean isStale() {
return stale || !session.isRunning();
return stale || !session.isRunning() || !session.isClosed();
}
public MessageProducer getMessageProducer() throws JMSException {
@ -159,13 +159,15 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
* @see java.lang.Runnable#run()
*/
public void run() {
log.debug("Running");
log.debug("{} Running", this);
currentBatchSize = 0;
while (true) {
log.debug("run loop start");
log.debug("{} run loop", this);
try {
InboundContextSupport.register(this);
if ( session.isRunning() ) {
if (session.isClosed()) {
stale = true;
} else if (session.isRunning() ) {
session.run();
} else {
log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size());
@ -174,9 +176,9 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
} catch (Throwable e) {
stale = true;
if ( log.isDebugEnabled() ) {
log.debug("Endpoint {} failed to process message.", session, e);
log.debug("Endpoint {} failed to process message.", this, e);
} else if ( log.isInfoEnabled() ) {
log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), session);
log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), this);
}
} finally {
InboundContextSupport.unregister(this);
@ -184,20 +186,23 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
synchronized (runControlMutex) {
// This endpoint may have gone stale due to error
if (stale) {
log.debug("Session {} stale, removing from pool", this);
runningFlag = false;
pool.removeFromPool(this);
break;
}
if (!session.hasUncomsumedMessages()) {
runningFlag = false;
log.debug("Session has no unconsumed message, returning to pool");
log.debug("Session {} has no unconsumed message, returning to pool", this);
pool.returnToPool(this);
break;
} else {
log.debug("Session has session has more work to do b/c of unconsumed", this);
}
}
}
}
log.debug("Run finished");
log.debug("{} Run finished", this);
}
/**

View File

@ -194,7 +194,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
public void returnToPool(ServerSessionImpl ss) {
sessionLock.lock();
activeSessions.remove(ss);
activeSessions.remove(ss);
try {
// make sure we only return non-stale sessions to the pool
if ( ss.isStale() ) {
@ -226,7 +226,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
try {
ActiveMQSession session = (ActiveMQSession)ss.getSession();
List<MessageDispatch> l = session.getUnconsumedMessages();
if (!l.isEmpty()) {
if (!isClosing() && !l.isEmpty()) {
ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
if (connection != null) {
for (Iterator<MessageDispatch> i = l.iterator(); i.hasNext();) {
@ -276,6 +276,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
public void close() {
closing.set(true);
LOG.debug("{} close", this);
int activeCount = closeSessions();
// we may have to wait erroneously 250ms if an
// active session is removed during our wait and we
@ -300,11 +301,16 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
protected int closeSessions() {
sessionLock.lock();
try {
List<ServerSessionImpl> alreadyClosedServerSessions = new ArrayList<>(activeSessions.size());
for (ServerSessionImpl ss : activeSessions) {
try {
ActiveMQSession session = (ActiveMQSession) ss.getSession();
if (!session.isClosed()) {
session.close();
} else {
LOG.debug("Session {} already closed", session);
alreadyClosedServerSessions.add(ss);
}
} catch (JMSException ignored) {
if (LOG.isDebugEnabled()) {
@ -312,6 +318,11 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
}
}
}
for (ServerSessionImpl ss : alreadyClosedServerSessions) {
removeFromPool(ss);
}
alreadyClosedServerSessions.clear();
for (ServerSessionImpl ss : idleSessions) {
ss.close();
}

View File

@ -16,23 +16,6 @@
*/
package org.apache.activemq.ra;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Session;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
@ -40,6 +23,7 @@ import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.util.Wait;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
@ -56,6 +40,25 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertTrue;
@RunWith(JMock.class)
public class ServerSessionImplTest {
@ -234,4 +237,202 @@ public class ServerSessionImplTest {
assertTrue("run has completed", runState.await(20, TimeUnit.SECONDS));
assertTrue("not all messages consumed", messageCount.getCount() > 0);
}
@Test
public void testGetWhenClosed() throws Exception {
final int maxMessages = 2000;
final AtomicReference<CountDownLatch> messageCountRef = new AtomicReference<CountDownLatch>();
ExecutorService executorService = Executors.newCachedThreadPool();
final MessageEndpointFactory messageEndpointFactory = context.mock(MessageEndpointFactory.class);
final MessageResourceAdapter resourceAdapter = context.mock(MessageResourceAdapter.class);
final ActiveMQEndpointActivationKey key = context.mock(ActiveMQEndpointActivationKey.class);
messageEndpoint = context.mock(MessageEndpointProxy.class);
workManager = context.mock(WorkManager.class);
final MessageActivationSpec messageActivationSpec = context.mock(MessageActivationSpec.class);
final BootstrapContext boostrapContext = context.mock(BootstrapContext.class);
context.checking(new Expectations() {
{
allowing(boostrapContext).getWorkManager();
will(returnValue(workManager));
allowing(resourceAdapter).getBootstrapContext();
will(returnValue(boostrapContext));
allowing(messageEndpointFactory).isDeliveryTransacted(with(any(Method.class)));
will(returnValue(Boolean.FALSE));
allowing(key).getMessageEndpointFactory();
will(returnValue(messageEndpointFactory));
allowing(key).getActivationSpec();
will(returnValue(messageActivationSpec));
allowing(messageActivationSpec).isUseJndi();
will(returnValue(Boolean.FALSE));
allowing(messageActivationSpec).getDestinationType();
will(returnValue("javax.jms.Queue"));
allowing(messageActivationSpec).getDestination();
will(returnValue("Queue"));
allowing(messageActivationSpec).getAcknowledgeModeForSession();
will(returnValue(1));
allowing(messageActivationSpec).getMaxSessionsIntValue();
will(returnValue(10));
allowing(messageActivationSpec).getEnableBatchBooleanValue();
will(returnValue(Boolean.FALSE));
allowing(messageActivationSpec).isUseRAManagedTransactionEnabled();
will(returnValue(Boolean.TRUE));
allowing(messageEndpointFactory).createEndpoint(with(any(XAResource.class)));
will(returnValue(messageEndpoint));
allowing(workManager).scheduleWork((Work) with(Matchers.instanceOf(Work.class)), with(any(long.class)), with(any(ExecutionContext.class)),
with(any(WorkListener.class)));
will(new Action() {
@Override
public Object invoke(Invocation invocation) throws Throwable {
LOG.info("Wok manager invocation: " + invocation);
if (invocation.getParameter(0) instanceof ServerSessionImpl) {
final ServerSessionImpl serverSession1 = (ServerSessionImpl)invocation.getParameter(0);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
serverSession1.run();
} catch (Exception e) {
LOG.error("Error on Work run: {}", serverSession1, e);
e.printStackTrace();
}
}
});
}
return null;
}
@Override
public void describeTo(Description description) {
}
});
allowing(messageEndpoint).beforeDelivery((Method) with(Matchers.instanceOf(Method.class)));
allowing(messageEndpoint).onMessage(with(any(javax.jms.Message.class)));
will(new Action() {
@Override
public Object invoke(Invocation invocation) throws Throwable {
messageCountRef.get().countDown();
return null;
}
@Override
public void describeTo(Description description) {
description.appendText("Keep message count");
}
});
allowing(messageEndpoint).afterDelivery();
will(new Action() {
@Override
public void describeTo(Description description) {
description.appendText("do sync work on broker");
}
@Override
public Object invoke(Invocation invocation) throws Throwable {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setType(TransactionInfo.END);
LOG.info("AfterDelivery on: " + messageCountRef.get().getCount());
return null;
}
});
allowing(messageEndpoint).release();
}
});
endpointWorker = new ActiveMQEndpointWorker(resourceAdapter, key);
endpointWorker.setConnection(con);
for (int i=0; i<40; i++) {
final int iteration = i;
LOG.info("ITERATION: " + iteration);
pool = new ServerSessionPoolImpl(endpointWorker, 2);
endpointWorker.start();
messageCountRef.set(new CountDownLatch(maxMessages));
final CountDownLatch senderDone = new CountDownLatch(1);
final CountDownLatch messageSent = new CountDownLatch(maxMessages);
final AtomicBoolean foundClosedSession = new AtomicBoolean(false);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// preload the session dispatch queue to keep the session active
for (int i = 0; i < maxMessages; i++) {
MessageDispatch messageDispatch = new MessageDispatch();
ActiveMQMessage message = new ActiveMQTextMessage();
message.setMessageId(new MessageId("0:0:0:" + i));
message.getMessageId().setBrokerSequenceId(i);
messageDispatch.setMessage(message);
messageDispatch.setConsumerId(new ConsumerId("0:0:0"));
ServerSessionImpl serverSession1 = null;
try {
serverSession1 = (ServerSessionImpl) pool.getServerSession();
ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
if (session1.isClosed()) {
// closed flag is not volatile - ok to give a whirl with it closed
foundClosedSession.set(true);
}
session1.dispatch(messageDispatch);
messageSent.countDown();
serverSession1.start();
} catch (JMSException okOnClose) {
LOG.info("Exception on dispatch to {}", serverSession1, okOnClose);
}
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
senderDone.countDown();
}
}
});
assertTrue("[" + iteration + "] Some messages dispatched", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("[" + iteration + "] Wait before close work MessageSent: " + messageSent.getCount() + ", messages got: "+ messageCountRef.get().getCount());
return messageSent.getCount() < maxMessages - 20 && messageCountRef.get().getCount() < maxMessages - 5;
}
}, 5000, 10));
assertTrue("some messages consumed", messageCountRef.get().getCount() < maxMessages);
final CountDownLatch closeDone = new CountDownLatch(1);
final CountDownLatch closeSuccess = new CountDownLatch(1);
executorService.execute(new Runnable() {
@Override
public void run() {
LOG.info("[" + iteration + "] Closing pool on delivered {} and dispatched {}", messageSent.getCount(), messageCountRef.get().getCount());
try {
pool.close();
closeSuccess.countDown();
} catch (InvalidMessageEndpointException error) {
LOG.error("Ex on pool close", error);
//error.printStackTrace();
} finally {
closeDone.countDown();
}
}
});
assertTrue("[" + iteration + "] Pool close does not block", closeDone.await(10, TimeUnit.SECONDS));
assertTrue("[" + iteration + "] Pool close ok", closeSuccess.await(10, TimeUnit.MILLISECONDS));
assertTrue("[" + iteration + "] not all delivered", messageCountRef.get().getCount() > 0);
assertTrue("[" + iteration + "] sender complete", senderDone.await(30, TimeUnit.SECONDS));
}
}
}