diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index a0985fc965..9e5ccb3892 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -260,7 +260,7 @@ public final class ChannelImpl implements Channel { } if (resendCache != null && packet.isRequiresConfirmations()) { - resendCache.add(packet); + addResendPacket(packet); } } finally { @@ -332,7 +332,7 @@ public final class ChannelImpl implements Channel { response = null; if (resendCache != null && packet.isRequiresConfirmations()) { - resendCache.add(packet); + addResendPacket(packet); } connection.getTransportConnection().write(buffer, false, false); @@ -523,6 +523,10 @@ public final class ChannelImpl implements Channel { confirmed.setChannelID(id); + if (isTrace) { + ActiveMQClientLogger.LOGGER.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed); + } + doWrite(confirmed); } } @@ -598,30 +602,35 @@ public final class ChannelImpl implements Channel { connection.getTransportConnection().write(buffer, false, false); } + private void addResendPacket(Packet packet) { + resendCache.add(packet); + + if (isTrace) { + ActiveMQClientLogger.LOGGER.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size())); + } + } + private void clearUpTo(final int lastReceivedCommandID) { final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; - if (numberToClear == -1) { - throw ActiveMQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID); + if (isTrace) { + ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + + " first commandID=" + firstStoredCommandID + + " number to clear " + numberToClear); } - int sizeToFree = 0; - for (int i = 0; i < numberToClear; i++) { final Packet packet = resendCache.poll(); if (packet == null) { - if (lastReceivedCommandID > 0) { - ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); - } + ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); firstStoredCommandID = lastReceivedCommandID + 1; return; } - if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) { - sizeToFree += packet.getPacketSize(); + if (isTrace) { + ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler); } - if (commandConfirmationHandler != null) { commandConfirmationHandler.commandConfirmed(packet); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java index 35d2f83834..f316c562ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java @@ -25,8 +25,17 @@ public interface DuplicateIDCache { boolean contains(byte[] duplicateID); + void addToCache(byte[] duplicateID) throws Exception; + void addToCache(byte[] duplicateID, Transaction tx) throws Exception; + /** + * it will add the data to the cache. + * If TX == null it won't use a transaction. + * if instantAdd=true, it won't wait a transaction to add on the cache which is needed on the case of the Bridges + */ + void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) throws Exception; + void deleteFromCache(byte[] duplicateID) throws Exception; void load(List> theIds) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 0ce63663e5..7059671960 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -137,7 +137,17 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } @Override - public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception { + public void addToCache(final byte[] duplID) throws Exception { + addToCache(duplID, null, false); + } + + @Override + public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { + addToCache(duplID, tx, false); + } + + @Override + public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { long recordID = -1; if (tx == null) { @@ -156,9 +166,14 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { tx.setContainsPersistent(); } - // For a tx, it's important that the entry is not added to the cache until commit - // since if the client fails then resends them tx we don't want it to get rejected - tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); + if (instantAdd) { + addToCacheInMemory(duplID, recordID); + } + else { + // For a tx, it's important that the entry is not added to the cache until commit + // since if the client fails then resends them tx we don't want it to get rejected + tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index da22bb9e0c..4b250232a2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1177,7 +1177,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - cacheBridge.addToCache(bridgeDupBytes, context.getTransaction()); + // on the bridge case there is a case where the bridge reconnects + // and the send hasn't finished yet (think of CPU outages). + // for that reason we add the cache right away + cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), true); message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index f638fbcf79..d81dd3eec8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -18,9 +18,10 @@ package org.apache.activemq.artemis.core.server.cluster.impl; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ListIterator; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -98,7 +99,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final SimpleString forwardingAddress; - private final java.util.Queue refs = new ConcurrentLinkedQueue<>(); + private final java.util.Map refs = new LinkedHashMap<>(); private final Transformer transformer; @@ -127,6 +128,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled protected volatile ClientSessionInternal session; + // on cases where sub-classes need a consumer + protected volatile ClientSessionInternal sessionConsumer; + protected String targetNodeID; protected TopologyMember targetNode; @@ -230,8 +234,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled */ @Override public List getDeliveringMessages() { - synchronized (this) { - return new ArrayList<>(refs); + synchronized (refs) { + return new ArrayList<>(refs.values()); } } @@ -271,34 +275,43 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } private void cancelRefs() { - MessageReference ref; - LinkedList list = new LinkedList<>(); - while ((ref = refs.poll()) != null) { - if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("Cancelling reference " + ref + " on bridge " + this); - } - list.addFirst(ref); + synchronized (refs) { + list.addAll(refs.values()); + refs.clear(); + } + + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs cancelling " + list.size() + " references"); } if (isTrace && list.isEmpty()) { ActiveMQServerLogger.LOGGER.trace("didn't have any references to cancel on bridge " + this); + return; } - Queue refqueue = null; + ListIterator listIterator = list.listIterator(list.size()); + + Queue refqueue; long timeBase = System.currentTimeMillis(); - for (MessageReference ref2 : list) { - refqueue = ref2.getQueue(); + while (listIterator.hasPrevious()) { + MessageReference ref = listIterator.previous(); + + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs Cancelling reference " + ref + " on bridge " + this); + } + + refqueue = ref.getQueue(); try { - refqueue.cancel(ref2, timeBase); + refqueue.cancel(ref, timeBase); } catch (Exception e) { // There isn't much we can do besides log an error - ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref2); + ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref); } } } @@ -317,10 +330,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } - @Override public void disconnect() { executor.execute(new Runnable() { - @Override public void run() { if (session != null) { try { @@ -331,6 +342,15 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } session = null; } + if (sessionConsumer != null) { + try { + sessionConsumer.cleanUp(false); + } + catch (Exception dontcare) { + ActiveMQServerLogger.LOGGER.debug(dontcare.getMessage(), dontcare); + } + sessionConsumer = null; + } } }); } @@ -379,7 +399,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } - @Override public void pause() throws Exception { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("Bridge " + this.name + " being paused"); @@ -452,17 +471,30 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public void sendAcknowledged(final Message message) { + if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { + ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged received confirmation for message " + message); + } if (active) { try { - final MessageReference ref = refs.poll(); + + final MessageReference ref; + + synchronized (refs) { + ref = refs.remove(message.getMessageID()); + } if (ref != null) { if (isTrace) { - ActiveMQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue()); + ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this + " Acking " + ref + " on queue " + ref.getQueue()); } ref.getQueue().acknowledge(ref); pendingAcks.countDown(); } + else { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message); + } + } } catch (Exception e) { ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e); @@ -529,7 +561,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled ref.handled(); - refs.add(ref); + synchronized (refs) { + refs.put(ref.getMessage().getMessageID(), ref); + } final ServerMessage message = beforeForward(ref.getMessage()); @@ -686,9 +720,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled catch (final ActiveMQException e) { ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref); - // We remove this reference as we are returning busy which means the reference will never leave the Queue. - // because of this we have to remove the reference here - refs.remove(ref); + synchronized (refs) { + // We remove this reference as we are returning busy which means the reference will never leave the Queue. + // because of this we have to remove the reference here + refs.remove(message.getMessageID()); + } connectionFailed(e, false); @@ -840,6 +876,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } // Session is pre-acknowledge session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); + sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); } if (forwardingAddress != null) { @@ -1031,7 +1068,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled bridge = bridge2; } - @Override public void run() { bridge.connect(); } @@ -1058,8 +1094,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } - internalCancelReferences(); - if (session != null) { ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session); session.removeFailureListener(BridgeImpl.this); @@ -1071,6 +1105,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + if (sessionConsumer != null) { + ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session); + try { + sessionConsumer.close(); + sessionConsumer = null; + } + catch (ActiveMQException dontcare) { + } + } + + internalCancelReferences(); + if (csf != null) { csf.cleanup(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index 7bcc6b2b99..ad34d23a38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; @@ -42,9 +43,9 @@ import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; -import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.utils.UUID; @@ -240,27 +241,28 @@ public class ClusterConnectionBridge extends BridgeImpl { createSelectorFromAddress(flowRecord.getAddress()) + ")"); - session.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter); + sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter); - notifConsumer = session.createConsumer(notifQueueName); + notifConsumer = sessionConsumer.createConsumer(notifQueueName); notifConsumer.setMessageHandler(flowRecord); - session.start(); + sessionConsumer.start(); - ClientMessage message = session.createMessage(false); - if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { - ActiveMQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace")); + ClientMessage message = sessionConsumer.createMessage(false); + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + ActiveMQClientLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace")); } ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress()); - ClientProducer prod = session.createProducer(managementAddress); + ClientProducer prod = sessionConsumer.createProducer(managementAddress); - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("Cluster connetion bridge on " + clusterConnection + " requesting information on queues"); + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { + ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues"); } prod.send(message); + prod.close(); } }