diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java index 044d179fdf..76b993c69a 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; @@ -195,6 +196,7 @@ public class DBOption extends OptionalLocking { configuration.setJournalDirectory(getJournal()); configuration.setPagingDirectory(getPaging()); configuration.setLargeMessagesDirectory(getLargeMessages()); + configuration.setJournalType(JournalType.NIO); } return configuration; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 71679b5337..7fcfcd5e1b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -401,4 +401,8 @@ public class TransportConfiguration implements Serializable { private static String replaceWildcardChars(final String str) { return str.replace('.', '-'); } + + public void setFactoryClassName(String factoryClassName) { + this.factoryClassName = factoryClassName; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index c58a0bde6b..47401c5043 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -429,6 +429,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + + protected ClusterTopologyChangeMessage updateTransportConfiguration(final ClusterTopologyChangeMessage topMessage) { + return topMessage; + } + + private class Channel0Handler implements ChannelHandler { private final CoreRemotingConnection conn; @@ -456,13 +462,13 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); } else if (type == PacketImpl.CLUSTER_TOPOLOGY) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) { ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) { System.out.println("Channel0Handler.handlePacket"); } @@ -471,7 +477,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { /** * @param topMessage */ - private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { + protected void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { final long eventUID; final String backupGroupName; final String scaleDownGroupName; diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index 6a998ef0f0..273038362c 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -17,12 +17,16 @@ package org.apache.activemq.artemis.core.protocol.hornetq.client; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -64,4 +68,27 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); } + @Override + public boolean checkForFailover(String liveNodeID) throws ActiveMQException { + //HornetQ doesn't support CheckFailoverMessage packet + return true; + } + + + @Override + protected ClusterTopologyChangeMessage updateTransportConfiguration(final ClusterTopologyChangeMessage topMessage) { + updateTransportConfiguration(topMessage.getPair().getA()); + updateTransportConfiguration(topMessage.getPair().getB()); + return super.updateTransportConfiguration(topMessage); + } + + private void updateTransportConfiguration(TransportConfiguration connector) { + if (connector != null) { + String factoryClassName = connector.getFactoryClassName(); + if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { + connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + } + } + } + } diff --git a/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy new file mode 100644 index 0000000000..83cbe229ba --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy @@ -0,0 +1,369 @@ +package hqfailovertest + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +// This is a failover test with a hornetq server. +// these configurations were taken out of hornetq's testsuite.. more specifically FailoverTestBase +// sorry for the mess of copy & paste here but i couldn't depend directly into hornetq's test jar +// and I coudn't either spawn a new VM here. +// it's not possible to do FileLock without spawning a new VM.. we had InVMNodeManagers that were part of the testsuite +// + + + +import org.hornetq.api.core.* +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.core.config.* +import org.hornetq.core.config.impl.* + +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.server.* +import org.hornetq.core.server.impl.* +import org.hornetq.core.settings.impl.* +import org.hornetq.jms.server.config.impl.* +import org.hornetq.spi.core.security.* +import org.hornetq.core.remoting.impl.invm.* +import org.hornetq.core.remoting.impl.netty.* +import org.hornetq.utils.* + +import javax.management.MBeanServer +import java.lang.management.ManagementFactory +import java.util.concurrent.Semaphore + + + +folder = arg[0]; +nodeManager = new InVMNodeManager(false, folder); + +backupConfig = createDefaultConfig(); +backupConfig.getAcceptorConfigurations().clear(); +backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); +backupConfig.setSharedStore(true); +backupConfig.setBackup(true); +backupConfig.setFailbackDelay(1000); + +TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); +TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); +backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); +backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); +basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); +backupServer = createTestableServer(backupConfig, nodeManager); + +liveConfig = createDefaultConfig(); +liveConfig.getAcceptorConfigurations().clear(); +liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); +liveConfig.setSharedStore(true); +liveConfig.setFailbackDelay(1000); + +basicClusterConnectionConfig(liveConfig, liveConnector.getName()); +liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); +liveServer = createTestableServer(liveConfig, nodeManager); + +liveServer.start(); +backupServer.start(); + +liveServer.createQueue(SimpleString.toSimpleString("jms.queue.queue"), SimpleString.toSimpleString("jms.queue.queue"), null, true, false); + + + + +protected Configuration createDefaultConfig() throws Exception { + return createDefaultConfig(new HashMap(), StaticProperties.INVM_ACCEPTOR_FACTORY, StaticProperties.NETTY_ACCEPTOR_FACTORY); +} + + +HornetQServer createInVMFailoverServer(final boolean realFiles, + final Configuration configuration, + NodeManager nodeManager, + final int id) { + HornetQServer server; + HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl(); + configuration.setPersistenceEnabled(realFiles); + server = new InVMNodeManagerServer(configuration, + ManagementFactory.getPlatformMBeanServer(), + securityManager, + nodeManager); + + server.setIdentity("Server " + id); + + AddressSettings defaultSetting = new AddressSettings(); + defaultSetting.setPageSizeBytes(10 * 1024 * 1024); + defaultSetting.setMaxSizeBytes(1024 * 1024 * 1024); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + return server; +} + +Configuration createDefaultConfig(final Map params, final String... acceptors) throws Exception { + ConfigurationImpl configuration = createBasicConfig(-1); + + configuration.setFileDeploymentEnabled(false); + configuration.setJMXManagementEnabled(false); + + configuration.getAcceptorConfigurations().clear(); + + for (String acceptor : acceptors) { + TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params); + configuration.getAcceptorConfigurations().add(transportConfig); + } + return configuration; +} + +protected ConfigurationImpl createBasicConfig(final int serverID) +{ + ConfigurationImpl configuration = new ConfigurationImpl(); + configuration.setSecurityEnabled(false); + configuration.setJournalMinFiles(2); + configuration.setJournalFileSize(100 * 1024); + + configuration.setJournalType(JournalType.NIO); + + configuration.setJournalDirectory(folder) + configuration.setBindingsDirectory(folder + "/bindings") + configuration.setPagingDirectory(folder + "/paging") + configuration.setLargeMessagesDirectory(folder + "/largemessages") + + configuration.setJournalCompactMinFiles(0); + configuration.setJournalCompactPercentage(0); + configuration.setClusterPassword("changeme"); + return configuration; +} + + + +protected static final void basicClusterConnectionConfig(Configuration mainConfig, String connectorName, + String... connectors) { + ArrayList connectors0 = new ArrayList(); + for (String c : connectors) { + connectors0.add(c); + } + basicClusterConnectionConfig(mainConfig, connectorName, connectors0); +} + +protected static final void basicClusterConnectionConfig(Configuration mainConfig, + String connectorName, + List connectors) { + ClusterConnectionConfiguration ccc = null; + + ccc = new ClusterConnectionConfiguration("cluster1", "jms", connectorName, 10, false, false, 1, 1, connectors, + false); + + mainConfig.getClusterConfigurations().add(ccc); +} + +public final class InVMNodeManager extends NodeManager +{ + + private final Semaphore liveLock; + + private final Semaphore backupLock; + + public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED} + + public State state = State.NOT_STARTED; + + public long failoverPause = 0l; + + public InVMNodeManager(boolean replicatedBackup) + { + this(replicatedBackup, null); + if (replicatedBackup) + throw new RuntimeException("if replicated-backup, we need its journal directory"); + } + + public InVMNodeManager(boolean replicatedBackup, String directory) + { + super(replicatedBackup, directory); + liveLock = new Semaphore(1); + backupLock = new Semaphore(1); + setUUID(UUIDGenerator.getInstance().generateUUID()); + } + + public void awaitLiveNode() throws Exception + { + while (true) + { + while (state == State.NOT_STARTED) + { + Thread.sleep(2000); + } + + liveLock.acquire(); + + if (state == State.PAUSED) + { + liveLock.release(); + Thread.sleep(2000); + } + else if (state == State.FAILING_BACK) + { + liveLock.release(); + Thread.sleep(2000); + } + else if (state == State.LIVE) + { + break; + } + } + if(failoverPause > 0l) + { + Thread.sleep(failoverPause); + } + } + + public void startBackup() throws Exception + { + backupLock.acquire(); + } + + public void startLiveNode() throws Exception + { + state = State.FAILING_BACK; + liveLock.acquire(); + state = State.LIVE; + } + + public void pauseLiveServer() throws Exception + { + state = State.PAUSED; + liveLock.release(); + } + + public void crashLiveServer() throws Exception + { + //overkill as already set to live + state = State.LIVE; + liveLock.release(); + } + + public boolean isAwaitingFailback() throws Exception + { + return state == State.FAILING_BACK; + } + + public boolean isBackupLive() throws Exception + { + return liveLock.availablePermits() == 0; + } + + public void interrupt() + { + // + } + + public void releaseBackup() + { + if (backupLock != null) + { + backupLock.release(); + } + } + + public SimpleString readNodeId() throws HornetQIllegalStateException, IOException + { + return getNodeId(); + } +} + +final class InVMNodeManagerServer extends HornetQServerImpl { + final NodeManager nodeManager; + + public InVMNodeManagerServer(final NodeManager nodeManager) { + super(); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, final NodeManager nodeManager) { + super(configuration); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final MBeanServer mbeanServer, + final NodeManager nodeManager) { + super(configuration, mbeanServer); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final HornetQSecurityManager securityManager, + final NodeManager nodeManager) { + super(configuration, securityManager); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final MBeanServer mbeanServer, + final HornetQSecurityManager securityManager, + final NodeManager nodeManager) { + super(configuration, mbeanServer, securityManager); + this.nodeManager = nodeManager; + } + + @Override + protected NodeManager createNodeManager( + final String directory, final String nodeGroupName, boolean replicatingBackup) { + nodeManager.setNodeGroupName(nodeGroupName); + return nodeManager; + } + +} + + +private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) +{ + if (classname.contains("netty")) + { + Map serverParams = new HashMap(); + Integer port = live ? 5445 : 5545; + serverParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); + return new TransportConfiguration(classname, serverParams); + } + + Map serverParams = new HashMap(); + serverParams.put(TransportConstants.SERVER_ID_PROP_NAME, live ? server : server + 100); + return new TransportConfiguration(classname, serverParams); +} + + + +protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) +{ + return transportConfiguration(StaticProperties.NETTY_ACCEPTOR_FACTORY, live, 1); +} + +protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) +{ + return transportConfiguration(StaticProperties.NETTY_CONNECTOR_FACTORY, live, 1); +} + +protected HornetQServer createTestableServer(Configuration config, NodeManager nodeManager) +{ + return createInVMFailoverServer(true, config, nodeManager, 1) +} + + +class StaticProperties { + static String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName(); + static String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName(); + static String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName(); + static String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName(); +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java new file mode 100644 index 0000000000..6bdf664b99 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** This test will run a hornetq server with artemis clients + * and it will make sure that failover happens without any problems. */ +@RunWith(Parameterized.class) +public class HQFailoverTest extends VersionedBaseTest { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + + combinations.add(new Object[]{HORNETQ_235, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public HQFailoverTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + evaluate(serverClassloader, "hqfailovertest/hornetqServer.groovy", serverFolder.getRoot().getAbsolutePath()); + } + + @After + public void tearDown() throws Throwable { + execute(serverClassloader, "backupServer.stop(); liveServer.stop();"); + + } + + @Test + public void failoverTest() throws Throwable { + String textBody = "a rapadura e doce mas nao e mole nao"; + String queueName = "queue"; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5445?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); + + ActiveMQConnection conn = (ActiveMQConnection) cf.createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i)); + } + + CountDownLatch latch = new CountDownLatch(1); + conn.setFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + }); + + execute(serverClassloader, "liveServer.stop(true)"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + queue = session.createQueue("queue"); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(5000); + assertNotNull(msg); + } + assertNull(consumer.receiveNoWait()); + + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java new file mode 100644 index 0000000000..7bb070aedb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.remoting.compat; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class HQClientProtocolManagerTest extends ActiveMQTestBase { + + @Test + public void testNoCheckFailoverMessage() throws Exception { + final int pingPeriod = 1000; + + ActiveMQServer server = createServer(false, true); + + server.start(); + + ClientSessionInternal session = null; + + try { + ServerLocator locator = createFactory(true).setClientFailureCheckPeriod(pingPeriod).setRetryInterval(500).setRetryIntervalMultiplier(1d).setReconnectAttempts(-1).setConfirmationWindowSize(1024 * 1024); + locator.setProtocolManagerFactory(new HornetQClientProtocolManagerFactory()); + ClientSessionFactory factory = createSessionFactory(locator); + + session = (ClientSessionInternal) factory.createSession(); + + server.stop(); + + Thread.sleep((pingPeriod * 2)); + + List incomings = server.getConfiguration().getIncomingInterceptorClassNames(); + incomings.add(UnsupportedPacketInterceptor.class.getName()); + + server.start(); + + //issue a query to make sure session is reconnected. + ClientSession.QueueQuery query = session.queueQuery(new SimpleString("anyvalue")); + assertFalse(query.isExists()); + + locator.close(); + + UnsupportedPacketInterceptor.checkReceivedTypes(); + } finally { + try { + session.close(); + } catch (Throwable e) { + } + + server.stop(); + } + } + + + public static class UnsupportedPacketInterceptor implements Interceptor { + + private static Set receivedTypes = new HashSet<>(); + private static Set unsupportedTypes = new HashSet<>(); + static { + unsupportedTypes.add(PacketImpl.CHECK_FOR_FAILOVER); + } + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + receivedTypes.add(packet.getType()); + return true; + } + + public static void checkReceivedTypes() throws Exception { + for (Byte type : receivedTypes) { + assertFalse("Received unsupported type: " + type, unsupportedTypes.contains(type)); + } + } + } +}