merge #61 - Improving ScaleDown and allowing snapshots on builds
This commit is contained in:
commit
b388bf2397
|
@ -28,7 +28,7 @@ import java.util.Iterator;
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface LinkedListIterator<E> extends Iterator<E>
|
public interface LinkedListIterator<E> extends Iterator<E>, AutoCloseable
|
||||||
{
|
{
|
||||||
void repeat();
|
void repeat();
|
||||||
|
|
||||||
|
|
|
@ -1247,6 +1247,8 @@ final class PageSubscriptionImpl implements PageSubscription
|
||||||
|
|
||||||
private volatile boolean isredelivery = false;
|
private volatile boolean isredelivery = false;
|
||||||
|
|
||||||
|
private PagedReference currentDelivery = null;
|
||||||
|
|
||||||
private volatile PagedReference lastRedelivery = null;
|
private volatile PagedReference lastRedelivery = null;
|
||||||
|
|
||||||
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
|
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
|
||||||
|
@ -1298,9 +1300,9 @@ final class PageSubscriptionImpl implements PageSubscription
|
||||||
|
|
||||||
if (cachedNext != null)
|
if (cachedNext != null)
|
||||||
{
|
{
|
||||||
PagedReference retPos = cachedNext;
|
currentDelivery = cachedNext;
|
||||||
cachedNext = null;
|
cachedNext = null;
|
||||||
return retPos;
|
return currentDelivery;
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
|
@ -1310,7 +1312,8 @@ final class PageSubscriptionImpl implements PageSubscription
|
||||||
position = getStartPosition();
|
position = getStartPosition();
|
||||||
}
|
}
|
||||||
|
|
||||||
return moveNext();
|
currentDelivery = moveNext();
|
||||||
|
return currentDelivery;
|
||||||
}
|
}
|
||||||
catch (RuntimeException e)
|
catch (RuntimeException e)
|
||||||
{
|
{
|
||||||
|
@ -1473,10 +1476,13 @@ final class PageSubscriptionImpl implements PageSubscription
|
||||||
public void remove()
|
public void remove()
|
||||||
{
|
{
|
||||||
deliveredCount.incrementAndGet();
|
deliveredCount.incrementAndGet();
|
||||||
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(position);
|
if (currentDelivery != null)
|
||||||
if (info != null)
|
|
||||||
{
|
{
|
||||||
info.remove(position);
|
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
|
||||||
|
if (info != null)
|
||||||
|
{
|
||||||
|
info.remove(currentDelivery.getPosition());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.core.postoffice;
|
package org.apache.activemq.core.postoffice;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.api.core.SimpleString;
|
import org.apache.activemq.api.core.SimpleString;
|
||||||
import org.apache.activemq.core.transaction.Transaction;
|
import org.apache.activemq.core.transaction.Transaction;
|
||||||
|
@ -50,4 +51,6 @@ public interface AddressManager
|
||||||
Binding getBinding(SimpleString queueName);
|
Binding getBinding(SimpleString queueName);
|
||||||
|
|
||||||
Map<SimpleString, Binding> getBindings();
|
Map<SimpleString, Binding> getBindings();
|
||||||
|
|
||||||
|
Set<SimpleString> getAddresses();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.core.postoffice;
|
package org.apache.activemq.core.postoffice;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.api.core.Pair;
|
import org.apache.activemq.api.core.Pair;
|
||||||
import org.apache.activemq.api.core.SimpleString;
|
import org.apache.activemq.api.core.SimpleString;
|
||||||
|
@ -95,4 +96,6 @@ public interface PostOffice extends ActiveMQComponent
|
||||||
|
|
||||||
boolean isAddressBound(final SimpleString address) throws Exception;
|
boolean isAddressBound(final SimpleString address) throws Exception;
|
||||||
|
|
||||||
|
Set<SimpleString> getAddresses();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -861,6 +861,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
return notificationLock;
|
return notificationLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<SimpleString> getAddresses()
|
||||||
|
{
|
||||||
|
return addressManager.getAddresses();
|
||||||
|
}
|
||||||
|
|
||||||
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
|
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
|
||||||
{
|
{
|
||||||
// We send direct to the queue so we can send it to the same queue that is bound to the notifications address -
|
// We send direct to the queue so we can send it to the same queue that is bound to the notifications address -
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.core.postoffice.impl;
|
package org.apache.activemq.core.postoffice.impl;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
@ -151,6 +153,15 @@ public class SimpleAddressManager implements AddressManager
|
||||||
mappings.clear();
|
mappings.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<SimpleString> getAddresses()
|
||||||
|
{
|
||||||
|
Set<SimpleString> addresses = new HashSet<>();
|
||||||
|
addresses.addAll(mappings.keySet());
|
||||||
|
return addresses;
|
||||||
|
}
|
||||||
|
|
||||||
protected void removeBindingInternal(final SimpleString address, final SimpleString bindableName)
|
protected void removeBindingInternal(final SimpleString address, final SimpleString bindableName)
|
||||||
{
|
{
|
||||||
Bindings bindings = mappings.get(address);
|
Bindings bindings = mappings.get(address);
|
||||||
|
|
|
@ -176,6 +176,10 @@ public interface Queue extends Bindable
|
||||||
|
|
||||||
boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
|
boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It will iterate thorugh memory only (not paging)
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
LinkedListIterator<MessageReference> iterator();
|
LinkedListIterator<MessageReference> iterator();
|
||||||
|
|
||||||
LinkedListIterator<MessageReference> totalIterator();
|
LinkedListIterator<MessageReference> totalIterator();
|
||||||
|
@ -228,7 +232,10 @@ public interface Queue extends Bindable
|
||||||
|
|
||||||
void incrementMesssagesAdded();
|
void incrementMesssagesAdded();
|
||||||
|
|
||||||
List<MessageReference> cancelScheduledMessages();
|
/**
|
||||||
|
* cancels scheduled messages and send them to the head of the queue.
|
||||||
|
*/
|
||||||
|
void deliverScheduledMessages();
|
||||||
|
|
||||||
void postAcknowledge(MessageReference ref);
|
void postAcknowledge(MessageReference ref);
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader
|
||||||
@Override
|
@Override
|
||||||
public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
|
public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
|
||||||
{
|
{
|
||||||
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController);
|
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
|
||||||
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
|
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
|
||||||
|
|
||||||
try (ClientSessionFactory sessionFactory = locator.createSessionFactory())
|
try (ClientSessionFactory sessionFactory = locator.createSessionFactory())
|
||||||
|
|
|
@ -187,7 +187,8 @@ public class LiveOnlyActivation extends Activation
|
||||||
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(activeMQServer.getPagingManager(),
|
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(activeMQServer.getPagingManager(),
|
||||||
activeMQServer.getPostOffice(),
|
activeMQServer.getPostOffice(),
|
||||||
activeMQServer.getNodeManager(),
|
activeMQServer.getNodeManager(),
|
||||||
activeMQServer.getClusterManager().getClusterController());
|
activeMQServer.getClusterManager().getClusterController(),
|
||||||
|
activeMQServer.getStorageManager());
|
||||||
ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) activeMQServer.getPostOffice()).getDuplicateIDCaches();
|
ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) activeMQServer.getPostOffice()).getDuplicateIDCaches();
|
||||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
|
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
|
||||||
for (SimpleString address : duplicateIDCaches.keySet())
|
for (SimpleString address : duplicateIDCaches.keySet())
|
||||||
|
|
|
@ -1220,9 +1220,18 @@ public class QueueImpl implements Queue
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageReference> cancelScheduledMessages()
|
public void deliverScheduledMessages()
|
||||||
{
|
{
|
||||||
return scheduledDeliveryHandler.cancel(null);
|
List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
|
||||||
|
if (scheduledMessages != null && scheduledMessages.size() > 0)
|
||||||
|
{
|
||||||
|
for (MessageReference ref : scheduledMessages)
|
||||||
|
{
|
||||||
|
ref.getMessage().putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
|
||||||
|
ref.setScheduledDeliveryTime(0);
|
||||||
|
}
|
||||||
|
this.addHead(scheduledMessages);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMessagesAdded()
|
public long getMessagesAdded()
|
||||||
|
@ -3105,6 +3114,8 @@ public class QueueImpl implements Queue
|
||||||
Iterator<MessageReference> interIterator = null;
|
Iterator<MessageReference> interIterator = null;
|
||||||
LinkedListIterator<MessageReference> messagesIterator = null;
|
LinkedListIterator<MessageReference> messagesIterator = null;
|
||||||
|
|
||||||
|
Iterator lastIterator = null;
|
||||||
|
|
||||||
public TotalQueueIterator()
|
public TotalQueueIterator()
|
||||||
{
|
{
|
||||||
if (pageSubscription != null)
|
if (pageSubscription != null)
|
||||||
|
@ -3118,18 +3129,21 @@ public class QueueImpl implements Queue
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext()
|
public boolean hasNext()
|
||||||
{
|
{
|
||||||
if (messagesIterator.hasNext())
|
if (messagesIterator != null && messagesIterator.hasNext())
|
||||||
{
|
{
|
||||||
|
lastIterator = messagesIterator;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (interIterator.hasNext())
|
if (interIterator.hasNext())
|
||||||
{
|
{
|
||||||
|
lastIterator = interIterator;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (pageIter != null)
|
if (pageIter != null)
|
||||||
{
|
{
|
||||||
if (pageIter.hasNext())
|
if (pageIter.hasNext())
|
||||||
{
|
{
|
||||||
|
lastIterator = pageIter;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3140,18 +3154,21 @@ public class QueueImpl implements Queue
|
||||||
@Override
|
@Override
|
||||||
public MessageReference next()
|
public MessageReference next()
|
||||||
{
|
{
|
||||||
if (messagesIterator.hasNext())
|
if (messagesIterator != null && messagesIterator.hasNext())
|
||||||
{
|
{
|
||||||
return messagesIterator.next();
|
MessageReference msg = messagesIterator.next();
|
||||||
|
return msg;
|
||||||
}
|
}
|
||||||
if (interIterator.hasNext())
|
if (interIterator.hasNext())
|
||||||
{
|
{
|
||||||
|
lastIterator = interIterator;
|
||||||
return interIterator.next();
|
return interIterator.next();
|
||||||
}
|
}
|
||||||
if (pageIter != null)
|
if (pageIter != null)
|
||||||
{
|
{
|
||||||
if (pageIter.hasNext())
|
if (pageIter.hasNext())
|
||||||
{
|
{
|
||||||
|
lastIterator = pageIter;
|
||||||
return pageIter.next();
|
return pageIter.next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3162,6 +3179,10 @@ public class QueueImpl implements Queue
|
||||||
@Override
|
@Override
|
||||||
public void remove()
|
public void remove()
|
||||||
{
|
{
|
||||||
|
if (lastIterator != null)
|
||||||
|
{
|
||||||
|
lastIterator.remove();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -3172,8 +3193,14 @@ public class QueueImpl implements Queue
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
if (pageIter != null) pageIter.close();
|
if (pageIter != null)
|
||||||
messagesIterator.close();
|
{
|
||||||
|
pageIter.close();
|
||||||
|
}
|
||||||
|
if (messagesIterator != null)
|
||||||
|
{
|
||||||
|
messagesIterator.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,11 +20,13 @@ import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import org.apache.activemq.api.core.Message;
|
import org.apache.activemq.api.core.Message;
|
||||||
import org.apache.activemq.api.core.Pair;
|
import org.apache.activemq.api.core.Pair;
|
||||||
|
@ -42,7 +44,9 @@ import org.apache.activemq.core.paging.PagingManager;
|
||||||
import org.apache.activemq.core.paging.PagingStore;
|
import org.apache.activemq.core.paging.PagingStore;
|
||||||
import org.apache.activemq.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.core.paging.cursor.PageSubscription;
|
||||||
import org.apache.activemq.core.paging.cursor.PagedReference;
|
import org.apache.activemq.core.paging.cursor.PagedReference;
|
||||||
|
import org.apache.activemq.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.core.postoffice.Binding;
|
import org.apache.activemq.core.postoffice.Binding;
|
||||||
|
import org.apache.activemq.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.core.postoffice.PostOffice;
|
import org.apache.activemq.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.core.postoffice.impl.PostOfficeImpl;
|
import org.apache.activemq.core.postoffice.impl.PostOfficeImpl;
|
||||||
|
@ -56,6 +60,7 @@ import org.apache.activemq.core.server.cluster.ClusterController;
|
||||||
import org.apache.activemq.core.transaction.ResourceManager;
|
import org.apache.activemq.core.transaction.ResourceManager;
|
||||||
import org.apache.activemq.core.transaction.Transaction;
|
import org.apache.activemq.core.transaction.Transaction;
|
||||||
import org.apache.activemq.core.transaction.TransactionOperation;
|
import org.apache.activemq.core.transaction.TransactionOperation;
|
||||||
|
import org.apache.activemq.core.transaction.impl.TransactionImpl;
|
||||||
import org.apache.activemq.utils.LinkedListIterator;
|
import org.apache.activemq.utils.LinkedListIterator;
|
||||||
|
|
||||||
public class ScaleDownHandler
|
public class ScaleDownHandler
|
||||||
|
@ -64,20 +69,22 @@ public class ScaleDownHandler
|
||||||
final PostOffice postOffice;
|
final PostOffice postOffice;
|
||||||
private NodeManager nodeManager;
|
private NodeManager nodeManager;
|
||||||
private final ClusterController clusterController;
|
private final ClusterController clusterController;
|
||||||
|
private final StorageManager storageManager;
|
||||||
private String targetNodeId;
|
private String targetNodeId;
|
||||||
|
|
||||||
public ScaleDownHandler(PagingManager pagingManager, PostOffice postOffice, NodeManager nodeManager, ClusterController clusterController)
|
public ScaleDownHandler(PagingManager pagingManager, PostOffice postOffice, NodeManager nodeManager, ClusterController clusterController, StorageManager storageManager)
|
||||||
{
|
{
|
||||||
this.pagingManager = pagingManager;
|
this.pagingManager = pagingManager;
|
||||||
this.postOffice = postOffice;
|
this.postOffice = postOffice;
|
||||||
this.nodeManager = nodeManager;
|
this.nodeManager = nodeManager;
|
||||||
this.clusterController = clusterController;
|
this.clusterController = clusterController;
|
||||||
|
this.storageManager = storageManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long scaleDown(ClientSessionFactory sessionFactory,
|
public long scaleDown(ClientSessionFactory sessionFactory,
|
||||||
ResourceManager resourceManager,
|
ResourceManager resourceManager,
|
||||||
Map<SimpleString,
|
Map<SimpleString,
|
||||||
List<Pair<byte[], Long>>> duplicateIDMap,
|
List<Pair<byte[], Long>>> duplicateIDMap,
|
||||||
SimpleString managementAddress,
|
SimpleString managementAddress,
|
||||||
SimpleString targetNodeId) throws Exception
|
SimpleString targetNodeId) throws Exception
|
||||||
{
|
{
|
||||||
|
@ -91,218 +98,239 @@ public class ScaleDownHandler
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId) throws Exception
|
public long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId) throws Exception
|
||||||
{
|
{
|
||||||
long messageCount = 0;
|
long messageCount = 0;
|
||||||
targetNodeId = nodeId != null ? nodeId.toString() : getTargetNodeId(sessionFactory);
|
targetNodeId = nodeId != null ? nodeId.toString() : getTargetNodeId(sessionFactory);
|
||||||
|
|
||||||
ClientSession session = sessionFactory.createSession(false, true, true);
|
try (ClientSession session = sessionFactory.createSession(false, true, true))
|
||||||
Map<String, Long> queueIDs = new HashMap<>();
|
|
||||||
ClientProducer producer = session.createProducer();
|
|
||||||
|
|
||||||
List<SimpleString> addresses = new ArrayList<>();
|
|
||||||
for (Map.Entry<SimpleString, Binding> entry : postOffice.getAllBindings().entrySet())
|
|
||||||
{
|
{
|
||||||
if (entry.getValue() instanceof LocalQueueBinding)
|
ClientProducer producer = session.createProducer();
|
||||||
{
|
|
||||||
SimpleString address = entry.getValue().getAddress();
|
|
||||||
|
|
||||||
// There is a special case involving store-and-forward queues used for clustering.
|
// perform a loop per address
|
||||||
// If this queue is supposed to forward messages to the server that I'm scaling down to I need to handle these messages differently.
|
for (SimpleString address : postOffice.getAddresses())
|
||||||
boolean storeAndForward = false;
|
{
|
||||||
if (address.toString().startsWith("sf."))
|
ActiveMQServerLogger.LOGGER.debug("Scaling down address " + address);
|
||||||
|
Bindings bindings = postOffice.getBindingsForAddress(address);
|
||||||
|
|
||||||
|
// It will get a list of queues on this address, ordered by the number of messages
|
||||||
|
Set<Queue> queues = new TreeSet<>(new OrderQueueByNumberOfReferencesComparator());
|
||||||
|
for (Binding binding : bindings.getBindings())
|
||||||
{
|
{
|
||||||
// these get special treatment later
|
if (binding instanceof LocalQueueBinding)
|
||||||
storeAndForward = true;
|
{
|
||||||
|
Queue queue = ((LocalQueueBinding) binding).getQueue();
|
||||||
|
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
|
||||||
|
queue.deliverScheduledMessages();
|
||||||
|
queues.add(queue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this means we haven't inspected this address before
|
|
||||||
if (!addresses.contains(address))
|
if (address.toString().startsWith("sf."))
|
||||||
{
|
{
|
||||||
addresses.add(address);
|
messageCount += scaleDownSNF(address, queues, producer);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
messageCount += scaleDownRegularMessages(address, queues, session, producer);
|
||||||
|
}
|
||||||
|
|
||||||
PagingStore store = pagingManager.getPageStore(address);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// compile a list of all the relevant queues and queue iterators for this address
|
return messageCount;
|
||||||
List<Queue> queues = new ArrayList<>();
|
}
|
||||||
Map<SimpleString, LinkedListIterator<MessageReference>> queueIterators = new HashMap<>();
|
|
||||||
for (Binding binding : postOffice.getBindingsForAddress(address).getBindings())
|
public long scaleDownRegularMessages(final SimpleString address, final Set<Queue> queues, final ClientSession clientSession, final ClientProducer producer) throws Exception
|
||||||
|
{
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("Scaling down messages on address " + address);
|
||||||
|
long messageCount = 0;
|
||||||
|
|
||||||
|
final HashMap<Queue, QueuesXRefInnerManager> controls = new HashMap<Queue, QueuesXRefInnerManager>();
|
||||||
|
|
||||||
|
PagingStore pageStore = pagingManager.getPageStore(address);
|
||||||
|
|
||||||
|
Transaction tx = new TransactionImpl(storageManager);
|
||||||
|
|
||||||
|
pageStore.disableCleanup();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
|
||||||
|
for (Queue queue : queues)
|
||||||
|
{
|
||||||
|
controls.put(queue, new QueuesXRefInnerManager(clientSession, queue, pageStore));
|
||||||
|
}
|
||||||
|
|
||||||
|
// compile a list of all the relevant queues and queue iterators for this address
|
||||||
|
for (Queue loopQueue : queues)
|
||||||
|
{
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
|
||||||
|
|
||||||
|
try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.totalIterator())
|
||||||
|
{
|
||||||
|
|
||||||
|
while (messagesIterator.hasNext())
|
||||||
{
|
{
|
||||||
if (binding instanceof LocalQueueBinding)
|
MessageReference messageReference = messagesIterator.next();
|
||||||
|
Message message = messageReference.getMessage().copy();
|
||||||
|
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("Reading message " + message + " from queue " + loopQueue);
|
||||||
|
Set<QueuesXRefInnerManager> queuesFound = new HashSet<>();
|
||||||
|
|
||||||
|
for (Map.Entry<Queue, QueuesXRefInnerManager> controlEntry : controls.entrySet())
|
||||||
{
|
{
|
||||||
Queue queue = ((LocalQueueBinding) binding).getQueue();
|
if (controlEntry.getKey() == loopQueue)
|
||||||
//remove the scheduled messages and reset on the actual message ready for sending
|
|
||||||
//we may set the time multiple times on a message but it will always be the same.
|
|
||||||
//set the ref scheduled time to 0 so it is in the queue ready for resending
|
|
||||||
List<MessageReference> messageReferences = queue.cancelScheduledMessages();
|
|
||||||
for (MessageReference ref : messageReferences)
|
|
||||||
{
|
{
|
||||||
ref.getMessage().putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
|
// no need to lookup on itself, we just add it
|
||||||
ref.setScheduledDeliveryTime(0);
|
queuesFound.add(controlEntry.getValue());
|
||||||
}
|
}
|
||||||
queue.addHead(messageReferences);
|
else if (controlEntry.getValue().lookup(messageReference))
|
||||||
queues.add(queue);
|
{
|
||||||
queueIterators.put(queue.getName(), queue.totalIterator());
|
ActiveMQServerLogger.LOGGER.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID());
|
||||||
|
queuesFound.add(controlEntry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the ID for every queue that contains the message
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(queuesFound.size() * 8);
|
||||||
|
|
||||||
|
|
||||||
|
for (QueuesXRefInnerManager control : queuesFound)
|
||||||
|
{
|
||||||
|
long queueID = control.getQueueID();
|
||||||
|
buffer.putLong(queueID);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
|
||||||
|
|
||||||
|
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
|
||||||
|
{
|
||||||
|
if (messageReference.isPaged())
|
||||||
|
{
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("*********************<<<<< Scaling down pdgmessage " + message);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("*********************<<<<< Scaling down message " + message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.send(address, message);
|
||||||
|
messageCount++;
|
||||||
|
|
||||||
|
messagesIterator.remove();
|
||||||
|
|
||||||
|
// We need to perform the ack / removal after sending, otherwise the message could been removed before the send is finished
|
||||||
|
for (QueuesXRefInnerManager queueFound : queuesFound)
|
||||||
|
{
|
||||||
|
ackMessageOnQueue(tx, queueFound.getQueue(), messageReference);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
|
||||||
|
for (QueuesXRefInnerManager controlRemoved : controls.values())
|
||||||
|
{
|
||||||
|
controlRemoved.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageCount;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
pageStore.enableCleanup();
|
||||||
|
pageStore.getCursorProvider().scheduleCleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long scaleDownSNF(final SimpleString address, final Set<Queue> queues, final ClientProducer producer) throws Exception
|
||||||
|
{
|
||||||
|
long messageCount = 0;
|
||||||
|
|
||||||
|
final String propertyEnd;
|
||||||
|
|
||||||
|
// If this SNF is towards our targetNodeId
|
||||||
|
boolean queueOnTarget = address.toString().endsWith(targetNodeId);
|
||||||
|
|
||||||
|
if (queueOnTarget)
|
||||||
|
{
|
||||||
|
propertyEnd = targetNodeId;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
propertyEnd = address.toString().substring(address.toString().lastIndexOf("."));
|
||||||
|
}
|
||||||
|
|
||||||
|
Transaction tx = new TransactionImpl(storageManager);
|
||||||
|
|
||||||
|
for (Queue queue : queues)
|
||||||
|
{
|
||||||
|
// using auto-closeable
|
||||||
|
try (LinkedListIterator<MessageReference> messagesIterator = queue.totalIterator())
|
||||||
|
{
|
||||||
|
// loop through every message of this queue
|
||||||
|
while (messagesIterator.hasNext())
|
||||||
|
{
|
||||||
|
MessageReference messageRef = messagesIterator.next();
|
||||||
|
Message message = messageRef.getMessage().copy();
|
||||||
|
|
||||||
|
/* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding
|
||||||
|
* address on the scale-down target server. However, we have to take the existing _HQ_ROUTE_TOsf.*
|
||||||
|
* property and put its value into the _HQ_ROUTE_TO property so the message is routed properly.
|
||||||
|
*/
|
||||||
|
|
||||||
|
byte[] oldRouteToIDs = null;
|
||||||
|
|
||||||
|
List<SimpleString> propertiesToRemove = new ArrayList<>();
|
||||||
|
message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
|
||||||
|
for (SimpleString propName : message.getPropertyNames())
|
||||||
|
{
|
||||||
|
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
|
||||||
|
{
|
||||||
|
if (propName.toString().endsWith(propertyEnd))
|
||||||
|
{
|
||||||
|
oldRouteToIDs = message.getBytesProperty(propName);
|
||||||
|
}
|
||||||
|
propertiesToRemove.add(propName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sort into descending order - order is based on the number of references in the queue
|
// TODO: what if oldRouteToIDs == null ??
|
||||||
Collections.sort(queues, new OrderQueueByNumberOfReferencesComparator());
|
|
||||||
|
|
||||||
// loop through every queue on this address
|
for (SimpleString propertyToRemove : propertiesToRemove)
|
||||||
List<SimpleString> checkedQueues = new ArrayList<>();
|
|
||||||
for (Queue bigLoopQueue : queues)
|
|
||||||
{
|
{
|
||||||
checkedQueues.add(bigLoopQueue.getName());
|
message.removeProperty(propertyToRemove);
|
||||||
|
|
||||||
LinkedListIterator<MessageReference> bigLoopMessageIterator = bigLoopQueue.totalIterator();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// loop through every message of this queue
|
|
||||||
while (bigLoopMessageIterator.hasNext())
|
|
||||||
{
|
|
||||||
MessageReference bigLoopRef = bigLoopMessageIterator.next();
|
|
||||||
Message message = bigLoopRef.getMessage().copy();
|
|
||||||
|
|
||||||
if (storeAndForward)
|
|
||||||
{
|
|
||||||
if (address.toString().endsWith(targetNodeId))
|
|
||||||
{
|
|
||||||
/* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding
|
|
||||||
* address on the scale-down target server. However, we have to take the existing _HQ_ROUTE_TOsf.*
|
|
||||||
* property and put its value into the _HQ_ROUTE_TO property so the message is routed properly.
|
|
||||||
*/
|
|
||||||
|
|
||||||
byte[] oldRouteToIDs = null;
|
|
||||||
|
|
||||||
List<SimpleString> propertiesToRemove = new ArrayList<>();
|
|
||||||
message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
|
|
||||||
for (SimpleString propName : message.getPropertyNames())
|
|
||||||
{
|
|
||||||
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
|
|
||||||
{
|
|
||||||
if (propName.toString().endsWith(targetNodeId))
|
|
||||||
{
|
|
||||||
oldRouteToIDs = message.getBytesProperty(propName);
|
|
||||||
}
|
|
||||||
propertiesToRemove.add(propName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (SimpleString propertyToRemove : propertiesToRemove)
|
|
||||||
{
|
|
||||||
message.removeProperty(propertyToRemove);
|
|
||||||
}
|
|
||||||
|
|
||||||
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding
|
|
||||||
* store-and-forward address on the scale-down target server. In this case we use a special property
|
|
||||||
* for the queue ID so that the scale-down target server can route it appropriately.
|
|
||||||
*/
|
|
||||||
byte[] oldRouteToIDs = null;
|
|
||||||
|
|
||||||
List<SimpleString> propertiesToRemove = new ArrayList<>();
|
|
||||||
message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
|
|
||||||
for (SimpleString propName : message.getPropertyNames())
|
|
||||||
{
|
|
||||||
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
|
|
||||||
{
|
|
||||||
if (propName.toString().endsWith(address.toString().substring(address.toString().lastIndexOf("."))))
|
|
||||||
{
|
|
||||||
oldRouteToIDs = message.getBytesProperty(propName);
|
|
||||||
}
|
|
||||||
propertiesToRemove.add(propName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (SimpleString propertyToRemove : propertiesToRemove)
|
|
||||||
{
|
|
||||||
message.removeProperty(propertyToRemove);
|
|
||||||
}
|
|
||||||
|
|
||||||
message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs);
|
|
||||||
}
|
|
||||||
|
|
||||||
ActiveMQServerLogger.LOGGER.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId);
|
|
||||||
producer.send(message.getAddress(), message);
|
|
||||||
messageCount++;
|
|
||||||
bigLoopQueue.deleteReference(message.getMessageID());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
List<Queue> queuesWithMessage = new ArrayList<>();
|
|
||||||
queuesWithMessage.add(bigLoopQueue);
|
|
||||||
long messageId = message.getMessageID();
|
|
||||||
|
|
||||||
getQueuesWithMessage(store, queues, queueIterators, checkedQueues, bigLoopQueue, queuesWithMessage, bigLoopRef, messageId);
|
|
||||||
|
|
||||||
// get the ID for every queue that contains the message
|
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(queuesWithMessage.size() * 8);
|
|
||||||
StringBuilder logMessage = new StringBuilder();
|
|
||||||
logMessage.append("Scaling down message ").append(messageId).append(" to ");
|
|
||||||
for (Queue queue : queuesWithMessage)
|
|
||||||
{
|
|
||||||
long queueID;
|
|
||||||
String queueName = queue.getName().toString();
|
|
||||||
|
|
||||||
if (queueIDs.containsKey(queueName))
|
|
||||||
{
|
|
||||||
queueID = queueIDs.get(queueName);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
queueID = createQueueIfNecessaryAndGetID(session, queue, address);
|
|
||||||
queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time
|
|
||||||
}
|
|
||||||
|
|
||||||
logMessage.append(queueName).append("(").append(queueID).append(")").append(", ");
|
|
||||||
buffer.putLong(queueID);
|
|
||||||
}
|
|
||||||
|
|
||||||
logMessage.delete(logMessage.length() - 2, logMessage.length()); // trim off the trailing comma and space
|
|
||||||
ActiveMQServerLogger.LOGGER.debug(logMessage.append(" on address ").append(address));
|
|
||||||
|
|
||||||
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
|
|
||||||
//we need this incase we are sending back to the source server of the message, this basically
|
|
||||||
//acts like the bridge and ignores dup detection
|
|
||||||
if (message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
|
|
||||||
{
|
|
||||||
byte[] bytes = new byte[24];
|
|
||||||
|
|
||||||
ByteBuffer bb = ByteBuffer.wrap(bytes);
|
|
||||||
bb.put(nodeManager.getUUID().asBytes());
|
|
||||||
bb.putLong(messageId);
|
|
||||||
|
|
||||||
message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bb.array());
|
|
||||||
}
|
|
||||||
|
|
||||||
producer.send(address, message);
|
|
||||||
messageCount++;
|
|
||||||
|
|
||||||
// delete the reference from all queues which contain it
|
|
||||||
bigLoopQueue.deleteReference(messageId);
|
|
||||||
for (Queue queue : queuesWithMessage)
|
|
||||||
{
|
|
||||||
queue.deleteReference(messageId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
bigLoopMessageIterator.close();
|
|
||||||
queueIterators.get(bigLoopQueue.getName()).close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (queueOnTarget)
|
||||||
|
{
|
||||||
|
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs);
|
||||||
|
}
|
||||||
|
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId);
|
||||||
|
producer.send(message.getAddress(), message);
|
||||||
|
|
||||||
|
messageCount++;
|
||||||
|
|
||||||
|
messagesIterator.remove();
|
||||||
|
|
||||||
|
ackMessageOnQueue(tx, queue, messageRef);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
producer.close();
|
tx.commit();
|
||||||
session.close();
|
|
||||||
|
|
||||||
return messageCount;
|
return messageCount;
|
||||||
}
|
}
|
||||||
|
@ -324,6 +352,8 @@ public class ScaleDownHandler
|
||||||
Transaction transaction = resourceManager.getTransaction(xid);
|
Transaction transaction = resourceManager.getTransaction(xid);
|
||||||
session.start(xid, XAResource.TMNOFLAGS);
|
session.start(xid, XAResource.TMNOFLAGS);
|
||||||
List<TransactionOperation> allOperations = transaction.getAllOperations();
|
List<TransactionOperation> allOperations = transaction.getAllOperations();
|
||||||
|
|
||||||
|
// Get the information of the Prepared TXs so it could replay the TXs
|
||||||
Map<ServerMessage, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
|
Map<ServerMessage, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
|
||||||
for (TransactionOperation operation : allOperations)
|
for (TransactionOperation operation : allOperations)
|
||||||
{
|
{
|
||||||
|
@ -387,6 +417,7 @@ public class ScaleDownHandler
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer();
|
ClientProducer producer = session.createProducer();
|
||||||
for (Map.Entry<ServerMessage, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet())
|
for (Map.Entry<ServerMessage, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet())
|
||||||
{
|
{
|
||||||
|
@ -436,78 +467,6 @@ public class ScaleDownHandler
|
||||||
}
|
}
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Loop through every *other* queue on this address to see if it also contains this message.
|
|
||||||
* Skip queues with filters that don't match as matching messages will never be in there.
|
|
||||||
* Also skip queues that we've already checked in the "big" loop.
|
|
||||||
*/
|
|
||||||
private void getQueuesWithMessage(PagingStore store, List<Queue> queues, Map<SimpleString, LinkedListIterator<MessageReference>> queueIterators, List<SimpleString> checkedQueues, Queue bigLoopQueue, List<Queue> queuesWithMessage, MessageReference bigLoopRef, long messageId) throws Exception
|
|
||||||
{
|
|
||||||
for (Queue queue : queues)
|
|
||||||
{
|
|
||||||
if (!checkedQueues.contains(queue.getName()) &&
|
|
||||||
((queue.getFilter() == null &&
|
|
||||||
bigLoopQueue.getFilter() == null) ||
|
|
||||||
(queue.getFilter() != null &&
|
|
||||||
queue.getFilter().equals(bigLoopQueue.getFilter()))))
|
|
||||||
{
|
|
||||||
// an optimization for paged messages, eliminates the need to (potentially) scan the whole queue
|
|
||||||
if (bigLoopRef.isPaged())
|
|
||||||
{
|
|
||||||
PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID());
|
|
||||||
if (subscription.contains((PagedReference) bigLoopRef))
|
|
||||||
{
|
|
||||||
queuesWithMessage.add(queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LinkedListIterator<MessageReference> queueIterator = queueIterators.get(queue.getName());
|
|
||||||
boolean first = true;
|
|
||||||
long initialMessageID = 0;
|
|
||||||
while (queueIterator.hasNext())
|
|
||||||
{
|
|
||||||
Message m = queueIterator.next().getMessage();
|
|
||||||
if (first)
|
|
||||||
{
|
|
||||||
initialMessageID = m.getMessageID();
|
|
||||||
first = false;
|
|
||||||
}
|
|
||||||
if (m.getMessageID() == messageId)
|
|
||||||
{
|
|
||||||
queuesWithMessage.add(queue);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* if we've reached the end then reset the iterator and go through again until we
|
|
||||||
* get back to the place where we started
|
|
||||||
*/
|
|
||||||
if (!queueIterator.hasNext())
|
|
||||||
{
|
|
||||||
queueIterator = queue.totalIterator();
|
|
||||||
queueIterators.put(queue.getName(), queueIterator);
|
|
||||||
while (queueIterator.hasNext())
|
|
||||||
{
|
|
||||||
Message m = queueIterator.next().getMessage();
|
|
||||||
if (m.getMessageID() == initialMessageID)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (m.getMessageID() == messageId)
|
|
||||||
{
|
|
||||||
queuesWithMessage.add(queue);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
|
* Get the ID of the queues involved so the message can be routed properly. This is done because we cannot
|
||||||
* send directly to a queue, we have to send to an address instead but not all the queues related to the
|
* send directly to a queue, we have to send to an address instead but not all the queues related to the
|
||||||
|
@ -557,11 +516,159 @@ public class ScaleDownHandler
|
||||||
|
|
||||||
if (queue1 == queue2) return EQUAL;
|
if (queue1 == queue2) return EQUAL;
|
||||||
|
|
||||||
if (queue1.getMessageCount() == queue2.getMessageCount()) return EQUAL;
|
if (queue1.getMessageCount() == queue2.getMessageCount())
|
||||||
|
{
|
||||||
|
// if it's the same count we will use the ID as a tie breaker:
|
||||||
|
|
||||||
|
long tieBreak = queue2.getID() - queue1.getID();
|
||||||
|
|
||||||
|
if (tieBreak > 0) return AFTER;
|
||||||
|
else if (tieBreak < 0) return BEFORE;
|
||||||
|
else return EQUAL; // EQUAL here shouldn't really happen... but lets do the check anyways
|
||||||
|
|
||||||
|
}
|
||||||
if (queue1.getMessageCount() > queue2.getMessageCount()) return BEFORE;
|
if (queue1.getMessageCount() > queue2.getMessageCount()) return BEFORE;
|
||||||
if (queue1.getMessageCount() < queue2.getMessageCount()) return AFTER;
|
if (queue1.getMessageCount() < queue2.getMessageCount()) return AFTER;
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void ackMessageOnQueue(Transaction tx, Queue queue, MessageReference messageRef) throws Exception
|
||||||
|
{
|
||||||
|
queue.acknowledge(tx, messageRef);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* this class will control iterations while
|
||||||
|
* looking over for messages relations
|
||||||
|
*/
|
||||||
|
private class QueuesXRefInnerManager
|
||||||
|
{
|
||||||
|
private final Queue queue;
|
||||||
|
private LinkedListIterator<MessageReference> memoryIterator;
|
||||||
|
private MessageReference lastRef = null;
|
||||||
|
private final PagingStore store;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClientSession used for looking up and creating queues
|
||||||
|
*/
|
||||||
|
private final ClientSession clientSession;
|
||||||
|
|
||||||
|
private long targetQueueID = -1;
|
||||||
|
|
||||||
|
|
||||||
|
QueuesXRefInnerManager(final ClientSession clientSession, final Queue queue, final PagingStore store)
|
||||||
|
{
|
||||||
|
this.queue = queue;
|
||||||
|
this.store = store;
|
||||||
|
this.clientSession = clientSession;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Queue getQueue()
|
||||||
|
{
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getQueueID() throws Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
if (targetQueueID < 0)
|
||||||
|
{
|
||||||
|
targetQueueID = createQueueIfNecessaryAndGetID(clientSession, queue, queue.getAddress());
|
||||||
|
}
|
||||||
|
return targetQueueID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
if (memoryIterator != null)
|
||||||
|
{
|
||||||
|
memoryIterator.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean lookup(MessageReference reference) throws Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
if (reference.isPaged())
|
||||||
|
{
|
||||||
|
PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID());
|
||||||
|
if (subscription.contains((PagedReference) reference))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
|
||||||
|
if (lastRef != null && lastRef.getMessage().equals(reference.getMessage()))
|
||||||
|
{
|
||||||
|
lastRef = null;
|
||||||
|
memoryIterator.remove();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int numberOfScans = 2;
|
||||||
|
|
||||||
|
if (memoryIterator == null)
|
||||||
|
{
|
||||||
|
// If we have a brand new iterator, and we can't find something
|
||||||
|
numberOfScans = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageReference initialRef = null;
|
||||||
|
for (int i = 0; i < numberOfScans; i++)
|
||||||
|
{
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("iterating on queue " + queue + " while looking for reference " + reference);
|
||||||
|
memoryIterator = queue.iterator();
|
||||||
|
|
||||||
|
while (memoryIterator.hasNext())
|
||||||
|
{
|
||||||
|
lastRef = memoryIterator.next();
|
||||||
|
|
||||||
|
ActiveMQServerLogger.LOGGER.debug("Iterating on message " + lastRef);
|
||||||
|
|
||||||
|
if (lastRef.getMessage().equals(reference.getMessage()))
|
||||||
|
{
|
||||||
|
memoryIterator.remove();
|
||||||
|
lastRef = null;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (initialRef == null)
|
||||||
|
{
|
||||||
|
lastRef = initialRef;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (initialRef.equals(lastRef))
|
||||||
|
{
|
||||||
|
if (!memoryIterator.hasNext())
|
||||||
|
{
|
||||||
|
// if by coincidence we are at the end of the iterator, we just reset the iterator
|
||||||
|
lastRef = null;
|
||||||
|
memoryIterator.close();
|
||||||
|
memoryIterator = null;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we reached two iterations without finding anything.. we just go away by cleaning everything up
|
||||||
|
lastRef = null;
|
||||||
|
memoryIterator.close();
|
||||||
|
memoryIterator = null;
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1483,9 +1483,9 @@ public class ScheduledDeliveryHandlerTest extends Assert
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageReference> cancelScheduledMessages()
|
public void deliverScheduledMessages()
|
||||||
{
|
{
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -972,7 +972,7 @@ public abstract class ServiceTestBase extends UnitTestCase
|
||||||
protected HashMap<Integer, AtomicInteger> countJournal(Configuration config) throws Exception
|
protected HashMap<Integer, AtomicInteger> countJournal(Configuration config) throws Exception
|
||||||
{
|
{
|
||||||
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
|
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
|
||||||
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getJournalDir(), null);
|
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalDirectory(), null);
|
||||||
|
|
||||||
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(),
|
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(),
|
||||||
config.getJournalMinFiles(),
|
config.getJournalMinFiles(),
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -414,7 +414,7 @@
|
||||||
<repositories>
|
<repositories>
|
||||||
<repository>
|
<repository>
|
||||||
<snapshots>
|
<snapshots>
|
||||||
<enabled>false</enabled>
|
<enabled>true</enabled>
|
||||||
<updatePolicy>never</updatePolicy>
|
<updatePolicy>never</updatePolicy>
|
||||||
</snapshots>
|
</snapshots>
|
||||||
<releases>
|
<releases>
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.tests.integration.client;
|
package org.apache.activemq.tests.integration.client;
|
||||||
import org.apache.activemq.api.core.ActiveMQException;
|
import org.apache.activemq.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.core.server.MessageReference;
|
|
||||||
import org.apache.activemq.core.server.ServerConsumer;
|
import org.apache.activemq.core.server.ServerConsumer;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -24,7 +23,6 @@ import org.junit.Test;
|
||||||
|
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -274,9 +272,8 @@ public class HangConsumerTest extends ServiceTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageReference> cancelScheduledMessages()
|
public void deliverScheduledMessages()
|
||||||
{
|
{
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -534,9 +533,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageReference> cancelScheduledMessages()
|
public void deliverScheduledMessages()
|
||||||
{
|
{
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.activemq.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.core.postoffice.PostOffice;
|
import org.apache.activemq.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.core.postoffice.QueueBinding;
|
import org.apache.activemq.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
||||||
|
import org.apache.activemq.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||||
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.core.server.ActiveMQServer;
|
import org.apache.activemq.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.core.server.ActiveMQServers;
|
import org.apache.activemq.core.server.ActiveMQServers;
|
||||||
|
@ -1681,7 +1682,8 @@ public abstract class ClusterTestBase extends ServiceTestBase
|
||||||
.setThreadPoolMaxSize(10)
|
.setThreadPoolMaxSize(10)
|
||||||
.clearAcceptorConfigurations()
|
.clearAcceptorConfigurations()
|
||||||
.addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty)))
|
.addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty)))
|
||||||
.setHAPolicyConfiguration(haPolicyConfiguration);
|
.setHAPolicyConfiguration(haPolicyConfiguration)
|
||||||
|
.setResolveProtocols(false);
|
||||||
|
|
||||||
ActiveMQServer server;
|
ActiveMQServer server;
|
||||||
|
|
||||||
|
@ -1708,6 +1710,8 @@ public abstract class ClusterTestBase extends ServiceTestBase
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
server.addProtocolManagerFactory(new CoreProtocolManagerFactory());
|
||||||
|
|
||||||
server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node + ")");
|
server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node + ")");
|
||||||
servers[node] = server;
|
servers[node] = server;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,319 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.tests.integration.server;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.activemq.api.core.Message;
|
||||||
|
import org.apache.activemq.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.api.core.client.ClientConsumer;
|
||||||
|
import org.apache.activemq.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
||||||
|
import org.apache.activemq.core.server.impl.ScaleDownHandler;
|
||||||
|
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.tests.integration.cluster.distribution.ClusterTestBase;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* On this test we will run ScaleDown directly as an unit-test in several cases,
|
||||||
|
* simulating what would happen during a real scale down.
|
||||||
|
*
|
||||||
|
* @author clebertsuconic
|
||||||
|
*/
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
|
public class ScaleDownDirectTest extends ClusterTestBase
|
||||||
|
{
|
||||||
|
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "isNetty={0}")
|
||||||
|
public static Collection getParameters()
|
||||||
|
{
|
||||||
|
return Arrays.asList(new Object[][]{
|
||||||
|
{false}, {true}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private final boolean isNetty;
|
||||||
|
|
||||||
|
public ScaleDownDirectTest(boolean isNetty)
|
||||||
|
{
|
||||||
|
this.isNetty = isNetty;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
super.setUp();
|
||||||
|
setupLiveServer(0, isFileStorage(), isNetty, true);
|
||||||
|
setupLiveServer(1, isFileStorage(), isNetty, true);
|
||||||
|
startServers(0, 1);
|
||||||
|
setupSessionFactory(0, isNetty);
|
||||||
|
setupSessionFactory(1, isNetty);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMixedSmallMessages() throws Exception
|
||||||
|
{
|
||||||
|
internalTest(100, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMixedLargelMessages() throws Exception
|
||||||
|
{
|
||||||
|
internalTest(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void internalTest(int bufferSize, int numberOfMessages) throws Exception
|
||||||
|
{
|
||||||
|
ClientSessionFactory sf = sfs[0];
|
||||||
|
|
||||||
|
ClientSession session = sf.createSession(true, true);
|
||||||
|
|
||||||
|
session.createQueue("ad1", "queue1", true);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer("ad1");
|
||||||
|
|
||||||
|
byte[] buffer = new byte[bufferSize];
|
||||||
|
for (int i = 0; i < bufferSize; i++)
|
||||||
|
{
|
||||||
|
buffer[i] = getSamplebyte(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages; i++)
|
||||||
|
{
|
||||||
|
ClientMessage message = session.createMessage(true);
|
||||||
|
message.putIntProperty("i", i);
|
||||||
|
message.getBodyBuffer().writeBytes(buffer);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.createQueue("ad1", "queue2", true);
|
||||||
|
|
||||||
|
for (int i = numberOfMessages; i < (numberOfMessages * 2); i++)
|
||||||
|
{
|
||||||
|
ClientMessage message = session.createMessage(true);
|
||||||
|
message.putIntProperty("i", i);
|
||||||
|
message.getBodyBuffer().writeBytes(buffer);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(numberOfMessages * 2, performScaledown());
|
||||||
|
|
||||||
|
sfs[0].close();
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
stopServers(0);
|
||||||
|
|
||||||
|
session = sfs[1].createSession(true, true);
|
||||||
|
|
||||||
|
ClientConsumer consumer1 = session.createConsumer("queue1");
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages * 2; i++)
|
||||||
|
{
|
||||||
|
ClientMessage message = consumer1.receive(5000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(i, message.getIntProperty("i").intValue());
|
||||||
|
// message.acknowledge();
|
||||||
|
|
||||||
|
checkBody(message, bufferSize);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientMessage messageCheckNull = consumer1.receiveImmediate();
|
||||||
|
|
||||||
|
assertNull(messageCheckNull);
|
||||||
|
|
||||||
|
ClientConsumer consumer2 = session.createConsumer("queue2");
|
||||||
|
for (int i = numberOfMessages; i < numberOfMessages * 2; i++)
|
||||||
|
{
|
||||||
|
ClientMessage message = consumer2.receive(5000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(i, message.getIntProperty("i").intValue());
|
||||||
|
// message.acknowledge();
|
||||||
|
checkBody(message, bufferSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
messageCheckNull = consumer2.receiveImmediate();
|
||||||
|
|
||||||
|
System.out.println("Received " + messageCheckNull);
|
||||||
|
|
||||||
|
assertNull(messageCheckNull);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPaging() throws Exception
|
||||||
|
{
|
||||||
|
final int CHUNK_SIZE = 50;
|
||||||
|
int messageCount = 0;
|
||||||
|
final String addressName = "testAddress";
|
||||||
|
final String queueName = "testQueue";
|
||||||
|
|
||||||
|
createQueue(0, addressName, queueName, null, true);
|
||||||
|
createQueue(1, addressName, queueName, null, true);
|
||||||
|
|
||||||
|
ClientSessionFactory sf = sfs[0];
|
||||||
|
ClientSession session = addClientSession(sf.createSession(false, false));
|
||||||
|
ClientProducer producer = addClientProducer(session.createProducer(addressName));
|
||||||
|
|
||||||
|
AddressSettings defaultSetting = new AddressSettings();
|
||||||
|
defaultSetting.setPageSizeBytes(10 * 1024);
|
||||||
|
defaultSetting.setMaxSizeBytes(20 * 1024);
|
||||||
|
servers[0].getAddressSettingsRepository().addMatch("#", defaultSetting);
|
||||||
|
|
||||||
|
while (!servers[0].getPagingManager().getPageStore(new SimpleString(addressName)).isPaging())
|
||||||
|
{
|
||||||
|
for (int i = 0; i < CHUNK_SIZE; i++)
|
||||||
|
{
|
||||||
|
Message message = session.createMessage(true);
|
||||||
|
message.getBodyBuffer().writeBytes(new byte[1024]);
|
||||||
|
// The only purpose of this count here is for eventually debug messages on print-data / print-pages
|
||||||
|
// message.putIntProperty("count", messageCount);
|
||||||
|
producer.send(message);
|
||||||
|
messageCount++;
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(messageCount, performScaledown());
|
||||||
|
|
||||||
|
servers[0].stop();
|
||||||
|
|
||||||
|
addConsumer(0, 1, queueName, null);
|
||||||
|
for (int i = 0; i < messageCount; i++)
|
||||||
|
{
|
||||||
|
ClientMessage message = consumers[0].getConsumer().receive(500);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
// Assert.assertEquals(i, message.getIntProperty("count").intValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
|
||||||
|
removeConsumer(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasicScaleDown() throws Exception
|
||||||
|
{
|
||||||
|
final int TEST_SIZE = 2;
|
||||||
|
final String addressName = "testAddress";
|
||||||
|
final String queueName1 = "testQueue1";
|
||||||
|
final String queueName2 = "testQueue2";
|
||||||
|
|
||||||
|
// create 2 queues on each node mapped to the same address
|
||||||
|
createQueue(0, addressName, queueName1, null, true);
|
||||||
|
createQueue(0, addressName, queueName2, null, true);
|
||||||
|
createQueue(1, addressName, queueName1, null, true);
|
||||||
|
createQueue(1, addressName, queueName2, null, true);
|
||||||
|
|
||||||
|
// send messages to node 0
|
||||||
|
send(0, addressName, TEST_SIZE, true, null);
|
||||||
|
|
||||||
|
// consume a message from queue 2
|
||||||
|
addConsumer(1, 0, queueName2, null, false);
|
||||||
|
ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
|
||||||
|
Assert.assertNotNull(clientMessage);
|
||||||
|
clientMessage.acknowledge();
|
||||||
|
consumers[1].getSession().commit();
|
||||||
|
removeConsumer(1);
|
||||||
|
|
||||||
|
// at this point on node 0 there should be 2 messages in testQueue1 and 1 message in testQueue2
|
||||||
|
Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
|
||||||
|
Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
|
||||||
|
|
||||||
|
assertEquals(TEST_SIZE, performScaledown());
|
||||||
|
// trigger scaleDown from node 0 to node 1
|
||||||
|
servers[0].stop();
|
||||||
|
|
||||||
|
// get the 2 messages from queue 1
|
||||||
|
addConsumer(0, 1, queueName1, null);
|
||||||
|
clientMessage = consumers[0].getConsumer().receive(250);
|
||||||
|
Assert.assertNotNull(clientMessage);
|
||||||
|
clientMessage.acknowledge();
|
||||||
|
clientMessage = consumers[0].getConsumer().receive(250);
|
||||||
|
Assert.assertNotNull(clientMessage);
|
||||||
|
clientMessage.acknowledge();
|
||||||
|
|
||||||
|
// ensure there are no more messages on queue 1
|
||||||
|
clientMessage = consumers[0].getConsumer().receive(250);
|
||||||
|
Assert.assertNull(clientMessage);
|
||||||
|
removeConsumer(0);
|
||||||
|
|
||||||
|
// get the 1 message from queue 2
|
||||||
|
addConsumer(0, 1, queueName2, null);
|
||||||
|
clientMessage = consumers[0].getConsumer().receive(250);
|
||||||
|
Assert.assertNotNull(clientMessage);
|
||||||
|
clientMessage.acknowledge();
|
||||||
|
|
||||||
|
// ensure there are no more messages on queue 1
|
||||||
|
clientMessage = consumers[0].getConsumer().receive(250);
|
||||||
|
Assert.assertNull(clientMessage);
|
||||||
|
removeConsumer(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkBody(ClientMessage message, int bufferSize)
|
||||||
|
{
|
||||||
|
assertEquals(bufferSize, message.getBodySize());
|
||||||
|
byte[] body = new byte[message.getBodySize()];
|
||||||
|
message.getBodyBuffer().readBytes(body);
|
||||||
|
for (int bpos = 0; bpos < bufferSize; bpos++)
|
||||||
|
{
|
||||||
|
if (getSamplebyte(bpos) != body[bpos])
|
||||||
|
{
|
||||||
|
fail("body comparison failure at " + message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long performScaledown() throws Exception
|
||||||
|
{
|
||||||
|
ScaleDownHandler handler = new ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(),
|
||||||
|
servers[0].getNodeManager(),
|
||||||
|
servers[0].getClusterManager().getClusterController(),
|
||||||
|
servers[0].getStorageManager());
|
||||||
|
|
||||||
|
return handler.scaleDownMessages(sfs[1], servers[1].getNodeID());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -30,8 +30,6 @@ import org.apache.activemq.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.api.core.client.ActiveMQClient;
|
import org.apache.activemq.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.core.config.ScaleDownConfiguration;
|
import org.apache.activemq.core.config.ScaleDownConfiguration;
|
||||||
import org.apache.activemq.core.config.ha.LiveOnlyPolicyConfiguration;
|
import org.apache.activemq.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||||
import org.apache.activemq.core.persistence.impl.journal.JournalStorageManager;
|
|
||||||
import org.apache.activemq.core.persistence.impl.journal.LargeServerMessageImpl;
|
|
||||||
import org.apache.activemq.core.postoffice.Binding;
|
import org.apache.activemq.core.postoffice.Binding;
|
||||||
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.core.settings.impl.AddressSettings;
|
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||||
|
@ -119,13 +117,13 @@ public class ScaleDownTest extends ClusterTestBase
|
||||||
final String queueName2 = "testQueue2";
|
final String queueName2 = "testQueue2";
|
||||||
|
|
||||||
// create 2 queues on each node mapped to the same address
|
// create 2 queues on each node mapped to the same address
|
||||||
createQueue(0, addressName, queueName1, null, false);
|
createQueue(0, addressName, queueName1, null, true);
|
||||||
createQueue(0, addressName, queueName2, null, false);
|
createQueue(0, addressName, queueName2, null, true);
|
||||||
createQueue(1, addressName, queueName1, null, false);
|
createQueue(1, addressName, queueName1, null, true);
|
||||||
createQueue(1, addressName, queueName2, null, false);
|
createQueue(1, addressName, queueName2, null, true);
|
||||||
|
|
||||||
// send messages to node 0
|
// send messages to node 0
|
||||||
send(0, addressName, TEST_SIZE, false, null);
|
send(0, addressName, TEST_SIZE, true, null);
|
||||||
|
|
||||||
// consume a message from queue 2
|
// consume a message from queue 2
|
||||||
addConsumer(1, 0, queueName2, null, false);
|
addConsumer(1, 0, queueName2, null, false);
|
||||||
|
@ -375,32 +373,28 @@ public class ScaleDownTest extends ClusterTestBase
|
||||||
final String addressName = "testAddress";
|
final String addressName = "testAddress";
|
||||||
final String queueName = "testQueue";
|
final String queueName = "testQueue";
|
||||||
|
|
||||||
createQueue(0, addressName, queueName, null, false);
|
createQueue(0, addressName, queueName, null, true);
|
||||||
createQueue(1, addressName, queueName, null, false);
|
createQueue(1, addressName, queueName, null, true);
|
||||||
|
|
||||||
ClientSessionFactory sf = sfs[0];
|
ClientSessionFactory sf = sfs[0];
|
||||||
ClientSession session = addClientSession(sf.createSession(false, false));
|
ClientSession session = addClientSession(sf.createSession(false, false));
|
||||||
ClientProducer producer = addClientProducer(session.createProducer(addressName));
|
ClientProducer producer = addClientProducer(session.createProducer(addressName));
|
||||||
|
|
||||||
LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) servers[0].getStorageManager());
|
|
||||||
|
|
||||||
fileMessage.setMessageID(1005);
|
byte[] buffer = new byte[2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE];
|
||||||
fileMessage.setDurable(true);
|
for (int i = 0; i < buffer.length; i++)
|
||||||
|
|
||||||
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
|
|
||||||
{
|
{
|
||||||
fileMessage.addBytes(new byte[]{UnitTestCase.getSamplebyte(i)});
|
buffer[i] = getSamplebyte(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
for (int nmsg = 0; nmsg < 10; nmsg++)
|
||||||
|
{
|
||||||
|
ClientMessage message = session.createMessage(true);
|
||||||
|
message.getBodyBuffer().writeBytes(buffer);
|
||||||
|
producer.send(message);
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
fileMessage.releaseResources();
|
|
||||||
|
|
||||||
producer.send(fileMessage);
|
|
||||||
|
|
||||||
fileMessage.deleteFile();
|
|
||||||
|
|
||||||
session.commit();
|
|
||||||
|
|
||||||
servers[0].stop();
|
servers[0].stop();
|
||||||
|
|
||||||
|
@ -409,19 +403,23 @@ public class ScaleDownTest extends ClusterTestBase
|
||||||
ClientConsumer consumer = addClientConsumer(session.createConsumer(queueName));
|
ClientConsumer consumer = addClientConsumer(session.createConsumer(queueName));
|
||||||
session.start();
|
session.start();
|
||||||
|
|
||||||
ClientMessage msg = consumer.receive(250);
|
for (int nmsg = 0; nmsg < 10; nmsg++)
|
||||||
|
|
||||||
Assert.assertNotNull(msg);
|
|
||||||
|
|
||||||
Assert.assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
|
|
||||||
|
|
||||||
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
|
|
||||||
{
|
{
|
||||||
Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
|
ClientMessage msg = consumer.receive(250);
|
||||||
}
|
|
||||||
|
|
||||||
msg.acknowledge();
|
Assert.assertNotNull(msg);
|
||||||
session.commit();
|
|
||||||
|
Assert.assertEquals(2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
|
||||||
|
|
||||||
|
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
|
||||||
|
{
|
||||||
|
byte byteRead = msg.getBodyBuffer().readByte();
|
||||||
|
Assert.assertEquals(msg + " Is different", UnitTestCase.getSamplebyte(i), byteRead);
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.acknowledge();
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -362,9 +362,9 @@ public class FakeQueue implements Queue
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<MessageReference> cancelScheduledMessages()
|
public void deliverScheduledMessages()
|
||||||
{
|
{
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.tests.unit.core.server.impl.fakes;
|
package org.apache.activemq.tests.unit.core.server.impl.fakes;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.api.core.Pair;
|
import org.apache.activemq.api.core.Pair;
|
||||||
import org.apache.activemq.api.core.SimpleString;
|
import org.apache.activemq.api.core.SimpleString;
|
||||||
|
@ -43,6 +44,12 @@ public class FakePostOffice implements PostOffice
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<SimpleString> getAddresses()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() throws Exception
|
public void start() throws Exception
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue