This closes #284

This commit is contained in:
Clebert Suconic 2015-12-21 23:40:11 -05:00
commit fce14f1f30
6 changed files with 139 additions and 55 deletions

View File

@ -260,7 +260,7 @@ public final class ChannelImpl implements Channel {
}
if (resendCache != null && packet.isRequiresConfirmations()) {
resendCache.add(packet);
addResendPacket(packet);
}
}
finally {
@ -332,7 +332,7 @@ public final class ChannelImpl implements Channel {
response = null;
if (resendCache != null && packet.isRequiresConfirmations()) {
resendCache.add(packet);
addResendPacket(packet);
}
connection.getTransportConnection().write(buffer, false, false);
@ -523,6 +523,10 @@ public final class ChannelImpl implements Channel {
confirmed.setChannelID(id);
if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
}
doWrite(confirmed);
}
}
@ -598,30 +602,35 @@ public final class ChannelImpl implements Channel {
connection.getTransportConnection().write(buffer, false, false);
}
private void addResendPacket(Packet packet) {
resendCache.add(packet);
if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
}
}
private void clearUpTo(final int lastReceivedCommandID) {
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
if (numberToClear == -1) {
throw ActiveMQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID);
if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
" first commandID=" + firstStoredCommandID +
" number to clear " + numberToClear);
}
int sizeToFree = 0;
for (int i = 0; i < numberToClear; i++) {
final Packet packet = resendCache.poll();
if (packet == null) {
if (lastReceivedCommandID > 0) {
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
}
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
firstStoredCommandID = lastReceivedCommandID + 1;
return;
}
if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) {
sizeToFree += packet.getPacketSize();
if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
}
if (commandConfirmationHandler != null) {
commandConfirmationHandler.commandConfirmed(packet);
}

View File

@ -25,8 +25,17 @@ public interface DuplicateIDCache {
boolean contains(byte[] duplicateID);
void addToCache(byte[] duplicateID) throws Exception;
void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
/**
* it will add the data to the cache.
* If TX == null it won't use a transaction.
* if instantAdd=true, it won't wait a transaction to add on the cache which is needed on the case of the Bridges
*/
void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) throws Exception;
void deleteFromCache(byte[] duplicateID) throws Exception;
void load(List<Pair<byte[], Long>> theIds) throws Exception;

View File

@ -137,7 +137,17 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
}
@Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
public void addToCache(final byte[] duplID) throws Exception {
addToCache(duplID, null, false);
}
@Override
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
addToCache(duplID, tx, false);
}
@Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
long recordID = -1;
if (tx == null) {
@ -156,9 +166,14 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
tx.setContainsPersistent();
}
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
if (instantAdd) {
addToCacheInMemory(duplID, recordID);
}
else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
}
}
}

View File

@ -1177,7 +1177,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
// on the bridge case there is a case where the bridge reconnects
// and the send hasn't finished yet (think of CPU outages).
// for that reason we add the cache right away
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), true);
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);

View File

