ARTEMIS-1691 JMS bridge can't be manually restarted after failure
This commit is contained in:
parent
021993fe7c
commit
a1db72395c
|
@ -406,6 +406,7 @@ public final class JMSBridgeImpl implements JMSBridge {
|
|||
if (ok) {
|
||||
connectedSource = true;
|
||||
connectedTarget = true;
|
||||
failed = false;
|
||||
startSource();
|
||||
} else {
|
||||
ActiveMQJMSBridgeLogger.LOGGER.errorStartingBridge(bridgeName);
|
||||
|
@ -797,7 +798,7 @@ public final class JMSBridgeImpl implements JMSBridge {
|
|||
@Override
|
||||
public synchronized void setMaxRetries(final int retries) {
|
||||
checkBridgeNotStarted();
|
||||
JMSBridgeImpl.checkValidValue(retries, "MaxRetries");
|
||||
JMSBridgeImpl.checkValidValue(retries, "MaxRetries", true);
|
||||
|
||||
maxRetries = retries;
|
||||
}
|
||||
|
@ -921,7 +922,7 @@ public final class JMSBridgeImpl implements JMSBridge {
|
|||
checkNotNull(sourceDestinationFactory, "sourceDestinationFactory");
|
||||
checkNotNull(targetDestinationFactory, "targetDestinationFactory");
|
||||
checkValidValue(failureRetryInterval, "failureRetryInterval");
|
||||
checkValidValue(maxRetries, "maxRetries");
|
||||
checkValidValue(maxRetries, "maxRetries", true);
|
||||
if (failureRetryInterval == -1 && maxRetries > 0) {
|
||||
throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be set to -1");
|
||||
}
|
||||
|
@ -953,11 +954,23 @@ public final class JMSBridgeImpl implements JMSBridge {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check that value is either equals to -1 or greater than 0
|
||||
* Check that value is either equals to -1 or > 0
|
||||
*
|
||||
* @throws IllegalArgumentException if the value is not valid
|
||||
*/
|
||||
private static void checkValidValue(final long value, final String name) {
|
||||
checkValidValue(value, name, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that value is either equals to -1 or >= 0
|
||||
*
|
||||
* @throws IllegalArgumentException if the value is not valid
|
||||
*/
|
||||
private static void checkValidValue(final long value, final String name, boolean allowZero) {
|
||||
if (value == 0 && allowZero) {
|
||||
return;
|
||||
}
|
||||
if (!(value == -1 || value > 0)) {
|
||||
throw new IllegalArgumentException(name + " must be > 0 or -1");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jms.bridge;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.transaction.TransactionManager;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
|
||||
import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class JMSBridgeImplTest extends ActiveMQTestBase {
|
||||
|
||||
private static final String SOURCE = RandomUtil.randomString();
|
||||
|
||||
private static final String TARGET = RandomUtil.randomString();
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
server = createServer(false, createDefaultInVMConfig());
|
||||
server.start();
|
||||
|
||||
server.createQueue(new QueueConfiguration(SOURCE).setRoutingType(RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(TARGET).setRoutingType(RoutingType.ANYCAST));
|
||||
}
|
||||
|
||||
private static ConnectionFactory createConnectionFactory() {
|
||||
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
||||
cf.setReconnectAttempts(0);
|
||||
cf.setBlockOnNonDurableSend(true);
|
||||
cf.setBlockOnDurableSend(true);
|
||||
return cf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionOnSourceAndManualRestartSucceeds() throws Exception {
|
||||
final AtomicReference<Connection> sourceConn = new AtomicReference<>();
|
||||
ActiveMQJMSConnectionFactory failingSourceCF = new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName())) {
|
||||
private static final long serialVersionUID = -8866390811966688830L;
|
||||
|
||||
@Override
|
||||
public Connection createConnection() throws JMSException {
|
||||
sourceConn.set(super.createConnection());
|
||||
return sourceConn.get();
|
||||
}
|
||||
};
|
||||
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
|
||||
failingSourceCF.setReconnectAttempts(0);
|
||||
failingSourceCF.setBlockOnNonDurableSend(true);
|
||||
failingSourceCF.setBlockOnDurableSend(true);
|
||||
|
||||
ConnectionFactoryFactory sourceCFF = () -> failingSourceCF;
|
||||
ConnectionFactoryFactory targetCFF = () -> createConnectionFactory();
|
||||
DestinationFactory sourceDF = () -> ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE);
|
||||
DestinationFactory targetDF = () -> ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET);
|
||||
TransactionManager tm = Mockito.mock(TransactionManager.class);
|
||||
|
||||
JMSBridgeImpl bridge = new JMSBridgeImpl();
|
||||
bridge.setSourceConnectionFactoryFactory(sourceCFF);
|
||||
bridge.setSourceDestinationFactory(sourceDF);
|
||||
bridge.setTargetConnectionFactoryFactory(targetCFF);
|
||||
bridge.setTargetDestinationFactory(targetDF);
|
||||
bridge.setFailureRetryInterval(1);
|
||||
bridge.setMaxRetries(0);
|
||||
bridge.setMaxBatchSize(1);
|
||||
bridge.setMaxBatchTime(-1);
|
||||
bridge.setTransactionManager(tm);
|
||||
bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
|
||||
|
||||
Assert.assertFalse(bridge.isStarted());
|
||||
bridge.start();
|
||||
Assert.assertTrue(bridge.isStarted());
|
||||
|
||||
// make sure the bridge is actually working first
|
||||
Connection targetConn = createConnectionFactory().createConnection();
|
||||
Session targetSess = targetConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = targetSess.createConsumer(targetDF.createDestination());
|
||||
final List<Message> messages = new LinkedList<>();
|
||||
MessageListener listener = message -> messages.add(message);
|
||||
consumer.setMessageListener(listener);
|
||||
targetConn.start();
|
||||
|
||||
Session sourceSess = sourceConn.get().createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = sourceSess.createProducer(sourceDF.createDestination());
|
||||
producer.send(sourceSess.createTextMessage());
|
||||
|
||||
Wait.assertEquals(1, () -> messages.size(), 2000, 100);
|
||||
|
||||
sourceConn.get().getExceptionListener().onException(new JMSException("exception on the source"));
|
||||
Wait.assertTrue(() -> bridge.isFailed(), 2000, 50);
|
||||
targetConn.close();
|
||||
bridge.stop();
|
||||
Assert.assertFalse(bridge.isStarted());
|
||||
bridge.start();
|
||||
Assert.assertTrue(bridge.isStarted());
|
||||
|
||||
// test the bridge again after it's been restarted to ensure it's working
|
||||
targetConn = JMSBridgeImplTest.createConnectionFactory().createConnection();
|
||||
targetSess = targetConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = targetSess.createConsumer(targetDF.createDestination());
|
||||
messages.clear();
|
||||
consumer.setMessageListener(listener);
|
||||
targetConn.start();
|
||||
|
||||
sourceSess = sourceConn.get().createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = sourceSess.createProducer(sourceDF.createDestination());
|
||||
producer.send(sourceSess.createTextMessage());
|
||||
|
||||
Wait.assertEquals(1, () -> messages.size(), 2000, 100);
|
||||
targetConn.close();
|
||||
sourceConn.get().close();
|
||||
bridge.stop();
|
||||
Assert.assertFalse(bridge.isStarted());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue