From 1693db0177c80a87444438839396e15678e719af Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 31 Jan 2018 09:05:47 +0800 Subject: [PATCH] ARTEMIS-1639 HornetQClientProtocolManager sending unsupported packet HornetQClientProtocolManager is used to connect HornteQ servers. During reconnect, it sends a CheckFailoverMessage packet to the server as part of reconnection. This packet is not supported by HornetQ server (existing release), so it will break the backward compatibility. Also fixed a failover issue where a hornetq NettyConnector's ConnectorFactory is serialized to the clients who cannot instantiate it because class not found exception. --- .../api/core/TransportConfiguration.java | 4 + .../client/impl/ClientSessionFactoryImpl.java | 4 + .../core/remoting/ClientProtocolManager.java | 3 + .../client/HornetQClientProtocolManager.java | 17 +++ .../clients/artemisHQClientHA.groovy | 37 +++++++ .../hqclientProtocolTest/failoverTest.groovy | 85 ++++++++++++++ .../servers/node/hornetqServer.groovy | 89 +++++++++++++++ .../servers/node/hornetqServer_backup.groovy | 91 +++++++++++++++ .../compatibility/FailoverServerBaseTest.java | 45 ++++++++ .../compatibility/HQClientProtocolTest.java | 84 ++++++++++++++ .../compatibility/VersionedBaseTest.java | 31 ++++++ .../compat/HQClientProtocolManagerTest.java | 104 ++++++++++++++++++ 12 files changed, 594 insertions(+) create mode 100644 tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy create mode 100644 tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy create mode 100644 tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy create mode 100644 tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 71679b5337..7fcfcd5e1b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -401,4 +401,8 @@ public class TransportConfiguration implements Serializable { private static String replaceWildcardChars(final String str) { return str.replace('.', '-'); } + + public void setFactoryClassName(String factoryClassName) { + this.factoryClassName = factoryClassName; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index e8ac8f8d46..e2bfd0fdc5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -256,6 +256,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) { Connector localConnector = connector; + if (backUp != null) { + this.clientProtocolManager.updateTransportConfiguration(backUp); + } + // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need // to create a connector just to validate if the parameters are ok. // so this will create the instance to be used on the isEquivalent check diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java index e2c9fc1ee2..9788e1e4cb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -78,4 +79,6 @@ public interface ClientProtocolManager { String getName(); + default void updateTransportConfiguration(TransportConfiguration backUp) { + } } diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index 6a998ef0f0..f0010eab40 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -17,12 +17,15 @@ package org.apache.activemq.artemis.core.protocol.hornetq.client; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -64,4 +67,18 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); } + @Override + public boolean checkForFailover(String liveNodeID) throws ActiveMQException { + //HornetQ doesn't support CheckFailoverMessage packet + return true; + } + + @Override + public void updateTransportConfiguration(TransportConfiguration connector) { + String factoryClassName = connector.getFactoryClassName(); + if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { + connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + } + } + } diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy new file mode 100644 index 0000000000..c179a1b7ec --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy @@ -0,0 +1,37 @@ +package clients +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Create a client connection factory + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; + +println("serverType " + serverArg[0]); + +if (serverArg[0].startsWith("HORNETQ")) { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} else { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} + + +GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); +GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); +GroovyRun.assertTrue(cf.getServerLocator().isHA()); +GroovyRun.assertEquals(10, cf.getServerLocator().getReconnectAttempts()); + diff --git a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy new file mode 100644 index 0000000000..0dedb962fa --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy @@ -0,0 +1,85 @@ +package meshTest + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +String serverType = arg[0]; +String clientType = arg[1]; +String operation = arg[2]; + + +String queueName = "queue"; + + +String textBody = "a rapadura e doce mas nao e mole nao"; + +println("clientType " + clientType); +println("serverType " + serverType); + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisHQClientHA.groovy", "serverArg", serverType); +} else { + throw new RuntimeException("The test is not meant for client type: " + clientType); +} + + +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Queue queue = session.createQueue(queueName); + +if (operation.equals("failoverTestSend")) { + + CountDownLatch latch = new CountDownLatch(10); + + CompletionListener completionListener = new CompletionListener() { + @Override + void onCompletion(Message message) { + latch.countDown(); + } + + @Override + void onException(Message message, Exception exception) { + + } + } + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i), completionListener); + } + + GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + System.out.println("Message sent"); + + return connection; +} else { + throw new RuntimeException("Invalid operation " + operation); +} + + + + diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy new file mode 100644 index 0000000000..6460850e30 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy @@ -0,0 +1,89 @@ +package servers + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts a clustered live hornetq server +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.BroadcastGroupConfiguration +import org.hornetq.api.core.UDPBroadcastGroupConfiguration +import org.hornetq.api.core.DiscoveryGroupConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.* +import org.hornetq.jms.server.embedded.EmbeddedJMS +import org.hornetq.core.config.ClusterConnectionConfiguration + +String folder = arg[0]; +String id = arg[1]; +String offset = arg[2]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(true); +configuration.setFailoverOnServerShutdown(true); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); +configuration.getConnectorConfigurations().put("netty", connectorConfig); + +ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, + true, + true, + 1, + 1024, + "dg"); +configuration.getClusterConfigurations().add(cc); + +UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); +List connectors = new ArrayList<>(); +connectors.add("netty"); +BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, + connectors, + endpoint); + +configuration.getBroadcastGroupConfigurations().add(bcConfig); + +DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); +configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); + + +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +jmsConfiguration.getTopicConfigurations().add(topicConfiguration); +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy new file mode 100644 index 0000000000..f75f185e17 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy @@ -0,0 +1,91 @@ +package servers + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts a clustered backup hornetq server +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.BroadcastGroupConfiguration +import org.hornetq.api.core.UDPBroadcastGroupConfiguration +import org.hornetq.api.core.DiscoveryGroupConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.* +import org.hornetq.jms.server.embedded.EmbeddedJMS +import org.hornetq.core.config.ClusterConnectionConfiguration + +String folder = arg[0]; +String id = arg[1]; +String offset = arg[2]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(true); + +configuration.setFailoverOnServerShutdown(true); +configuration.setBackup(true); +configuration.setSharedStore(true); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); +configuration.getConnectorConfigurations().put("netty", connectorConfig); + +ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, + true, + true, + 1, + 1024, + "dg"); +configuration.getClusterConfigurations().add(cc); + +UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); +List connectors = new ArrayList<>(); +connectors.add("netty"); +BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, + connectors, + endpoint); + +configuration.getBroadcastGroupConfigurations().add(bcConfig); + +DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); +configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); + + +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +jmsConfiguration.getTopicConfigurations().add(topicConfiguration); +backupServer = new EmbeddedJMS(); +backupServer.setConfiguration(configuration); +backupServer.setJmsConfiguration(jmsConfiguration); +backupServer.start(); diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java new file mode 100644 index 0000000000..55e03315d9 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; + +public class FailoverServerBaseTest extends VersionedBaseTest { + + protected boolean stopServers = true; + + public FailoverServerBaseTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + setVariable(serverClassloader, "persistent", Boolean.FALSE); + startServerWithBackup(serverFolder.getRoot(), serverClassloader, "live"); + } + + @After + public void tearDown() throws Throwable { + if (stopServers) { + stopServerWithBackup(serverClassloader); + } + } +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java new file mode 100644 index 0000000000..1e7e8de324 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class HQClientProtocolTest extends FailoverServerBaseTest { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + + combinations.add(new Object[]{HORNETQ_235, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public HQClientProtocolTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void FailoverTest() throws Throwable { + ActiveMQConnection conn = (ActiveMQConnection) evaluate(senderClassloader, "hqclientProtocolTest/failoverTest.groovy", server, sender, "failoverTestSend"); + CountDownLatch latch = new CountDownLatch(1); + conn.setFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + }); + + stopServer(serverClassloader); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Queue queue = session.createQueue("queue"); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(5000); + assertNotNull(msg); + } + assertNull(consumer.receiveNoWait()); + + stopBackup(serverClassloader); + stopServers = false; + } +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java index 8dc3302e42..8f80591444 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -203,7 +203,38 @@ public abstract class VersionedBaseTest { evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); } + public void startServerWithBackup(File folder, ClassLoader loader, String serverName) throws Throwable { + folder.mkdirs(); + + System.out.println("Folder::" + folder); + + String liveScript; + String backupScript; + String topologyScript; + + if (server.startsWith("ARTEMIS")) { + liveScript = "servers/node/artemisServer.groovy"; + backupScript = "servers/node/artemisServer_backup.groovy"; + topologyScript = null; + } else { + liveScript = "servers/node/hornetqServer.groovy"; + backupScript = "servers/node/hornetqServer_backup.groovy"; + } + + evaluate(loader, liveScript, folder.getAbsolutePath(), serverName, "0"); + evaluate(loader, backupScript, folder.getAbsolutePath(), serverName, "1"); + } + + public void stopServerWithBackup(ClassLoader loader) throws Throwable { + execute(loader, "backupServer.stop()"); + execute(loader, "server.stop()"); + } + public void stopServer(ClassLoader loader) throws Throwable { execute(loader, "server.stop()"); } + + public void stopBackup(ClassLoader loader) throws Throwable { + execute(loader, "backupServer.stop()"); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java new file mode 100644 index 0000000000..7bb070aedb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.remoting.compat; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class HQClientProtocolManagerTest extends ActiveMQTestBase { + + @Test + public void testNoCheckFailoverMessage() throws Exception { + final int pingPeriod = 1000; + + ActiveMQServer server = createServer(false, true); + + server.start(); + + ClientSessionInternal session = null; + + try { + ServerLocator locator = createFactory(true).setClientFailureCheckPeriod(pingPeriod).setRetryInterval(500).setRetryIntervalMultiplier(1d).setReconnectAttempts(-1).setConfirmationWindowSize(1024 * 1024); + locator.setProtocolManagerFactory(new HornetQClientProtocolManagerFactory()); + ClientSessionFactory factory = createSessionFactory(locator); + + session = (ClientSessionInternal) factory.createSession(); + + server.stop(); + + Thread.sleep((pingPeriod * 2)); + + List incomings = server.getConfiguration().getIncomingInterceptorClassNames(); + incomings.add(UnsupportedPacketInterceptor.class.getName()); + + server.start(); + + //issue a query to make sure session is reconnected. + ClientSession.QueueQuery query = session.queueQuery(new SimpleString("anyvalue")); + assertFalse(query.isExists()); + + locator.close(); + + UnsupportedPacketInterceptor.checkReceivedTypes(); + } finally { + try { + session.close(); + } catch (Throwable e) { + } + + server.stop(); + } + } + + + public static class UnsupportedPacketInterceptor implements Interceptor { + + private static Set receivedTypes = new HashSet<>(); + private static Set unsupportedTypes = new HashSet<>(); + static { + unsupportedTypes.add(PacketImpl.CHECK_FOR_FAILOVER); + } + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + receivedTypes.add(packet.getType()); + return true; + } + + public static void checkReceivedTypes() throws Exception { + for (Byte type : receivedTypes) { + assertFalse("Received unsupported type: " + type, unsupportedTypes.contains(type)); + } + } + } +}