diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index 0c4faa25f5..323da6a5d7 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -49,7 +49,7 @@ public abstract class ProcessorBase extends HandlerBase { // Request of forced shutdown private volatile boolean requestedForcedShutdown = false; // Request of educated shutdown: - private volatile boolean requestedShutdown = false; + protected volatile boolean requestedShutdown = false; // Request to yield to another thread private volatile boolean yielded = false; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java index 4e7bbb6ded..7b79d45ada 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java @@ -41,7 +41,6 @@ public class ThresholdActor extends ProcessorBase { private final ActorListener listener; private final Runnable overThreshold; private final Runnable clearThreshold; - private volatile Runnable shutdownTask; public ThresholdActor(Executor parent, ActorListener listener, int maxSize, ToIntFunction sizeGetter, Runnable overThreshold, Runnable clearThreshold) { super(parent); @@ -54,10 +53,6 @@ public class ThresholdActor extends ProcessorBase { @Override protected final void doTask(Object task) { - if (task == shutdownTask) { - shutdownTask.run(); - return; - } if (task == FLUSH) { clearThreshold.run(); // should set to 0 no matter the value. There's a single thread setting this value back to zero @@ -100,14 +95,8 @@ public class ThresholdActor extends ProcessorBase { } } - public void shutdown(Runnable runnable) { - // do no more pending work + public void requestShutdown() { + requestedShutdown = true; tasks.clear(); - // run this task next - shutdownTask = runnable; - tasks.add(runnable); - // wait for shutdown task to complete - flush(); - shutdown(); } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java index e554d632f7..5d7fe27c13 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java @@ -182,53 +182,4 @@ public class ThresholdActorTest { executorService.shutdown(); } } - - - @Test - public void testShutdownTask() throws Exception { - AtomicInteger lastAcquireFailed = new AtomicInteger(0); - lastProcessed.set(0); - - Semaphore allowedTasks = new Semaphore(10); - CountDownLatch completedTasks = new CountDownLatch(11); - CountDownLatch pendingTasks = new CountDownLatch(11); - - final ExecutorService executorService = Executors.newSingleThreadExecutor(); - - ThresholdActor actor = new ThresholdActor<>(executorService, (i) -> { - try { - pendingTasks.countDown(); - if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) { - lastProcessed.set(i); - } else { - lastAcquireFailed.set(i); - } - completedTasks.countDown(); - } catch (InterruptedException ignored) { - } - - }, 1000, (e) -> { - return 1; - }, () -> { - }, () -> { - }); - - // expect allowedTasks tasks to complete - for (int i = 1; i < 100; i++) { - actor.act(i); - } - // wait for task processing - Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS)); - - actor.shutdown(() -> { - lastProcessed.set(lastProcessed.get() * 1000); - }); - - Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS)); - - // assert processing terminated at block point - Assert.assertEquals(10000, lastProcessed.get()); - // pending task executed as expected - Assert.assertEquals(11, lastAcquireFailed.get()); - } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index ee895a51a1..670eca258d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.IllegalStateException; -import javax.jms.InvalidClientIDException; import javax.jms.InvalidDestinationException; import javax.jms.JMSSecurityException; import javax.transaction.xa.XAException; @@ -761,7 +760,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se final ThresholdActor localVisibleActor = openWireActor; if (localVisibleActor != null) { - localVisibleActor.shutdown(() -> doFail(me, message)); + localVisibleActor.requestShutdown(); + } + + if (executor != null) { + executor.execute(() -> doFail(me, message)); } else { doFail(me, message); } @@ -779,11 +782,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } try { if (this.getConnectionInfo() != null) { - protocolManager.removeConnection(this.getConnectionInfo(), me); + protocolManager.removeConnection(getClientID(), this); } - } catch (InvalidClientIDException e) { - logger.warn("Couldn't close connection because invalid clientID", e); } finally { + try { + disconnect(false); + } catch (Throwable e) { + // it should never happen, but never say never + logger.debug("OpenWireConnection::disconnect failure", e); + } + // there may be some transactions not associated with sessions // deal with them after sessions are removed via connection removal operationContext.executeOnCompletion(new IOCallback() { @@ -876,29 +884,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser, false); } - //raise the refCount of context - public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) throws Exception { - this.context = existingContext; - WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo(); - // Older clients should have been defaulting this field to true.. but - // they were not. - if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { - info.setClientMaster(true); - } - if (info.getClientIp() == null) { - info.setClientIp(getRemoteAddress()); - } - createInternalSession(info); - state = new ConnectionState(info); - state.reset(info); - - context.setConnection(this); - context.setConnectionState(state); - context.setClientMaster(info.isClientMaster()); - context.setFaultTolerant(info.isFaultTolerant()); - context.setReconnect(true); - } - /** * This will answer with commands to the client */ @@ -1817,13 +1802,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { //we let protocol manager to handle connection add/remove + for (SessionState sessionState : state.getSessionStates()) { + propagateLastSequenceId(sessionState, lastDeliveredSequenceId); + } try { - for (SessionState sessionState : state.getSessionStates()) { - propagateLastSequenceId(sessionState, lastDeliveredSequenceId); - } - protocolManager.removeConnection(state.getInfo(), null); - } catch (Throwable e) { - // log + protocolManager.removeConnection(getClientID(), OpenWireConnection.this); + } finally { + OpenWireConnection.this.disconnect(false); } return null; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index f1fd7fb6e1..56ff48552c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -109,7 +109,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager connections = new CopyOnWriteArrayList<>(); - private final Map clientIdSet = new ConcurrentHashMap<>(); + private final ConcurrentHashMap clientIdSet = new ConcurrentHashMap<>(); private String brokerName; @@ -244,18 +244,9 @@ public class OpenWireProtocolManager extends AbstractProtocolManager { + try (Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("test-queue"))) { + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + }); + + Assert.assertTrue(beforeCreateCalled.await(5, TimeUnit.MINUTES)); + + server.getRemotingService().getConnections().forEach(r -> { + r.fail(new ActiveMQException("this is a simulation")); + }); + + goCreateConsumer.countDown(); + + Wait.assertEquals(0, serverQueue::getConsumerCount); + } + + + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java new file mode 100644 index 0000000000..1a41b32580 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FastReconnectOpenWireTest extends OpenWireTestBase { + + // change this number to give the test a bit more of spinning + private static final int NUM_ITERATIONS = 50; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + protected void configureAddressSettings(Map addressSettingsMap) { + super.configureAddressSettings(addressSettingsMap); + // force send to dlq early + addressSettingsMap.put("exampleQueue", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2)); + // force send to dlq late + addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1)); + } + + + @Test(timeout = 60_000) + public void testFastReconnectCreateConsumerNoErrors() throws Exception { + + final ArrayList errors = new ArrayList<>(); + SimpleString durableQueue = new SimpleString("exampleQueueTwo"); + this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST)); + + Queue queue = new ActiveMQQueue(durableQueue.toString()); + + final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000"); + exFact.setWatchTopicAdvisories(false); + exFact.setConnectResponseTimeout(10000); + exFact.setClientID("myID"); + + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setRedeliveryDelay(0); + redeliveryPolicy.setMaximumRedeliveries(-1); + exFact.setRedeliveryPolicy(redeliveryPolicy); + + publish(1000, durableQueue.toString()); + + final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS); + ExecutorService executor = Executors.newCachedThreadPool(); + runAfter(executor::shutdownNow); + + final int concurrent = 2; + final CountDownLatch done = new CountDownLatch(concurrent); + for (int i = 0; i < concurrent; i++) { + executor.execute(() -> { + try { + while (numIterations.decrementAndGet() > 0) { + try (Connection conn = exFact.createConnection(); Session consumerConnectionSession = conn.createSession(Session.SESSION_TRANSACTED); MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue)) { + + messageConsumer.receiveNoWait(); + + try { + // force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close + ((ActiveMQConnection) conn).getTransport().narrow(TcpTransport.class).stop(); + } catch (Throwable expected) { + } + + } catch (javax.jms.InvalidClientIDException expected) { + // deliberate clash across concurrent connections + } catch (Throwable unexpected) { + logger.warn(unexpected.getMessage(), unexpected); + errors.add(unexpected); + numIterations.set(0); + } + } + } finally { + done.countDown(); + } + }); + } + + assertTrue(done.await(30, TimeUnit.SECONDS)); + + Wait.assertEquals(0, () -> server.locateQueue(durableQueue).getConsumerCount(), 5000); + assertTrue(errors.isEmpty()); + + } + + @Test(timeout = 60_000) + public void testFastReconnectCreateConsumerNoErrorsNoClientId() throws Exception { + + final ArrayList errors = new ArrayList<>(); + SimpleString durableQueue = new SimpleString("exampleQueueTwo"); + this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST)); + + Queue queue = new ActiveMQQueue(durableQueue.toString()); + + final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000"); + exFact.setWatchTopicAdvisories(false); + exFact.setConnectResponseTimeout(10000); + + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setRedeliveryDelay(0); + redeliveryPolicy.setMaximumRedeliveries(-1); + exFact.setRedeliveryPolicy(redeliveryPolicy); + + publish(1000, durableQueue.toString()); + + final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS); + ExecutorService executor = Executors.newCachedThreadPool(); + runAfter(executor::shutdownNow); + + final int concurrent = 2; + CountDownLatch done = new CountDownLatch(concurrent); + for (int i = 0; i < concurrent; i++) { + executor.execute(() -> { + try (Connection conn = exFact.createConnection(); + Session consumerConnectionSession = conn.createSession(Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue) + ) { + conn.start(); + while (numIterations.decrementAndGet() > 0) { + try { + messageConsumer.receiveNoWait(); + + try { + // force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close + ((ActiveMQConnection) conn).getTransport().narrow(TcpTransport.class).stop(); + } catch (Throwable expected) { + } + } catch (Throwable unexpected) { + errors.add(unexpected); + } + } + } catch (Throwable unexpected) { + numIterations.set(0); + unexpected.printStackTrace(); + errors.add(unexpected); + } finally { + done.countDown(); + } + }); + } + + assertTrue(done.await(30, TimeUnit.SECONDS)); + + Wait.assertEquals(0, () -> server.locateQueue(durableQueue).getConsumerCount(), 5000); + + assertTrue(errors.isEmpty()); + + } + + private void publish(int numMessages, String name) throws Exception { + final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + Connection exConn = exFact.createConnection(); + exConn.start(); + + Queue queue = new ActiveMQQueue(name); + + // put a few messages on the queue to have the broker do some dispatch work + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + for (int i = 0; i < numMessages; i++) { + message.setIntProperty("SEQ", i); + producer.send(message); + } + session.close(); + exConn.close(); + + } +} diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java index dc926e517c..0345e1c7b2 100644 --- a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java @@ -47,6 +47,8 @@ public class MemoryAssertions { assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName()); assertMemory(checkLeak, 0, ActiveMQProtonRemotingConnection.class.getName()); assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName()); + assertMemory(checkLeak, 0, OpenWireConnection.class.getName()); + assertMemory(checkLeak, 0, ActiveMQProtonRemotingConnection.class.getName()); assertMemory(checkLeak, 0, ServerSessionImpl.class.getName()); assertMemory(checkLeak, 0, AMQPSessionContext.class.getName()); assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());