diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index 1c7b3f2748..b37d75009d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.core.server.federation; +import java.lang.invoke.MethodHandles; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -34,11 +36,15 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData; public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ActiveMQServer server; private final Federation federation; private final FederatedConsumerKey key; @@ -49,6 +55,8 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi private final int intialConnectDelayMultiplier = 2; private final int intialConnectDelayMax = 30; private final ClientSessionCallback clientSessionCallback; + private boolean started = false; + private volatile ScheduledFuture currentConnectTask; private ClientSessionFactoryInternal clientSessionFactory; private ClientSession clientSession; @@ -95,24 +103,29 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi } @Override - public void start() { - scheduleConnect(0); + public synchronized void start() { + if (!started) { + started = true; + scheduleConnect(0); + } } private void scheduleConnect(int delay) { - scheduledExecutorService.schedule(() -> { + currentConnectTask = scheduledExecutorService.schedule(() -> { try { connect(); } catch (Exception e) { - scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax)); + int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax); + logger.trace("{} failed to connect. Scheduling reconnect in {} seconds.", this, nextDelay, e); + scheduleConnect(nextDelay); } }, delay, TimeUnit.SECONDS); } - private void connect() throws Exception { - try { - if (clientConsumer == null) { - synchronized (this) { + private synchronized void connect() throws Exception { + if (started) { + try { + if (clientConsumer == null) { this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory(); this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize()); this.clientSession.addFailureListener(this); @@ -129,22 +142,26 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote"); } } - } - } catch (Exception e) { - try { - if (clientSessionFactory != null) { - clientSessionFactory.cleanup(); + } catch (Exception e) { + try { + if (clientSessionFactory != null) { + clientSessionFactory.cleanup(); + } + disconnect(); + } catch (ActiveMQException ignored) { } - disconnect(); - } catch (ActiveMQException ignored) { + throw e; } - throw e; } } @Override - public void close() { - scheduleDisconnect(0); + public synchronized void close() { + if (started) { + started = false; + currentConnectTask.cancel(false); + scheduleDisconnect(0); + } } private void scheduleDisconnect(int delay) { @@ -159,13 +176,12 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi private void disconnect() throws ActiveMQException { if (clientConsumer != null) { clientConsumer.close(); + clientConsumer = null; } if (clientSession != null) { clientSession.close(); + clientSession = null; } - clientConsumer = null; - clientSession = null; - if (clientSessionFactory != null && clientSessionFactory.numSessions() == 0 && !upstream.getConnection().isSharedConnection()) { clientSessionFactory.close(); clientSessionFactory = null; @@ -241,13 +257,18 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi clientSessionFactory = null; } catch (Throwable dontCare) { } - start(); + scheduleConnect(0); } @Override public void beforeReconnect(ActiveMQException exception) { } + // used for testing + public ScheduledFuture getCurrentConnectTask() { + return currentConnectTask; + } + public interface ClientSessionCallback { void callback(ClientSession clientSession) throws ActiveMQException; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java new file mode 100644 index 0000000000..62ed87e356 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.federation; + +import java.util.concurrent.ScheduledFuture; + +import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Test; + +public class FederatedQueueConsumerTest extends ActiveMQTestBase { + + @Test + public void testClose() throws Exception { + ActiveMQServer server = createServer(false, createDefaultInVMConfig()); + server.start(); + Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString())); + federation.start(); + FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null); + assertNull(consumer.getCurrentConnectTask()); + consumer.start(); + assertNotNull(consumer.getCurrentConnectTask()); + consumer.close(); + Wait.assertTrue(() -> { + ScheduledFuture task = consumer.getCurrentConnectTask(); + return (task.isDone() || task.isCancelled()) && task == consumer.getCurrentConnectTask(); + }, 2000, 50); + } +}