ARTEMIS-328 Fixing message loss through the bridge

https://issues.apache.org/jira/browse/ARTEMIS-328
This commit is contained in:
Clebert Suconic 2015-12-21 22:50:10 -05:00
parent d435a3d00a
commit 9167213f00
6 changed files with 139 additions and 55 deletions

View File

@ -260,7 +260,7 @@ public final class ChannelImpl implements Channel {
} }
if (resendCache != null && packet.isRequiresConfirmations()) { if (resendCache != null && packet.isRequiresConfirmations()) {
resendCache.add(packet); addResendPacket(packet);
} }
} }
finally { finally {
@ -332,7 +332,7 @@ public final class ChannelImpl implements Channel {
response = null; response = null;
if (resendCache != null && packet.isRequiresConfirmations()) { if (resendCache != null && packet.isRequiresConfirmations()) {
resendCache.add(packet); addResendPacket(packet);
} }
connection.getTransportConnection().write(buffer, false, false); connection.getTransportConnection().write(buffer, false, false);
@ -523,6 +523,10 @@ public final class ChannelImpl implements Channel {
confirmed.setChannelID(id); confirmed.setChannelID(id);
if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
}
doWrite(confirmed); doWrite(confirmed);
} }
} }
@ -598,30 +602,35 @@ public final class ChannelImpl implements Channel {
connection.getTransportConnection().write(buffer, false, false); connection.getTransportConnection().write(buffer, false, false);
} }
/**
 * Appends the given packet to the resend cache and, if the isTrace flag is set,
 * emits a trace entry showing the packet, the first stored command ID and that
 * ID plus the cache size after the add.
 *
 * @param packet the packet to store in the resend cache
 */
private void addResendPacket(Packet packet) {
    resendCache.add(packet);

    // Nothing more to do unless tracing is on.
    if (!isTrace) {
        return;
    }

    ActiveMQClientLogger.LOGGER.trace("ChannelImpl::addResendPacket adding packet " + packet +
                                         " stored commandID=" + firstStoredCommandID +
                                         " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
}
private void clearUpTo(final int lastReceivedCommandID) { private void clearUpTo(final int lastReceivedCommandID) {
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
if (numberToClear == -1) { if (isTrace) {
throw ActiveMQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID); ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
" first commandID=" + firstStoredCommandID +
" number to clear " + numberToClear);
} }
int sizeToFree = 0;
for (int i = 0; i < numberToClear; i++) { for (int i = 0; i < numberToClear; i++) {
final Packet packet = resendCache.poll(); final Packet packet = resendCache.poll();
if (packet == null) { if (packet == null) {
if (lastReceivedCommandID > 0) { ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
}
firstStoredCommandID = lastReceivedCommandID + 1; firstStoredCommandID = lastReceivedCommandID + 1;
return; return;
} }
if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) { if (isTrace) {
sizeToFree += packet.getPacketSize(); ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
} }
if (commandConfirmationHandler != null) { if (commandConfirmationHandler != null) {
commandConfirmationHandler.commandConfirmed(packet); commandConfirmationHandler.commandConfirmed(packet);
} }

View File

@ -25,8 +25,17 @@ public interface DuplicateIDCache {
boolean contains(byte[] duplicateID); boolean contains(byte[] duplicateID);
void addToCache(byte[] duplicateID) throws Exception;
void addToCache(byte[] duplicateID, Transaction tx) throws Exception; void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
/**
* Adds the duplicate ID to the cache.
* If tx == null, no transaction is used.
* If instantAdd == true, the ID is added to the in-memory cache right away instead of waiting for the transaction to commit; this is needed for bridges.
*/
void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) throws Exception;
void deleteFromCache(byte[] duplicateID) throws Exception; void deleteFromCache(byte[] duplicateID) throws Exception;
void load(List<Pair<byte[], Long>> theIds) throws Exception; void load(List<Pair<byte[], Long>> theIds) throws Exception;

View File

@ -137,7 +137,17 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
} }
@Override @Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception { public void addToCache(final byte[] duplID) throws Exception {
addToCache(duplID, null, false);
}
@Override
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
addToCache(duplID, tx, false);
}
@Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
long recordID = -1; long recordID = -1;
if (tx == null) { if (tx == null) {
@ -156,9 +166,14 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
tx.setContainsPersistent(); tx.setContainsPersistent();
} }
// For a tx, it's important that the entry is not added to the cache until commit if (instantAdd) {
// since if the client fails then resends them tx we don't want it to get rejected addToCacheInMemory(duplID, recordID);
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); }
else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
}
} }
} }

