ARTEMIS-302 - Improving XA Resilience

This commit is contained in:
Clebert Suconic 2015-11-10 15:26:44 -05:00
parent a21a447b4c
commit 7bbd17cd37
18 changed files with 683 additions and 142 deletions

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.XidCodecSupport; import org.apache.activemq.artemis.utils.XidCodecSupport;
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener { public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
@ -57,7 +58,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final ClientSessionFactoryInternal sessionFactory; private final ClientSessionFactoryInternal sessionFactory;
private final String name; private String name;
private final String username; private final String username;
@ -857,6 +858,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
boolean reattached = sessionContext.reattachOnNewConnection(backupConnection); boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);
if (!reattached) { if (!reattached) {
// We change the name of the Session, otherwise the server could close it while we are still sending the recreate
// in certain failure scenarios
// For instance the fact we didn't change the name of the session after failover or reconnect
// was the reason allowing multiple Sessions to be closed simultaneously breaking concurrency
this.name = UUIDGenerator.getInstance().generateStringUUID();
sessionContext.resetName(name);
for (ClientConsumerInternal consumer : cloneConsumers()) { for (ClientConsumerInternal consumer : cloneConsumers()) {
consumer.clearAtFailover(); consumer.clearAtFailover();
} }

View File

@ -112,7 +112,7 @@ public class ActiveMQSessionContext extends SessionContext {
private final Channel sessionChannel; private final Channel sessionChannel;
private final int serverVersion; private final int serverVersion;
private int confirmationWindow; private int confirmationWindow;
private final String name; private String name;
protected Channel getSessionChannel() { protected Channel getSessionChannel() {
return sessionChannel; return sessionChannel;
@ -122,6 +122,11 @@ public class ActiveMQSessionContext extends SessionContext {
return name; return name;
} }
public void resetName(String name) {
this.name = name;
}
protected int getConfirmationWindow() { protected int getConfirmationWindow() {
return confirmationWindow; return confirmationWindow;

View File

@ -34,4 +34,10 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
return message; return message;
} }
public String toString() {
return this.getParentString() + ",message=" + message + "]";
}
} }

View File

@ -59,6 +59,8 @@ public abstract class SessionContext {
this.session = session; this.session = session;
} }
public abstract void resetName(String name);
/** /**
* it will eather reattach or reconnect, preferably reattaching it. * it will eather reattach or reconnect, preferably reattaching it.
* *

View File

@ -115,6 +115,8 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
public class ServerSessionPacketHandler implements ChannelHandler { public class ServerSessionPacketHandler implements ChannelHandler {
private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
private final ServerSession session; private final ServerSession session;
private final StorageManager storageManager; private final StorageManager storageManager;
@ -193,6 +195,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
boolean closeChannel = false; boolean closeChannel = false;
boolean requiresResponse = false; boolean requiresResponse = false;
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::handlePacket," + packet);
}
try { try {
try { try {
switch (type) { switch (type) {
@ -522,6 +528,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final Packet response, final Packet response,
final boolean flush, final boolean flush,
final boolean closeChannel) { final boolean closeChannel) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::scheduling response::" + response);
}
storageManager.afterCompleteOperations(new IOCallback() { storageManager.afterCompleteOperations(new IOCallback() {
public void onError(final int errorCode, final String errorMessage) { public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
@ -529,6 +539,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage)); ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage));
doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::response sent::" + response);
}
} }
public void done() { public void done() {

View File

@ -379,8 +379,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void close(final boolean failed) throws Exception { public void close(final boolean failed) throws Exception {
if (isTrace) if (isTrace) {
{
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
} }
@ -405,8 +404,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
if (isTrace) if (isTrace) {
{
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref); ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref);
} }

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.impl;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -39,11 +38,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -57,10 +56,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -996,7 +995,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString()); ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString());
try { try {
if (tx.getState() != Transaction.State.PREPARED) { if (!tx.isEffective()) {
// we don't want to rollback anything prepared here // we don't want to rollback anything prepared here
if (tx.getXid() != null) { if (tx.getXid() != null) {
resourceManager.removeTransaction(tx.getXid()); resourceManager.removeTransaction(tx.getXid());
@ -1025,27 +1024,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
public synchronized void xaFailed(final Xid xid) throws Exception { public synchronized void xaFailed(final Xid xid) throws Exception {
if (tx != null) { Transaction theTX = resourceManager.getTransaction(xid);
final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
throw new ActiveMQXAException(XAException.XAER_PROTO, msg); if (theTX == null) {
theTX = newTransaction(xid);
resourceManager.putTransaction(xid, theTX);
}
if (theTX.isEffective()) {
ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared");
tx = null;
} }
else { else {
theTX.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation"));
tx = theTX;
}
tx = newTransaction(xid); if (isTrace) {
tx.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation")); ActiveMQServerLogger.LOGGER.trace("xastart into tx= " + tx);
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("xastart into tx= " + tx);
}
boolean added = resourceManager.putTransaction(xid, tx);
if (!added) {
final String msg = "Cannot start, there is already a xid " + tx.getXid();
throw new ActiveMQXAException(XAException.XAER_DUPID, msg);
}
} }
} }

View File

@ -33,6 +33,8 @@ public interface Transaction {
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
} }
boolean isEffective();
void prepare() throws Exception; void prepare() throws Exception;
void commit() throws Exception; void commit() throws Exception;

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.transaction.impl; package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -104,6 +103,11 @@ public class TransactionImpl implements Transaction {
// Transaction implementation // Transaction implementation
// ----------------------------------------------------------- // -----------------------------------------------------------
public boolean isEffective() {
return state == State.PREPARED || state == State.COMMITTED;
}
public void setContainsPersistent() { public void setContainsPersistent() {
containsPersistent = true; containsPersistent = true;
} }
@ -142,6 +146,10 @@ public class TransactionImpl implements Transaction {
storageManager.readLock(); storageManager.readLock();
try { try {
synchronized (timeoutLock) { synchronized (timeoutLock) {
if (isEffective()) {
ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call");
return;
}
if (state == State.ROLLBACK_ONLY) { if (state == State.ROLLBACK_ONLY) {
if (exception != null) { if (exception != null) {
// this TX will never be rolled back, // this TX will never be rolled back,
@ -197,6 +205,11 @@ public class TransactionImpl implements Transaction {
public void commit(final boolean onePhase) throws Exception { public void commit(final boolean onePhase) throws Exception {
synchronized (timeoutLock) { synchronized (timeoutLock) {
if (state == State.COMMITTED) {
// I don't think this could happen, but just in case
ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call");
return;
}
if (state == State.ROLLBACK_ONLY) { if (state == State.ROLLBACK_ONLY) {
rollback(); rollback();
@ -248,15 +261,21 @@ public class TransactionImpl implements Transaction {
*/ */
protected void doCommit() throws Exception { protected void doCommit() throws Exception {
if (containsPersistent || xid != null && state == State.PREPARED) { if (containsPersistent || xid != null && state == State.PREPARED) {
// ^^ These are the scenarios where we require a storage.commit
// for anything else we won't use the journal
storageManager.commit(id); storageManager.commit(id);
state = State.COMMITTED;
} }
state = State.COMMITTED;
} }
public void rollback() throws Exception { public void rollback() throws Exception {
synchronized (timeoutLock) { synchronized (timeoutLock) {
if (state == State.ROLLEDBACK) {
// I don't think this could happen, but just in case
ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace"));
return;
}
if (xid != null) { if (xid != null) {
if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) { if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
throw new IllegalStateException("Transaction is in invalid state " + state); throw new IllegalStateException("Transaction is in invalid state " + state);
@ -290,17 +309,21 @@ public class TransactionImpl implements Transaction {
} }
public void suspend() { public void suspend() {
if (state != State.ACTIVE) { synchronized (timeoutLock) {
throw new IllegalStateException("Can only suspend active transaction"); if (state != State.ACTIVE) {
throw new IllegalStateException("Can only suspend active transaction");
}
state = State.SUSPENDED;
} }
state = State.SUSPENDED;
} }
public void resume() { public void resume() {
if (state != State.SUSPENDED) { synchronized (timeoutLock) {
throw new IllegalStateException("Can only resume a suspended transaction"); if (state != State.SUSPENDED) {
throw new IllegalStateException("Can only resume a suspended transaction");
}
state = State.ACTIVE;
} }
state = State.ACTIVE;
} }
public Transaction.State getState() { public Transaction.State getState() {
@ -316,12 +339,19 @@ public class TransactionImpl implements Transaction {
} }
public void markAsRollbackOnly(final ActiveMQException exception1) { public void markAsRollbackOnly(final ActiveMQException exception1) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { synchronized (timeoutLock) {
ActiveMQServerLogger.LOGGER.debug("Marking Transaction " + this.id + " as rollback only"); if (isEffective()) {
} ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)");
state = State.ROLLBACK_ONLY; return;
}
this.exception = exception1; if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Marking Transaction " + this.id + " as rollback only");
}
state = State.ROLLBACK_ONLY;
this.exception = exception1;
}
} }
public synchronized void addOperation(final TransactionOperation operation) { public synchronized void addOperation(final TransactionOperation operation) {
@ -425,6 +455,7 @@ public class TransactionImpl implements Transaction {
return "TransactionImpl [xid=" + xid + return "TransactionImpl [xid=" + xid +
", id=" + ", id=" +
id + id +
", xid=" + xid +
", state=" + ", state=" +
state + state +
", createTime=" + ", createTime=" +

View File

@ -49,51 +49,40 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
/** This test will force two consumers to be waiting on close and introduce a race I saw in a production system */ /**
* This test will force two consumers to be waiting on close and introduce a race I saw in a production system
*/
@RunWith(BMUnitRunner.class) @RunWith(BMUnitRunner.class)
public class ConcurrentDeliveryCancelTest extends JMSTestBase public class ConcurrentDeliveryCancelTest extends JMSTestBase {
{
// used to wait the thread to align at the same place and create the race // used to wait the thread to align at the same place and create the race
private static final ReusableLatch latchEnter = new ReusableLatch(2); private static final ReusableLatch latchEnter = new ReusableLatch(2);
// used to start // used to start
private static final ReusableLatch latchFlag = new ReusableLatch(1); private static final ReusableLatch latchFlag = new ReusableLatch(1);
public static void enterCancel() public static void enterCancel() {
{
latchEnter.countDown(); latchEnter.countDown();
try try {
{
latchFlag.await(); latchFlag.await();
} }
catch (Exception ignored) catch (Exception ignored) {
{
} }
} }
public static void resetLatches(int numberOfThreads) public static void resetLatches(int numberOfThreads) {
{
latchEnter.setCount(numberOfThreads); latchEnter.setCount(numberOfThreads);
latchFlag.setCount(1); latchFlag.setCount(1);
} }
@Test @Test
@BMRules @BMRules(
( rules = {@BMRule(
rules = name = "enterCancel-holdThere",
{ targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl",
@BMRule targetMethod = "close",
( targetLocation = "ENTRY",
name = "enterCancel-holdThere", action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")})
targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl", public void testConcurrentCancels() throws Exception {
targetMethod = "close",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();"
)
}
)
public void testConcurrentCancels() throws Exception
{
server.getAddressSettingsRepository().clear(); server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings(); AddressSettings settings = new AddressSettings();
@ -103,10 +92,8 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
cf.setReconnectAttempts(0); cf.setReconnectAttempts(0);
cf.setRetryInterval(10); cf.setRetryInterval(10);
System.out.println("....."); System.out.println(".....");
for (ServerSession srvSess : server.getSessions()) for (ServerSession srvSess : server.getSessions()) {
{
System.out.println(srvSess); System.out.println(srvSess);
} }
@ -120,8 +107,7 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) for (int i = 0; i < numberOfMessages; i++) {
{
TextMessage msg = session.createTextMessage("message " + i); TextMessage msg = session.createTextMessage("message " + i);
msg.setIntProperty("i", i); msg.setIntProperty("i", i);
producer.send(msg); producer.send(msg);
@ -131,24 +117,22 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
connection.close(); connection.close();
} }
for (int i = 0; i < 100; i++) for (int i = 0; i < 100; i++) {
{
XAConnectionFactory xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616", "test"); XAConnectionFactory xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616", "test");
final XAConnection connection = xacf.createXAConnection(); final XAConnection connection = xacf.createXAConnection();
final XASession theSession = connection.createXASession(); final XASession theSession = connection.createXASession();
((ActiveMQSession)theSession).getCoreSession().addMetaData("theSession", "true"); ((ActiveMQSession) theSession).getCoreSession().addMetaData("theSession", "true");
connection.start(); connection.start();
final MessageConsumer consumer = theSession.createConsumer(queue); final MessageConsumer consumer = theSession.createConsumer(queue);
XidImpl xid = newXID(); XidImpl xid = newXID();
theSession.getXAResource().start(xid, XAResource.TMNOFLAGS); theSession.getXAResource().start(xid, XAResource.TMNOFLAGS);
theSession.getXAResource().setTransactionTimeout(1); // I'm setting a small timeout just because I'm lazy to call end myself theSession.getXAResource().setTransactionTimeout(1); // I'm setting a small timeout just because I'm lazy to call end myself
for (int msg = 0; msg < 11; msg++) for (int msg = 0; msg < 11; msg++) {
{
Assert.assertNotNull(consumer.receiveNoWait()); Assert.assertNotNull(consumer.receiveNoWait());
} }
@ -157,83 +141,68 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
final List<ServerSession> serverSessions = new LinkedList<ServerSession>(); final List<ServerSession> serverSessions = new LinkedList<ServerSession>();
// We will force now the failure simultaneously from several places // We will force now the failure simultaneously from several places
for (ServerSession srvSess : server.getSessions()) for (ServerSession srvSess : server.getSessions()) {
{ if (srvSess.getMetaData("theSession") != null) {
if (srvSess.getMetaData("theSession") != null)
{
System.out.println(srvSess); System.out.println(srvSess);
serverSessions.add(srvSess); serverSessions.add(srvSess);
} }
} }
resetLatches(2); // from Transactional reaper resetLatches(2); // from Transactional reaper
List<Thread> threads = new LinkedList<Thread>(); List<Thread> threads = new LinkedList<Thread>();
threads.add(new Thread("ConsumerCloser") threads.add(new Thread("ConsumerCloser") {
{ public void run() {
public void run() try {
{
try
{
System.out.println(Thread.currentThread().getName() + " closing consumer"); System.out.println(Thread.currentThread().getName() + " closing consumer");
consumer.close(); consumer.close();
System.out.println(Thread.currentThread().getName() + " closed consumer"); System.out.println(Thread.currentThread().getName() + " closed consumer");
} }
catch (Exception e) catch (Exception e) {
{
e.printStackTrace(); e.printStackTrace();
} }
} }
}); });
threads.add(new Thread("SessionCloser") threads.add(new Thread("SessionCloser") {
{ public void run() {
public void run() for (ServerSession sess : serverSessions) {
{
for (ServerSession sess : serverSessions)
{
System.out.println("Thread " + Thread.currentThread().getName() + " starting"); System.out.println("Thread " + Thread.currentThread().getName() + " starting");
try try {
{
// A session.close could sneak in through failover or some other scenarios. // A session.close could sneak in through failover or some other scenarios.
// a call to RemotingConnection.fail wasn't replicating the issue. // a call to RemotingConnection.fail wasn't replicating the issue.
// I needed to call Session.close() directly to replicate what was happening in production // I needed to call Session.close() directly to replicate what was happening in production
sess.close(true); sess.close(true);
} }
catch (Exception e) catch (Exception e) {
{
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("Thread " + Thread.currentThread().getName() + " done"); System.out.println("Thread " + Thread.currentThread().getName() + " done");
} }
} }
}); });
// //
// consumer.close(); // consumer.close();
// //
// threads.add(new Thread("ClientFailing") // threads.add(new Thread("ClientFailing")
// { // {
// public void run() // public void run()
// { // {
// ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession(); // ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession();
// impl.getConnection().fail(new HornetQException("failure")); // impl.getConnection().fail(new HornetQException("failure"));
// } // }
// }); // });
// //
for (Thread t : threads) {
for (Thread t : threads)
{
t.start(); t.start();
} }
Assert.assertTrue(latchEnter.await(10, TimeUnit.MINUTES)); Assert.assertTrue(latchEnter.await(10, TimeUnit.MINUTES));
latchFlag.countDown(); latchFlag.countDown();
for (Thread t: threads) for (Thread t : threads) {
{
t.join(5000); t.join(5000);
Assert.assertFalse(t.isAlive()); Assert.assertFalse(t.isAlive());
} }
@ -250,19 +219,16 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
HashMap<Integer, AtomicInteger> mapCount = new HashMap<Integer, AtomicInteger>(); HashMap<Integer, AtomicInteger> mapCount = new HashMap<Integer, AtomicInteger>();
while (true) while (true) {
{ TextMessage message = (TextMessage) consumer.receiveNoWait();
TextMessage message = (TextMessage)consumer.receiveNoWait(); if (message == null) {
if (message == null)
{
break; break;
} }
Integer value = message.getIntProperty("i"); Integer value = message.getIntProperty("i");
AtomicInteger count = mapCount.get(value); AtomicInteger count = mapCount.get(value);
if (count == null) if (count == null) {
{
count = new AtomicInteger(0); count = new AtomicInteger(0);
mapCount.put(message.getIntProperty("i"), count); mapCount.put(message.getIntProperty("i"), count);
} }
@ -271,16 +237,13 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
} }
boolean failed = false; boolean failed = false;
for (int i = 0; i < numberOfMessages; i++) for (int i = 0; i < numberOfMessages; i++) {
{
AtomicInteger count = mapCount.get(i); AtomicInteger count = mapCount.get(i);
if (count == null) if (count == null) {
{
System.out.println("Message " + i + " not received"); System.out.println("Message " + i + " not received");
failed = true; failed = true;
} }
else if (count.get() > 1) else if (count.get() > 1) {
{
System.out.println("Message " + i + " received " + count.get() + " times"); System.out.println("Message " + i + " received " + count.get() + " times");
failed = true; failed = true;
} }
@ -290,6 +253,5 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
connection.close(); connection.close();
} }
} }

View File

@ -1727,7 +1727,7 @@ public class JMSBridgeTest extends BridgeTestBase {
@Test @Test
public void testSetTMClass() throws Exception { public void testSetTMClass() throws Exception {
TransactionManagerLocatorImpl.tm = new DummyTransactionManager(); TransactionManagerLocatorImpl.setTransactionManager(new DummyTransactionManager());
JMSBridgeImpl bridge = null; JMSBridgeImpl bridge = null;
try { try {

View File

@ -23,14 +23,15 @@ import org.apache.activemq.artemis.service.extensions.transactions.TransactionMa
public class TransactionManagerLocatorImpl implements TransactionManagerLocator { public class TransactionManagerLocatorImpl implements TransactionManagerLocator {
public static TransactionManager tm = null; private static TransactionManager tm = null;
@Override @Override
public TransactionManager getTransactionManager() { public TransactionManager getTransactionManager() {
new Exception("trace").printStackTrace();
return tm; return tm;
} }
public void setTransactionManager(TransactionManager transactionManager) { public static void setTransactionManager(TransactionManager transactionManager) {
tm = transactionManager; tm = transactionManager;
} }
} }

View File

@ -0,0 +1,412 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.jms.ra;
import javax.jms.Message;
import javax.resource.ResourceException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.extras.jms.bridge.TransactionManagerLocatorImpl;
import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simulates several messages being received over multiple instances with reconnects during the process.
*/
public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase {
final ConcurrentHashMap<Integer, AtomicInteger> mapCounter = new ConcurrentHashMap<Integer, AtomicInteger>();
volatile ActiveMQResourceAdapter resourceAdapter;
ServerLocator nettyLocator;
@Before
public void setUp() throws Exception {
nettyLocator = createNettyNonHALocator();
nettyLocator.setRetryInterval(10);
nettyLocator.setReconnectAttempts(-1);
mapCounter.clear();
resourceAdapter = null;
super.setUp();
createQueue(true, "outQueue");
DummyTMLocator.startTM();
}
@After
public void tearDown() throws Exception {
DummyTMLocator.stopTM();
super.tearDown();
}
protected boolean usePersistence() {
return true;
}
@Override
public boolean useSecurity() {
return false;
}
@Test
public void testReconnectMDBNoMessageLoss() throws Exception {
AddressSettings settings = new AddressSettings();
settings.setRedeliveryDelay(1000);
settings.setMaxDeliveryAttempts(-1);
server.getAddressSettingsRepository().clear();
server.getAddressSettingsRepository().addMatch("#", settings);
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
resourceAdapter = qResourceAdapter;
// qResourceAdapter.setTransactionManagerLocatorClass(DummyTMLocator.class.getName());
// qResourceAdapter.setTransactionManagerLocatorMethod("getTM");
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
qResourceAdapter.start(ctx);
final int NUMBER_OF_SESSIONS = 10;
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setMaxSession(NUMBER_OF_SESSIONS);
spec.setTransactionTimeout(1);
spec.setReconnectAttempts(-1);
spec.setConfirmationWindowSize(-1);
spec.setReconnectInterval(1000);
spec.setCallTimeout(1000L);
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setConsumerWindowSize(1024 * 1024);
TestEndpointFactory endpointFactory = new TestEndpointFactory(true);
qResourceAdapter.endpointActivation(endpointFactory, spec);
Assert.assertEquals(1, resourceAdapter.getActivations().values().size());
final int NUMBER_OF_MESSAGES = 3000;
Thread producer = new Thread() {
public void run() {
try {
ServerLocator locator = createInVMLocator(0);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false);
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring " + i);
message.putIntProperty("i", i);
clientProducer.send(message);
if (i % 100 == 0) {
session.commit();
}
}
session.commit();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
producer.start();
final AtomicBoolean metaDataFailed = new AtomicBoolean(false);
// This thread will keep bugging the handlers.
// if they behave well with XA, the test pass!
final AtomicBoolean running = new AtomicBoolean(true);
Thread buggerThread = new Thread() {
public void run() {
while (running.get()) {
try {
Thread.sleep(RandomUtil.randomInterval(100, 200));
}
catch (InterruptedException intex) {
intex.printStackTrace();
return;
}
List<ServerSession> serverSessions = new LinkedList<>();
for (ServerSession session : server.getSessions()) {
if (session.getMetaData("resource-adapter") != null) {
serverSessions.add(session);
}
}
System.err.println("Contains " + serverSessions.size() + " RA sessions");
if (serverSessions.size() != NUMBER_OF_SESSIONS) {
System.err.println("the server was supposed to have " + NUMBER_OF_SESSIONS + " RA Sessions but it only contained accordingly to the meta-data");
metaDataFailed.set(true);
}
else if (serverSessions.size() == NUMBER_OF_SESSIONS) {
// it became the same after some reconnect? which would be acceptable
metaDataFailed.set(false);
}
if (serverSessions.size() > 0) {
int randomBother = RandomUtil.randomInterval(0, serverSessions.size() - 1);
System.out.println("bugging session " + randomBother);
RemotingConnection connection = serverSessions.get(randomBother).getRemotingConnection();
connection.fail(new ActiveMQException("failed at random " + randomBother));
}
}
}
};
buggerThread.start();
ServerLocator locator = createInVMLocator(0);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false);
session.start();
ClientConsumer consumer = session.createConsumer("jms.queue.outQueue");
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage message = consumer.receive(5000);
if (message == null) {
break;
}
Assert.assertNotNull(message);
message.acknowledge();
Integer value = message.getIntProperty("i");
AtomicInteger mapCount = new AtomicInteger(1);
mapCount = mapCounter.putIfAbsent(value, mapCount);
if (mapCount != null) {
mapCount.incrementAndGet();
}
if (i % 200 == 0) {
System.out.println("received " + i);
session.commit();
}
}
session.commit();
Assert.assertNull(consumer.receiveImmediate());
boolean failed = false;
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
AtomicInteger atomicInteger = mapCounter.get(Integer.valueOf(i));
if (atomicInteger == null) {
System.out.println("didn't receive message with i=" + i);
failed = true;
}
else if (atomicInteger.get() > 1) {
System.out.println("message with i=" + i + " received " + atomicInteger.get() + " times");
failed = true;
}
}
running.set(false);
buggerThread.join();
producer.join();
Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get());
Assert.assertFalse(failed);
System.out.println("Received " + NUMBER_OF_MESSAGES + " messages");
qResourceAdapter.stop();
session.close();
}
protected class TestEndpointFactory implements MessageEndpointFactory {
private final boolean isDeliveryTransacted;
public TestEndpointFactory(boolean deliveryTransacted) {
isDeliveryTransacted = deliveryTransacted;
}
public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
TestEndpoint retEnd = new TestEndpoint();
if (xaResource != null) {
retEnd.setXAResource(xaResource);
}
return retEnd;
}
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return isDeliveryTransacted;
}
}
public class TestEndpoint extends DummyMessageEndpoint {
ClientSessionFactory factory;
ClientSession endpointSession;
ClientProducer producer;
Transaction currentTX;
public TestEndpoint() {
super(null);
try {
factory = nettyLocator.createSessionFactory();
// buggingList.add(factory);
endpointSession = factory.createSession(true, false, false);
producer = endpointSession.createProducer("jms.queue.outQueue");
}
catch (Throwable e) {
throw new RuntimeException(e);
}
}
@Override
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
super.beforeDelivery(method);
try {
DummyTMLocator.tm.begin();
currentTX = DummyTMLocator.tm.getTransaction();
currentTX.enlistResource(xaResource);
}
catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public void onMessage(Message message) {
// try
// {
// System.out.println(Thread.currentThread().getName() + "**** onMessage enter " + message.getIntProperty("i"));
// }
// catch (Exception e)
// {
// }
Integer value = 0;
try {
value = message.getIntProperty("i");
}
catch (Exception e) {
}
super.onMessage(message);
try {
currentTX.enlistResource(endpointSession);
ClientMessage message1 = endpointSession.createMessage(true);
message1.putIntProperty("i", message.getIntProperty("i"));
producer.send(message1);
currentTX.delistResource(endpointSession, XAResource.TMSUCCESS);
}
catch (Exception e) {
e.printStackTrace();
try {
currentTX.setRollbackOnly();
}
catch (Exception ex) {
}
e.printStackTrace();
// throw new RuntimeException(e);
}
}
@Override
public void afterDelivery() throws ResourceException {
try {
DummyTMLocator.tm.commit();
// currentTX.commit();
}
catch (Throwable e) {
}
super.afterDelivery();
}
}
public static class DummyTMLocator {
public static TransactionManagerImple tm;
public static void stopTM() {
try {
TransactionManagerLocatorImpl.setTransactionManager(null);
TransactionReaper.terminate(true);
TxControl.disable(true);
}
catch (Exception e) {
e.printStackTrace();
}
tm = null;
}
public static void startTM() {
tm = new TransactionManagerImple();
TransactionManagerLocatorImpl.setTransactionManager(tm);
TxControl.enable();
}
public TransactionManager getTM() {
return tm;
}
}
}

View File

@ -138,7 +138,6 @@ public abstract class ActiveMQRATestBase extends JMSTestBase {
public void onMessage(Message message) { public void onMessage(Message message) {
lastMessage = (ActiveMQMessage) message; lastMessage = (ActiveMQMessage) message;
System.err.println(message);
} }
public void reset(CountDownLatch latch) { public void reset(CountDownLatch latch) {

View File

@ -17,9 +17,14 @@
package org.apache.activemq.artemis.tests.integration.remoting; package org.apache.activemq.artemis.tests.integration.remoting;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -109,6 +114,97 @@ public class ReconnectTest extends ActiveMQTestBase {
} }
@Test
public void testMetadataAfterReconnectionNetty() throws Exception {
internalMetadataAfterRetry(true);
}
@Test
public void testMetadataAfterReconnectionInVM() throws Exception {
internalMetadataAfterRetry(false);
}
public void internalMetadataAfterRetry(final boolean isNetty) throws Exception {
final int pingPeriod = 1000;
ActiveMQServer server = createServer(false, isNetty);
server.start();
ClientSessionInternal session = null;
try {
for (int i = 0; i < 100; i++) {
ServerLocator locator = createFactory(isNetty);
locator.setClientFailureCheckPeriod(pingPeriod);
locator.setRetryInterval(1);
locator.setRetryIntervalMultiplier(1d);
locator.setReconnectAttempts(-1);
locator.setConfirmationWindowSize(-1);
ClientSessionFactory factory = createSessionFactory(locator);
session = (ClientSessionInternal) factory.createSession();
session.addMetaData("meta1", "meta1");
ServerSession[] sessions = countMetadata(server, "meta1", 1);
Assert.assertEquals(1, sessions.length);
final AtomicInteger count = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
session.addFailoverListener(new FailoverEventListener() {
@Override
public void failoverEvent(FailoverEventType eventType) {
if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
latch.countDown();
}
}
});
sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!"));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
sessions = countMetadata(server, "meta1", 1);
Assert.assertEquals(1, sessions.length);
locator.close();
}
}
finally {
try {
session.close();
}
catch (Throwable e) {
}
server.stop();
}
}
private ServerSession[] countMetadata(ActiveMQServer server, String parameter, int expected) throws Exception {
List<ServerSession> sessionList = new LinkedList<ServerSession>();
for (int i = 0; i < 10 && sessionList.size() != expected; i++) {
sessionList.clear();
for (ServerSession sess : server.getSessions()) {
if (sess.getMetaData(parameter) != null) {
sessionList.add(sess);
}
}
if (sessionList.size() != expected) {
Thread.sleep(100);
}
}
return sessionList.toArray(new ServerSession[sessionList.size()]);
}
@Test @Test
public void testInterruptReconnectNetty() throws Exception { public void testInterruptReconnectNetty() throws Exception {
internalTestInterruptReconnect(true, false); internalTestInterruptReconnect(true, false);

View File

@ -133,7 +133,9 @@ public class JMSTestBase extends ActiveMQTestBase {
mbeanServer = MBeanServerFactory.createMBeanServer(); mbeanServer = MBeanServerFactory.createMBeanServer();
Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()).addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()).
addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)).
setTransactionTimeoutScanPeriod(100);
server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence())); server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence()));
jmsServer = new JMSServerManagerImpl(server); jmsServer = new JMSServerManagerImpl(server);

View File

@ -782,8 +782,7 @@ public class AcknowledgementTest extends JMSTestCase {
messageReceived = (TextMessage)consumer.receiveNoWait(); messageReceived = (TextMessage)consumer.receiveNoWait();
if (messageReceived != null) if (messageReceived != null) {
{
System.out.println("Message received " + messageReceived.getText()); System.out.println("Message received " + messageReceived.getText());
} }
Assert.assertNull(messageReceived); Assert.assertNull(messageReceived);

View File

@ -108,6 +108,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
} }
@Override
public boolean isEffective() {
return false;
}
public boolean hasTimedOut(long currentTime, int defaultTimeout) { public boolean hasTimedOut(long currentTime, int defaultTimeout) {
return false; return false;
} }