@ -18,9 +18,10 @@ package org.apache.activemq.artemis.core.server.cluster.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.ListIterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -98,7 +99,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final SimpleString forwardingAddress;
private final java.util.Queue<MessageReference> refs = new ConcurrentLinkedQueue<>();
private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
private final Transformer transformer;
@ -127,6 +128,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
protected volatile ClientSessionInternal session;
// on cases where sub-classes need a consumer
protected volatile ClientSessionInternal sessionConsumer;
protected String targetNodeID;
protected TopologyMember targetNode;
@ -230,8 +234,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
*/
@Override
public List<MessageReference> getDeliveringMessages() {
synchronized (this) {
return new ArrayList<>(refs);
synchronized (refs) {
return new ArrayList<>(refs.values());
}
}
@ -271,34 +275,43 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
private void cancelRefs() {
MessageReference ref;
LinkedList<MessageReference> list = new LinkedList<>();
while ((ref = refs.poll()) != null) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("Cancelling reference " + ref + " on bridge " + this);
}
list.addFirst(ref);
synchronized (refs) {
list.addAll(refs.values());
refs.clear();
}
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs cancelling " + list.size() + " references");
}
if (isTrace && list.isEmpty()) {
ActiveMQServerLogger.LOGGER.trace("didn't have any references to cancel on bridge " + this);
return;
}
Queue refqueue = null;
ListIterator<MessageReference> listIterator = list.listIterator(list.size());
Queue refqueue;
long timeBase = System.currentTimeMillis();
for (MessageReference ref2 : list) {
refqueue = ref2.getQueue();
while (listIterator.hasPrevious()) {
MessageReference ref = listIterator.previous();
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs Cancelling reference " + ref + " on bridge " + this);
}
refqueue = ref.getQueue();
try {
refqueue.cancel(ref2, timeBase);
refqueue.cancel(ref, timeBase);
}
catch (Exception e) {
// There isn't much we can do besides log an error
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref2);
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
}
}
}
@ -317,10 +330,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
@Override
public void disconnect() {
executor.execute(new Runnable() {
@Override
public void run() {
if (session != null) {
try {
@ -331,6 +342,15 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
session = null;
}
if (sessionConsumer != null) {
try {
sessionConsumer.cleanUp(false);
}
catch (Exception dontcare) {
ActiveMQServerLogger.LOGGER.debug(dontcare.getMessage(), dontcare);
}
sessionConsumer = null;
}
}
});
}
@ -379,7 +399,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
@Override
public void pause() throws Exception {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Bridge " + this.name + " being paused");
@ -452,17 +471,30 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override
public void sendAcknowledged(final Message message) {
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged received confirmation for message " + message);
}
if (active) {
try {
final MessageReference ref = refs.poll();
final MessageReference ref;
synchronized (refs) {
ref = refs.remove(message.getMessageID());
}
if (ref != null) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue());
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this + " Acking " + ref + " on queue " + ref.getQueue());
}
ref.getQueue().acknowledge(ref);
pendingAcks.countDown();
}
else {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
}
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e);
@ -529,7 +561,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
ref.handled();
refs.add(ref);
synchronized (refs) {
refs.put(ref.getMessage().getMessageID(), ref);
}
final ServerMessage message = beforeForward(ref.getMessage());
@ -686,9 +720,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
catch (final ActiveMQException e) {
ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref);
// We remove this reference as we are returning busy which means the reference will never leave the Queue.
// because of this we have to remove the reference here
refs.remove(ref);
synchronized (refs) {
// We remove this reference as we are returning busy which means the reference will never leave the Queue.
// because of this we have to remove the reference here
refs.remove(message.getMessageID());
}
connectionFailed(e, false);
@ -840,6 +876,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
// Session is pre-acknowledge
session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
}
if (forwardingAddress != null) {
@ -1031,7 +1068,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
bridge = bridge2;
}
@Override
public void run() {
bridge.connect();
}
@ -1058,8 +1094,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
internalCancelReferences();
if (session != null) {
ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session);
session.removeFailureListener(BridgeImpl.this);
@ -1071,6 +1105,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
if (sessionConsumer != null) {
ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + session);
try {
sessionConsumer.close();
sessionConsumer = null;
}
catch (ActiveMQException dontcare) {
}
}
internalCancelReferences();
if (csf != null) {
csf.cleanup();
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
@ -42,9 +43,9 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.utils.UUID;
@ -240,27 +241,28 @@ public class ClusterConnectionBridge extends BridgeImpl {
createSelectorFromAddress(flowRecord.getAddress()) +
")");
session.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
notifConsumer = session.createConsumer(notifQueueName);
notifConsumer = sessionConsumer.createConsumer(notifQueueName);
notifConsumer.setMessageHandler(flowRecord);
session.start();
sessionConsumer.start();
ClientMessage message = session.createMessage(false);
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
ClientMessage message = sessionConsumer.createMessage(false);
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
}
ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), flowRecord.getAddress());
ClientProducer prod = session.createProducer(managementAddress);
ClientProducer prod = sessionConsumer.createProducer(managementAddress);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Cluster connetion bridge on " + clusterConnection + " requesting information on queues");
if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " + clusterConnection + " requesting information on queues");
}
prod.send(message);
prod.close();
}
}