This commit is contained in:
Michael Andre Pearce 2019-03-27 17:02:06 +00:00
commit 82fe0e5894
2 changed files with 66 additions and 1 deletions

View File

@ -895,7 +895,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) { if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() &&
intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
pageIterator != null && !pageIterator.hasNext() &&
pageSubscription != null && !pageSubscription.isPaging()) {
// We must block on the executor to ensure any async deliveries have completed or we might get out of order // We must block on the executor to ensure any async deliveries have completed or we might get out of order
// deliveries // deliveries
// Go into direct delivery mode // Go into direct delivery mode

View File

@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.paging;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -33,11 +35,13 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -287,4 +291,62 @@ public class GlobalPagingTest extends PagingTest {
} }
} }
@Test
public void testManagementMessageRequestCannotFailAfterFailedDirectDeliver() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1);
try {
final SimpleString managementAddress = server.getConfiguration().getManagementAddress();
server.start();
//need to use Netty in order to have direct delivery available
final ServerLocator locator = createNettyNonHALocator()
.setBlockOnNonDurableSend(true)
.setBlockOnDurableSend(true)
.setBlockOnAcknowledge(true);
try (ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true)) {
session.start();
if (server.locateQueue(managementAddress) == null) {
session.createQueue(managementAddress, managementAddress, null, true);
}
final SimpleString address = SimpleString.toSimpleString("queue");
if (server.locateQueue(address) == null) {
session.createQueue(address, address, null, true);
}
try (ClientProducer requestProducer = session.createProducer(managementAddress)) {
final SimpleString replyQueue = new SimpleString(managementAddress + "." + UUID.randomUUID().toString());
session.createTemporaryQueue(replyQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), replyQueue);
try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
final Queue queue = server.locateQueue(replyQueue);
final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
//it will cause QueueImpl::directDeliver -> false
queue.addHead(reference, false);
Assert.assertSame(reference, queue.removeReferenceWithID(reference.getMessageID()));
ClientMessage message = session.createMessage(false);
message.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyQueue);
ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount");
requestProducer.send(message);
Assert.assertNotNull(consumer.receive());
}
}
}
} finally {
server.stop(true);
}
}
} }