ARTEMIS-4523 Openwire leaving consumers isolated after reconnects
co-authored with Gary Tully
This commit is contained in:
parent
2efc4967b6
commit
3ec0274356
|
@ -49,7 +49,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
|
||||||
// Request of forced shutdown
|
// Request of forced shutdown
|
||||||
private volatile boolean requestedForcedShutdown = false;
|
private volatile boolean requestedForcedShutdown = false;
|
||||||
// Request of educated shutdown:
|
// Request of educated shutdown:
|
||||||
private volatile boolean requestedShutdown = false;
|
protected volatile boolean requestedShutdown = false;
|
||||||
// Request to yield to another thread
|
// Request to yield to another thread
|
||||||
private volatile boolean yielded = false;
|
private volatile boolean yielded = false;
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
|
||||||
private final ActorListener<T> listener;
|
private final ActorListener<T> listener;
|
||||||
private final Runnable overThreshold;
|
private final Runnable overThreshold;
|
||||||
private final Runnable clearThreshold;
|
private final Runnable clearThreshold;
|
||||||
private volatile Runnable shutdownTask;
|
|
||||||
|
|
||||||
public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable clearThreshold) {
|
public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable clearThreshold) {
|
||||||
super(parent);
|
super(parent);
|
||||||
|
@ -54,10 +53,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected final void doTask(Object task) {
|
protected final void doTask(Object task) {
|
||||||
if (task == shutdownTask) {
|
|
||||||
shutdownTask.run();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (task == FLUSH) {
|
if (task == FLUSH) {
|
||||||
clearThreshold.run();
|
clearThreshold.run();
|
||||||
// should set to 0 no matter the value. There's a single thread setting this value back to zero
|
// should set to 0 no matter the value. There's a single thread setting this value back to zero
|
||||||
|
@ -100,14 +95,8 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown(Runnable runnable) {
|
public void requestShutdown() {
|
||||||
// do no more pending work
|
requestedShutdown = true;
|
||||||
tasks.clear();
|
tasks.clear();
|
||||||
// run this task next
|
|
||||||
shutdownTask = runnable;
|
|
||||||
tasks.add(runnable);
|
|
||||||
// wait for shutdown task to complete
|
|
||||||
flush();
|
|
||||||
shutdown();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,53 +182,4 @@ public class ThresholdActorTest {
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testShutdownTask() throws Exception {
|
|
||||||
AtomicInteger lastAcquireFailed = new AtomicInteger(0);
|
|
||||||
lastProcessed.set(0);
|
|
||||||
|
|
||||||
Semaphore allowedTasks = new Semaphore(10);
|
|
||||||
CountDownLatch completedTasks = new CountDownLatch(11);
|
|
||||||
CountDownLatch pendingTasks = new CountDownLatch(11);
|
|
||||||
|
|
||||||
final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
||||||
|
|
||||||
ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, (i) -> {
|
|
||||||
try {
|
|
||||||
pendingTasks.countDown();
|
|
||||||
if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) {
|
|
||||||
lastProcessed.set(i);
|
|
||||||
} else {
|
|
||||||
lastAcquireFailed.set(i);
|
|
||||||
}
|
|
||||||
completedTasks.countDown();
|
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}, 1000, (e) -> {
|
|
||||||
return 1;
|
|
||||||
}, () -> {
|
|
||||||
}, () -> {
|
|
||||||
});
|
|
||||||
|
|
||||||
// expect allowedTasks tasks to complete
|
|
||||||
for (int i = 1; i < 100; i++) {
|
|
||||||
actor.act(i);
|
|
||||||
}
|
|
||||||
// wait for task processing
|
|
||||||
Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS));
|
|
||||||
|
|
||||||
actor.shutdown(() -> {
|
|
||||||
lastProcessed.set(lastProcessed.get() * 1000);
|
|
||||||
});
|
|
||||||
|
|
||||||
Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS));
|
|
||||||
|
|
||||||
// assert processing terminated at block point
|
|
||||||
Assert.assertEquals(10000, lastProcessed.get());
|
|
||||||
// pending task executed as expected
|
|
||||||
Assert.assertEquals(11, lastAcquireFailed.get());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.activemq.artemis.core.protocol.openwire;
|
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||||
|
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
import javax.jms.InvalidClientIDException;
|
|
||||||
import javax.jms.InvalidDestinationException;
|
import javax.jms.InvalidDestinationException;
|
||||||
import javax.jms.JMSSecurityException;
|
import javax.jms.JMSSecurityException;
|
||||||
import javax.transaction.xa.XAException;
|
import javax.transaction.xa.XAException;
|
||||||
|
@ -761,7 +760,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
final ThresholdActor<Command> localVisibleActor = openWireActor;
|
final ThresholdActor<Command> localVisibleActor = openWireActor;
|
||||||
if (localVisibleActor != null) {
|
if (localVisibleActor != null) {
|
||||||
localVisibleActor.shutdown(() -> doFail(me, message));
|
localVisibleActor.requestShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (executor != null) {
|
||||||
|
executor.execute(() -> doFail(me, message));
|
||||||
} else {
|
} else {
|
||||||
doFail(me, message);
|
doFail(me, message);
|
||||||
}
|
}
|
||||||
|
@ -779,11 +782,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (this.getConnectionInfo() != null) {
|
if (this.getConnectionInfo() != null) {
|
||||||
protocolManager.removeConnection(this.getConnectionInfo(), me);
|
protocolManager.removeConnection(getClientID(), this);
|
||||||
}
|
}
|
||||||
} catch (InvalidClientIDException e) {
|
|
||||||
logger.warn("Couldn't close connection because invalid clientID", e);
|
|
||||||
} finally {
|
} finally {
|
||||||
|
try {
|
||||||
|
disconnect(false);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// it should never happen, but never say never
|
||||||
|
logger.debug("OpenWireConnection::disconnect failure", e);
|
||||||
|
}
|
||||||
|
|
||||||
// there may be some transactions not associated with sessions
|
// there may be some transactions not associated with sessions
|
||||||
// deal with them after sessions are removed via connection removal
|
// deal with them after sessions are removed via connection removal
|
||||||
operationContext.executeOnCompletion(new IOCallback() {
|
operationContext.executeOnCompletion(new IOCallback() {
|
||||||
|
@ -876,29 +884,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser, false);
|
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
//raise the refCount of context
|
|
||||||
public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) throws Exception {
|
|
||||||
this.context = existingContext;
|
|
||||||
WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo();
|
|
||||||
// Older clients should have been defaulting this field to true.. but
|
|
||||||
// they were not.
|
|
||||||
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
|
|
||||||
info.setClientMaster(true);
|
|
||||||
}
|
|
||||||
if (info.getClientIp() == null) {
|
|
||||||
info.setClientIp(getRemoteAddress());
|
|
||||||
}
|
|
||||||
createInternalSession(info);
|
|
||||||
state = new ConnectionState(info);
|
|
||||||
state.reset(info);
|
|
||||||
|
|
||||||
context.setConnection(this);
|
|
||||||
context.setConnectionState(state);
|
|
||||||
context.setClientMaster(info.isClientMaster());
|
|
||||||
context.setFaultTolerant(info.isFaultTolerant());
|
|
||||||
context.setReconnect(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This will answer with commands to the client
|
* This will answer with commands to the client
|
||||||
*/
|
*/
|
||||||
|
@ -1817,13 +1802,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
@Override
|
@Override
|
||||||
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
|
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
|
||||||
//we let protocol manager to handle connection add/remove
|
//we let protocol manager to handle connection add/remove
|
||||||
|
for (SessionState sessionState : state.getSessionStates()) {
|
||||||
|
propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
for (SessionState sessionState : state.getSessionStates()) {
|
protocolManager.removeConnection(getClientID(), OpenWireConnection.this);
|
||||||
propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
|
} finally {
|
||||||
}
|
OpenWireConnection.this.disconnect(false);
|
||||||
protocolManager.removeConnection(state.getInfo(), null);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
// log
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
|
||||||
|
|
||||||
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
|
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
private final Map<String, AMQConnectionContext> clientIdSet = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, OpenWireConnection> clientIdSet = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private String brokerName;
|
private String brokerName;
|
||||||
|
|
||||||
|
@ -244,18 +244,9 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException {
|
public void removeConnection(String clientID, OpenWireConnection connection) {
|
||||||
String clientId = info.getClientId();
|
clientIdSet.remove(clientID, connection);
|
||||||
if (clientId != null) {
|
connections.remove(connection);
|
||||||
AMQConnectionContext context = this.clientIdSet.remove(clientId);
|
|
||||||
if (context != null) {
|
|
||||||
//connection is still there and need to close
|
|
||||||
context.getConnection().disconnect(error != null);
|
|
||||||
this.connections.remove(context.getConnection());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new InvalidClientIDException("No clientID specified for connection disconnect request");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*** if set, the OpenWire connection will bypass the tcpReadBuferSize and use this value instead.
|
/*** if set, the OpenWire connection will bypass the tcpReadBuferSize and use this value instead.
|
||||||
|
@ -421,22 +412,16 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
|
||||||
}
|
}
|
||||||
|
|
||||||
AMQConnectionContext context;
|
AMQConnectionContext context;
|
||||||
context = clientIdSet.get(clientId);
|
OpenWireConnection oldConnection = clientIdSet.get(clientId);
|
||||||
if (context != null) {
|
if (oldConnection != null) {
|
||||||
if (info.isFailoverReconnect()) {
|
if (!info.isFailoverReconnect()) {
|
||||||
OpenWireConnection oldConnection = context.getConnection();
|
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldConnection.getRemoteAddress());
|
||||||
oldConnection.disconnect(true);
|
|
||||||
connections.remove(oldConnection);
|
|
||||||
connection.reconnect(context, info);
|
|
||||||
} else {
|
|
||||||
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
//new connection
|
|
||||||
context = connection.initContext(info);
|
|
||||||
clientIdSet.put(clientId, context);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
context = connection.initContext(info);
|
||||||
|
|
||||||
|
clientIdSet.put(clientId, connection);
|
||||||
connections.add(connection);
|
connections.add(connection);
|
||||||
|
|
||||||
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
|
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
|
||||||
|
|
|
@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
|
||||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
|
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
@ -498,4 +500,60 @@ public class ConnectionDroppedTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 10_000)
|
||||||
|
public void testForceDropOpenWire() throws Throwable {
|
||||||
|
ActiveMQServer server = createServer(true, createDefaultConfig(true));
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
|
||||||
|
|
||||||
|
CountDownLatch beforeCreateCalled = new CountDownLatch(1);
|
||||||
|
CountDownLatch goCreateConsumer = new CountDownLatch(1);
|
||||||
|
server.registerBrokerPlugin(new ActiveMQServerConsumerPlugin() {
|
||||||
|
@Override
|
||||||
|
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
|
||||||
|
if (consumer.getQueue() == serverQueue) {
|
||||||
|
logger.info("Creating a consumer at {}", consumer.getQueue());
|
||||||
|
beforeCreateCalled.countDown();
|
||||||
|
try {
|
||||||
|
goCreateConsumer.await(5, TimeUnit.MINUTES);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ExecutorService executorService = Executors.newFixedThreadPool(1);
|
||||||
|
runAfter(executorService::shutdownNow);
|
||||||
|
|
||||||
|
ConnectionFactory factory = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616");
|
||||||
|
|
||||||
|
CountDownLatch done = new CountDownLatch(1);
|
||||||
|
|
||||||
|
executorService.execute(() -> {
|
||||||
|
try (Connection connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = session.createConsumer(session.createQueue("test-queue"))) {
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
} finally {
|
||||||
|
done.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertTrue(beforeCreateCalled.await(5, TimeUnit.MINUTES));
|
||||||
|
|
||||||
|
server.getRemotingService().getConnections().forEach(r -> {
|
||||||
|
r.fail(new ActiveMQException("this is a simulation"));
|
||||||
|
});
|
||||||
|
|
||||||
|
goCreateConsumer.countDown();
|
||||||
|
|
||||||
|
Wait.assertEquals(0, serverQueue::getConsumerCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,211 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class FastReconnectOpenWireTest extends OpenWireTestBase {
|
||||||
|
|
||||||
|
// change this number to give the test a bit more of spinning
|
||||||
|
private static final int NUM_ITERATIONS = 50;
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) {
|
||||||
|
super.configureAddressSettings(addressSettingsMap);
|
||||||
|
// force send to dlq early
|
||||||
|
addressSettingsMap.put("exampleQueue", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
|
||||||
|
// force send to dlq late
|
||||||
|
addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 60_000)
|
||||||
|
public void testFastReconnectCreateConsumerNoErrors() throws Exception {
|
||||||
|
|
||||||
|
final ArrayList<Throwable> errors = new ArrayList<>();
|
||||||
|
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
|
||||||
|
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
Queue queue = new ActiveMQQueue(durableQueue.toString());
|
||||||
|
|
||||||
|
final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000");
|
||||||
|
exFact.setWatchTopicAdvisories(false);
|
||||||
|
exFact.setConnectResponseTimeout(10000);
|
||||||
|
exFact.setClientID("myID");
|
||||||
|
|
||||||
|
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||||
|
redeliveryPolicy.setRedeliveryDelay(0);
|
||||||
|
redeliveryPolicy.setMaximumRedeliveries(-1);
|
||||||
|
exFact.setRedeliveryPolicy(redeliveryPolicy);
|
||||||
|
|
||||||
|
publish(1000, durableQueue.toString());
|
||||||
|
|
||||||
|
final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS);
|
||||||
|
ExecutorService executor = Executors.newCachedThreadPool();
|
||||||
|
runAfter(executor::shutdownNow);
|
||||||
|
|
||||||
|
final int concurrent = 2;
|
||||||
|
final CountDownLatch done = new CountDownLatch(concurrent);
|
||||||
|
for (int i = 0; i < concurrent; i++) {
|
||||||
|
executor.execute(() -> {
|
||||||
|
try {
|
||||||
|
while (numIterations.decrementAndGet() > 0) {
|
||||||
|
try (Connection conn = exFact.createConnection(); Session consumerConnectionSession = conn.createSession(Session.SESSION_TRANSACTED); MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue)) {
|
||||||
|
|
||||||
|
messageConsumer.receiveNoWait();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
|
||||||
|
((ActiveMQConnection) conn).getTransport().narrow(TcpTransport.class).stop();
|
||||||
|
} catch (Throwable expected) {
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (javax.jms.InvalidClientIDException expected) {
|
||||||
|
// deliberate clash across concurrent connections
|
||||||
|
} catch (Throwable unexpected) {
|
||||||
|
logger.warn(unexpected.getMessage(), unexpected);
|
||||||
|
errors.add(unexpected);
|
||||||
|
numIterations.set(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
done.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(done.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Wait.assertEquals(0, () -> server.locateQueue(durableQueue).getConsumerCount(), 5000);
|
||||||
|
assertTrue(errors.isEmpty());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60_000)
|
||||||
|
public void testFastReconnectCreateConsumerNoErrorsNoClientId() throws Exception {
|
||||||
|
|
||||||
|
final ArrayList<Throwable> errors = new ArrayList<>();
|
||||||
|
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
|
||||||
|
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
Queue queue = new ActiveMQQueue(durableQueue.toString());
|
||||||
|
|
||||||
|
final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000");
|
||||||
|
exFact.setWatchTopicAdvisories(false);
|
||||||
|
exFact.setConnectResponseTimeout(10000);
|
||||||
|
|
||||||
|
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||||
|
redeliveryPolicy.setRedeliveryDelay(0);
|
||||||
|
redeliveryPolicy.setMaximumRedeliveries(-1);
|
||||||
|
exFact.setRedeliveryPolicy(redeliveryPolicy);
|
||||||
|
|
||||||
|
publish(1000, durableQueue.toString());
|
||||||
|
|
||||||
|
final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS);
|
||||||
|
ExecutorService executor = Executors.newCachedThreadPool();
|
||||||
|
runAfter(executor::shutdownNow);
|
||||||
|
|
||||||
|
final int concurrent = 2;
|
||||||
|
CountDownLatch done = new CountDownLatch(concurrent);
|
||||||
|
for (int i = 0; i < concurrent; i++) {
|
||||||
|
executor.execute(() -> {
|
||||||
|
try (Connection conn = exFact.createConnection();
|
||||||
|
Session consumerConnectionSession = conn.createSession(Session.SESSION_TRANSACTED);
|
||||||
|
MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue)
|
||||||
|
) {
|
||||||
|
conn.start();
|
||||||
|
while (numIterations.decrementAndGet() > 0) {
|
||||||
|
try {
|
||||||
|
messageConsumer.receiveNoWait();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
|
||||||
|
((ActiveMQConnection) conn).getTransport().narrow(TcpTransport.class).stop();
|
||||||
|
} catch (Throwable expected) {
|
||||||
|
}
|
||||||
|
} catch (Throwable unexpected) {
|
||||||
|
errors.add(unexpected);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable unexpected) {
|
||||||
|
numIterations.set(0);
|
||||||
|
unexpected.printStackTrace();
|
||||||
|
errors.add(unexpected);
|
||||||
|
} finally {
|
||||||
|
done.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(done.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Wait.assertEquals(0, () -> server.locateQueue(durableQueue).getConsumerCount(), 5000);
|
||||||
|
|
||||||
|
assertTrue(errors.isEmpty());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void publish(int numMessages, String name) throws Exception {
|
||||||
|
final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||||
|
Connection exConn = exFact.createConnection();
|
||||||
|
exConn.start();
|
||||||
|
|
||||||
|
Queue queue = new ActiveMQQueue(name);
|
||||||
|
|
||||||
|
// put a few messages on the queue to have the broker do some dispatch work
|
||||||
|
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
message.setIntProperty("SEQ", i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
session.close();
|
||||||
|
exConn.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -47,6 +47,8 @@ public class MemoryAssertions {
|
||||||
assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName());
|
assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName());
|
||||||
assertMemory(checkLeak, 0, ActiveMQProtonRemotingConnection.class.getName());
|
assertMemory(checkLeak, 0, ActiveMQProtonRemotingConnection.class.getName());
|
||||||
assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
|
assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
|
||||||
|
assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
|
||||||
|
assertMemory(checkLeak, 0, ActiveMQProtonRemotingConnection.class.getName());
|
||||||
assertMemory(checkLeak, 0, ServerSessionImpl.class.getName());
|
assertMemory(checkLeak, 0, ServerSessionImpl.class.getName());
|
||||||
assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
|
assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
|
||||||
assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());
|
assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());
|
||||||
|
|
Loading…
Reference in New Issue