View File

@ -1177,7 +1177,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction()); // on the bridge case there is a case where the bridge reconnects
// and the send hasn't finished yet (think of CPU outages).
// for that reason we add the entry to the cache right away
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), true);
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);

View File

@ -18,9 +18,10 @@ package org.apache.activemq.artemis.core.server.cluster.impl;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.ListIterator;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -98,7 +99,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final SimpleString forwardingAddress; private final SimpleString forwardingAddress;
private final java.util.Queue<MessageReference> refs = new ConcurrentLinkedQueue<>(); private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
private final Transformer transformer; private final Transformer transformer;
@ -127,6 +128,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
protected volatile ClientSessionInternal session; protected volatile ClientSessionInternal session;
// on cases where sub-classes need a consumer
protected volatile ClientSessionInternal sessionConsumer;
protected String targetNodeID; protected String targetNodeID;
protected TopologyMember targetNode; protected TopologyMember targetNode;
@ -230,8 +234,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
*/ */
@Override @Override
public List<MessageReference> getDeliveringMessages() { public List<MessageReference> getDeliveringMessages() {
synchronized (this) { synchronized (refs) {
return new ArrayList<>(refs); return new ArrayList<>(refs.values());
} }
} }
@ -271,34 +275,43 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
private void cancelRefs() { private void cancelRefs() {
MessageReference ref;
LinkedList<MessageReference> list = new LinkedList<>(); LinkedList<MessageReference> list = new LinkedList<>();
while ((ref = refs.poll()) != null) { synchronized (refs) {
if (isTrace) { list.addAll(refs.values());
ActiveMQServerLogger.LOGGER.trace("Cancelling reference " + ref + " on bridge " + this); refs.clear();
} }
list.addFirst(ref);
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs cancelling " + list.size() + " references");
} }
if (isTrace && list.isEmpty()) { if (isTrace && list.isEmpty()) {
ActiveMQServerLogger.LOGGER.trace("didn't have any references to cancel on bridge " + this); ActiveMQServerLogger.LOGGER.trace("didn't have any references to cancel on bridge " + this);
return;
} }
Queue refqueue = null; ListIterator<MessageReference> listIterator = list.listIterator(list.size());
Queue refqueue;
long timeBase = System.currentTimeMillis(); long timeBase = System.currentTimeMillis();
for (MessageReference ref2 : list) { while (listIterator.hasPrevious()) {
refqueue = ref2.getQueue(); MessageReference ref = listIterator.previous();
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs Cancelling reference " + ref + " on bridge " + this);
}
refqueue = ref.getQueue();
try { try {
refqueue.cancel(ref2, timeBase); refqueue.cancel(ref, timeBase);
} }
catch (Exception e) { catch (Exception e) {
// There isn't much we can do besides log an error // There isn't much we can do besides log an error
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref2); ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
} }
} }
} }
@ -317,10 +330,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
} }
@Override
public void disconnect() { public void disconnect() {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override
public void run() { public void run() {
if (session != null) { if (session != null) {
try { try {
@ -331,6 +342,15 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
session = null; session = null;
} }
if (sessionConsumer != null) {
try {
sessionConsumer.cleanUp(false);
}
catch (Exception dontcare) {
ActiveMQServerLogger.LOGGER.debug(dontcare.getMessage(), dontcare);
}
sessionConsumer = null;
}
} }
}); });
} }
@ -379,7 +399,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
} }
@Override
public void pause() throws Exception { public void pause() throws Exception {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Bridge " + this.name + " being paused"); ActiveMQServerLogger.LOGGER.debug("Bridge " + this.name + " being paused");
@ -452,17 +471,30 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override @Override
public void sendAcknowledged(final Message message) { public void sendAcknowledged(final Message message) {
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged received confirmation for message " + message);
}
if (active) { if (active) {
try { try {
final MessageReference ref = refs.poll();
final MessageReference ref;
synchronized (refs) {
ref = refs.remove(message.getMessageID());
}
if (ref != null) { if (ref != null) {
if (isTrace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue()); ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this + " Acking " + ref + " on queue " + ref.getQueue());
} }
ref.getQueue().acknowledge(ref); ref.getQueue().acknowledge(ref);
pendingAcks.countDown(); pendingAcks.countDown();
} }
else {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
}
}
} }
catch (Exception e) { catch (Exception e) {
ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e); ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e);
@ -529,7 +561,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
ref.handled(); ref.handled();
refs.add(ref); synchronized (refs) {
refs.put(ref.getMessage().getMessageID(), ref);
}
final ServerMessage message = beforeForward(ref.getMessage()); final ServerMessage message = beforeForward(ref.getMessage());
@ -686,9 +720,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
catch (final ActiveMQException e) { catch (final ActiveMQException e) {
ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref); ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref);
// We remove this reference as we are returning busy which means the reference will never leave the Queue. synchronized (refs) {
// because of this we have to remove the reference here // We remove this reference as we are returning busy which means the reference will never leave the Queue.
refs.remove(ref); // because of this we have to remove the reference here
refs.remove(message.getMessageID());
}
connectionFailed(e, false); connectionFailed(e, false);
@ -840,6 +876,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
// Session is pre-acknowledge // Session is pre-acknowledge
session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
} }
if (forwardingAddress != null) { if (forwardingAddress != null) {
@ -1031,7 +1068,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
bridge = bridge2; bridge = bridge2;
} }
@Override
public void run() { public void run() {
bridge.connect(); bridge.connect();
} }
@ -1058,8 +1094,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
internalCancelReferences();
if (session != null) { if (session != null) {
ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session); ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session);
session.removeFailureListener(BridgeImpl.this); session.removeFailureListener(BridgeImpl.this);
@ -1071,6 +1105,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
} }
if (sessionConsumer != null) {
ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session);
try {
sessionConsumer.close();
sessionConsumer = null;
}
catch (ActiveMQException dontcare) {
}
}
internalCancelReferences();
if (csf != null) { if (csf != null) {
csf.cleanup(); csf.cleanup();
} }

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
@ -42,9 +43,9 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
@ -240,27 +241,28 @@ public class ClusterConnectionBridge extends BridgeImpl {
createSelectorFromAddress(flowRecord.getAddress()) + createSelectorFromAddress(flowRecord.getAddress()) +
")"); ")");
session.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter); sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
notifConsumer = session.createConsumer(notifQueueName); notifConsumer = sessionConsumer.createConsumer(notifQueueName);
notifConsumer.setMessageHandler(flowRecord); notifConsumer.setMessageHandler(flowRecord);
session.start(); sessionConsumer.start();
ClientMessage message = session.createMessage(false); ClientMessage message = sessionConsumer.createMessage(false);
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace")); ActiveMQClientLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
} }
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress()); ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress());
ClientProducer prod = session.createProducer(managementAddress); ClientProducer prod = sessionConsumer.createProducer(managementAddress);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Cluster connetion bridge on " + clusterConnection + " requesting information on queues"); ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues");
} }
prod.send(message); prod.send(message);
prod.close();
} }
} }