ARTEMIS-3234 - revisit fix to deal with credit on unmatched acks, thanks to brusdev for the interceptor feature and test from ARTEMIS-2650
This commit is contained in:
parent
ee92ddcb3b
commit
815f383f9c
|
@ -131,7 +131,6 @@ import org.apache.activemq.openwire.OpenWireFormat;
|
|||
import org.apache.activemq.state.CommandVisitor;
|
||||
import org.apache.activemq.state.ConnectionState;
|
||||
import org.apache.activemq.state.ConsumerState;
|
||||
import org.apache.activemq.state.ProducerState;
|
||||
import org.apache.activemq.state.SessionState;
|
||||
import org.apache.activemq.transport.TransmitCallback;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
@ -257,11 +256,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
if (state == null) {
|
||||
return null;
|
||||
}
|
||||
ConnectionInfo info = state.getInfo();
|
||||
if (info == null) {
|
||||
return null;
|
||||
}
|
||||
return info;
|
||||
return state.getInfo();
|
||||
}
|
||||
|
||||
//tells the connection that
|
||||
|
@ -311,6 +306,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
AuditLogger.setRemoteAddress(getRemoteAddress());
|
||||
}
|
||||
|
||||
if (this.protocolManager.invokeIncoming(command, this) != null) {
|
||||
logger.debugf("Interceptor rejected OpenWire command: %s", command);
|
||||
disconnect(true);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
int commandId = command.getCommandId();
|
||||
|
||||
|
@ -496,6 +497,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
|
||||
public void physicalSend(Command command) throws IOException {
|
||||
if (this.protocolManager.invokeOutgoing(command, this) != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
tracePhysicalSend(transportConnection, command);
|
||||
|
@ -595,10 +599,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
SessionState ss = state.getSessionState(id.getParentId());
|
||||
if (ss != null) {
|
||||
result.setProducerState(ss.getProducerState(id));
|
||||
ProducerState producerState = ss.getProducerState(id);
|
||||
if (producerState != null && producerState.getInfo() != null) {
|
||||
ProducerInfo info = producerState.getInfo();
|
||||
}
|
||||
}
|
||||
producerExchanges.put(id, result);
|
||||
}
|
||||
|
@ -672,6 +672,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
if (fail) {
|
||||
shutdown(fail);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -808,8 +811,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
try {
|
||||
physicalSend(command);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
} catch (Throwable t) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.command.Command;
|
||||
|
||||
public interface OpenWireInterceptor extends BaseInterceptor<Command> {
|
||||
|
||||
}
|
|
@ -35,7 +35,6 @@ import org.apache.activemq.advisory.AdvisorySupport;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
||||
|
@ -49,8 +48,8 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
|||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.selector.impl.LRUCache;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
|
@ -82,7 +81,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
|
|||
|
||||
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION;
|
||||
|
||||
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
|
||||
public class OpenWireProtocolManager extends AbstractProtocolManager<Command, OpenWireInterceptor, OpenWireConnection> implements ClusterTopologyListener {
|
||||
|
||||
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
|
||||
|
||||
|
@ -94,7 +93,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
|
||||
private final OpenWireProtocolManagerFactory factory;
|
||||
|
||||
private OpenWireFormatFactory wireFactory;
|
||||
private final OpenWireFormatFactory wireFactory;
|
||||
|
||||
private boolean prefixPacketSize = true;
|
||||
|
||||
|
@ -135,7 +134,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
|
||||
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
|
||||
|
||||
protected class VirtualTopicConfig {
|
||||
private final List<OpenWireInterceptor> incomingInterceptors = new ArrayList<>();
|
||||
private final List<OpenWireInterceptor> outgoingInterceptors = new ArrayList<>();
|
||||
|
||||
|
||||
protected static class VirtualTopicConfig {
|
||||
public int filterPathTerminus;
|
||||
public boolean selectorAware;
|
||||
|
||||
|
@ -160,7 +163,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
private final Map<DestinationFilter, VirtualTopicConfig> vtConsumerDestinationMatchers = new HashMap<>();
|
||||
protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache = new LRUCache();
|
||||
|
||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server,
|
||||
List<BaseInterceptor> incomingInterceptors,
|
||||
List<BaseInterceptor> outgoingInterceptors) {
|
||||
this.factory = factory;
|
||||
this.server = server;
|
||||
this.wireFactory = new OpenWireFormatFactory();
|
||||
|
@ -170,6 +175,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
scheduledPool = server.getScheduledPool();
|
||||
this.wireFormat = (OpenWireFormat) wireFactory.createWireFormat();
|
||||
|
||||
updateInterceptors(incomingInterceptors, outgoingInterceptors);
|
||||
|
||||
final ClusterManager clusterManager = this.server.getClusterManager();
|
||||
|
||||
ClusterConnection cc = clusterManager.getDefaultConnection(null);
|
||||
|
@ -245,14 +252,29 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
}
|
||||
|
||||
@Override
|
||||
public ProtocolManagerFactory<Interceptor> getFactory() {
|
||||
public ProtocolManagerFactory getFactory() {
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
|
||||
List<BaseInterceptor> outgoingInterceptors) {
|
||||
// NO-OP
|
||||
public void updateInterceptors(List incoming, List outgoing) {
|
||||
this.incomingInterceptors.clear();
|
||||
if (incoming != null) {
|
||||
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
|
||||
}
|
||||
|
||||
this.outgoingInterceptors.clear();
|
||||
if (outgoing != null) {
|
||||
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
|
||||
}
|
||||
}
|
||||
|
||||
public String invokeIncoming(Command command, OpenWireConnection connection) {
|
||||
return super.invokeInterceptors(this.incomingInterceptors, command, connection);
|
||||
}
|
||||
|
||||
public String invokeOutgoing(Command command, OpenWireConnection connection) {
|
||||
return super.invokeInterceptors(this.outgoingInterceptors, command, connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,12 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
|
@ -30,7 +28,7 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
|
|||
import org.osgi.service.component.annotations.Component;
|
||||
|
||||
@Component(service = ProtocolManagerFactory.class)
|
||||
public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
|
||||
public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<OpenWireInterceptor> {
|
||||
|
||||
public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
|
||||
|
||||
|
@ -44,12 +42,12 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto
|
|||
final List<BaseInterceptor> incomingInterceptors,
|
||||
List<BaseInterceptor> outgoingInterceptors) throws Exception {
|
||||
BeanSupport.stripPasswords(parameters);
|
||||
return BeanSupport.setData(new OpenWireProtocolManager(this, server), parameters);
|
||||
return BeanSupport.setData(new OpenWireProtocolManager(this, server, incomingInterceptors, outgoingInterceptors), parameters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
|
||||
return Collections.emptyList();
|
||||
public List<OpenWireInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
|
||||
return internalFilterInterceptors(OpenWireInterceptor.class, interceptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -293,40 +293,33 @@ public class AMQConsumer {
|
|||
}
|
||||
|
||||
final int ackMessageCount = ack.getMessageCount();
|
||||
acquireCredit(ackMessageCount);
|
||||
|
||||
if (ack.isDeliveredAck()) {
|
||||
acquireCredit(ackMessageCount);
|
||||
deliveredAcksCreditExtension += ackMessageCount;
|
||||
// our work is done
|
||||
return;
|
||||
}
|
||||
|
||||
// some sort of real ack, rebalance deliveredAcksCreditExtension
|
||||
if (deliveredAcksCreditExtension > 0) {
|
||||
deliveredAcksCreditExtension -= ackMessageCount;
|
||||
if (deliveredAcksCreditExtension >= 0) {
|
||||
currentWindow.addAndGet(-ackMessageCount);
|
||||
}
|
||||
}
|
||||
|
||||
final MessageId startID, lastID;
|
||||
|
||||
if (ack.getFirstMessageId() == null) {
|
||||
startID = ack.getLastMessageId();
|
||||
lastID = ack.getLastMessageId();
|
||||
} else {
|
||||
startID = ack.getFirstMessageId();
|
||||
lastID = ack.getLastMessageId();
|
||||
}
|
||||
|
||||
boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists
|
||||
if (serverConsumer.getQueue().isNonDestructive()) {
|
||||
removeReferences = false;
|
||||
}
|
||||
final MessageId lastID = ack.getLastMessageId();
|
||||
final MessageId startID = ack.getFirstMessageId() == null ? lastID : ack.getFirstMessageId();
|
||||
|
||||
// if it's browse only, nothing to be acked
|
||||
final boolean removeReferences = !serverConsumer.isBrowseOnly() && !serverConsumer.getQueue().isNonDestructive();
|
||||
final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
|
||||
|
||||
if (!ackList.isEmpty()) {
|
||||
if (!ackList.isEmpty() || !removeReferences || serverConsumer.getQueue().isTemporary()) {
|
||||
|
||||
// valid match in delivered or browsing or temp - deal with credit
|
||||
acquireCredit(ackMessageCount);
|
||||
|
||||
// some sort of real ack, rebalance deliveredAcksCreditExtension
|
||||
if (deliveredAcksCreditExtension > 0) {
|
||||
deliveredAcksCreditExtension -= ackMessageCount;
|
||||
if (deliveredAcksCreditExtension >= 0) {
|
||||
currentWindow.addAndGet(-ackMessageCount);
|
||||
}
|
||||
}
|
||||
|
||||
if (ack.isExpiredAck()) {
|
||||
for (MessageReference ref : ackList) {
|
||||
ref.getQueue().expire(ref, serverConsumer);
|
||||
|
|
|
@ -36,7 +36,7 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testVtAutoConversion() {
|
||||
underTest = new OpenWireProtocolManager(null, new DummyServer()) {
|
||||
underTest = new OpenWireProtocolManager(null, new DummyServer(), null, null) {
|
||||
@Override
|
||||
public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) {
|
||||
if (lruCacheRef == null) {
|
||||
|
|
|
@ -27,20 +27,31 @@ import javax.jms.Queue;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||
import org.apache.activemq.ActiveMQMessageProducer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireInterceptor;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -51,13 +62,10 @@ import org.junit.Test;
|
|||
*/
|
||||
public class GeneralInteropTest extends BasicOpenWireTest {
|
||||
|
||||
private ServerLocator locator;
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
locator = this.createInVMNonHALocator();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -194,6 +202,103 @@ public class GeneralInteropTest extends BasicOpenWireTest {
|
|||
|
||||
@Test
|
||||
public void testFailoverReceivingFromCore() throws Exception {
|
||||
|
||||
/**
|
||||
* to get logging to stdout from failover client
|
||||
* org.slf4j.impl.SimpleLoggerFactory simpleLoggerFactory = new SimpleLoggerFactory();
|
||||
* ((SimpleLogger)simpleLoggerFactory.getLogger(FailoverTransport.class.getName())).setLevel(SimpleLogger.TRACE);
|
||||
*/
|
||||
|
||||
final String text = "HelloWorld";
|
||||
final int prefetchSize = 10;
|
||||
|
||||
SimpleString dla = new SimpleString("DLA");
|
||||
SimpleString dlq = new SimpleString("DLQ1");
|
||||
server.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
|
||||
server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDeadLetterAddress(dla));
|
||||
|
||||
sendMultipleTextMessagesUsingCoreJms(queueName, text, 100);
|
||||
|
||||
String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT
|
||||
+ ")?randomize=false&timeout=400&reconnectDelay=500" +
|
||||
"&useExponentialBackOff=false&initialReconnectDelay=500&nested.wireFormat.maxInactivityDuration=500" +
|
||||
"&nested.wireFormat.maxInactivityDurationInitalDelay=500" +
|
||||
"&nested.soTimeout=500&nested.connectionTimeout=400&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400";
|
||||
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString);
|
||||
connectionFactory.setSendAcksAsync(false);
|
||||
connectionFactory.setOptimizeAcknowledge(false);
|
||||
connectionFactory.getPrefetchPolicy().setAll(prefetchSize);
|
||||
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
try {
|
||||
connection.setClientID("test.consumer.queue." + queueName);
|
||||
connection.start();
|
||||
|
||||
Message message = null;
|
||||
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
QueueControl queueControl = (QueueControl)server.getManagementService().
|
||||
getResource(ResourceNames.QUEUE + queueName);
|
||||
|
||||
QueueControl dlqControl = (QueueControl)server.getManagementService().
|
||||
getResource(ResourceNames.QUEUE + dlq.toString());
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
assertTrue(message instanceof TextMessage);
|
||||
assertEquals(text + 0, ((TextMessage)message).getText());
|
||||
message.acknowledge();
|
||||
|
||||
Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
|
||||
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
|
||||
|
||||
message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
assertTrue(message instanceof TextMessage);
|
||||
assertEquals(text + 1, ((TextMessage)message).getText());
|
||||
|
||||
// client won't get a reply to the ack command, just a disconnect and will replay the ack on reconnect
|
||||
server.getRemotingService().addIncomingInterceptor(new OpenWireInterceptor() {
|
||||
@Override
|
||||
public boolean intercept(Command packet, RemotingConnection connection) throws ActiveMQException {
|
||||
if (packet.isMessageAck()) {
|
||||
server.getRemotingService().removeIncomingInterceptor(this);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
message.acknowledge();
|
||||
|
||||
// after a response to the replay....
|
||||
// the message should be redelivered and pending for the replayed ack... hence it gets acked ok.
|
||||
// the real delivery gets suppressed as a duplicate by the message audit and poison acked
|
||||
// but there is a race between client failover reconnect and server dispatch to a new consumer
|
||||
// if redispatch has not happened, the replayed ack is dropped and the posion ack will match and try and dlq
|
||||
Wait.waitFor(() -> dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged() == 1
|
||||
|| dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged() == 2, 3000, 100);
|
||||
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
|
||||
|
||||
message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
assertTrue(message instanceof TextMessage);
|
||||
assertEquals(text + 2, ((TextMessage)message).getText());
|
||||
message.acknowledge();
|
||||
|
||||
Wait.waitFor(() -> dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged() == 2
|
||||
|| dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged() == 3, 3000, 100);
|
||||
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 30000, 100);
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverReceivingFromCoreWithAckAfterInterrupt() throws Exception {
|
||||
final int prefetchSize = 10;
|
||||
final String text = "HelloWorld";
|
||||
|
||||
|
@ -227,13 +332,43 @@ public class GeneralInteropTest extends BasicOpenWireTest {
|
|||
Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
|
||||
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
|
||||
|
||||
//Force a disconnection.
|
||||
message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
assertTrue(message instanceof TextMessage);
|
||||
assertEquals(text + 1, ((TextMessage)message).getText());
|
||||
|
||||
CountDownLatch interrupted = new CountDownLatch(1);
|
||||
((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
|
||||
@Override
|
||||
public void onCommand(Object command) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(IOException error) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transportInterupted() {
|
||||
interrupted.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transportResumed() {
|
||||
}
|
||||
});
|
||||
|
||||
//Force a disconnection that will result in duplicate ack
|
||||
for (ServerSession serverSession : server.getSessions()) {
|
||||
if (session.toString().contains(serverSession.getName())) {
|
||||
serverSession.getRemotingConnection().fail(new ActiveMQDisconnectedException());
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(interrupted.await(10, TimeUnit.SECONDS));
|
||||
|
||||
// ack will be dropped
|
||||
message.acknowledge();
|
||||
|
||||
Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
|
||||
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
|
||||
|
||||
|
|
Loading…
Reference in New Issue