From 1693db0177c80a87444438839396e15678e719af Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 31 Jan 2018 09:05:47 +0800 Subject: [PATCH 1/2] ARTEMIS-1639 HornetQClientProtocolManager sending unsupported packet HornetQClientProtocolManager is used to connect HornteQ servers. During reconnect, it sends a CheckFailoverMessage packet to the server as part of reconnection. This packet is not supported by HornetQ server (existing release), so it will break the backward compatibility. Also fixed a failover issue where a hornetq NettyConnector's ConnectorFactory is serialized to the clients who cannot instantiate it because class not found exception. --- .../api/core/TransportConfiguration.java | 4 + .../client/impl/ClientSessionFactoryImpl.java | 4 + .../core/remoting/ClientProtocolManager.java | 3 + .../client/HornetQClientProtocolManager.java | 17 +++ .../clients/artemisHQClientHA.groovy | 37 +++++++ .../hqclientProtocolTest/failoverTest.groovy | 85 ++++++++++++++ .../servers/node/hornetqServer.groovy | 89 +++++++++++++++ .../servers/node/hornetqServer_backup.groovy | 91 +++++++++++++++ .../compatibility/FailoverServerBaseTest.java | 45 ++++++++ .../compatibility/HQClientProtocolTest.java | 84 ++++++++++++++ .../compatibility/VersionedBaseTest.java | 31 ++++++ .../compat/HQClientProtocolManagerTest.java | 104 ++++++++++++++++++ 12 files changed, 594 insertions(+) create mode 100644 tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy create mode 100644 tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy create mode 100644 tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy create mode 100644 tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 71679b5337..7fcfcd5e1b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -401,4 +401,8 @@ public class TransportConfiguration implements Serializable { private static String replaceWildcardChars(final String str) { return str.replace('.', '-'); } + + public void setFactoryClassName(String factoryClassName) { + this.factoryClassName = factoryClassName; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index e8ac8f8d46..e2bfd0fdc5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -256,6 +256,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) { Connector localConnector = connector; + if (backUp != null) { + this.clientProtocolManager.updateTransportConfiguration(backUp); + } + // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need // to create a connector just to validate if the parameters are ok. // so this will create the instance to be used on the isEquivalent check diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java index e2c9fc1ee2..9788e1e4cb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -78,4 +79,6 @@ public interface ClientProtocolManager { String getName(); + default void updateTransportConfiguration(TransportConfiguration backUp) { + } } diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index 6a998ef0f0..f0010eab40 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -17,12 +17,15 @@ package org.apache.activemq.artemis.core.protocol.hornetq.client; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -64,4 +67,18 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); } + @Override + public boolean checkForFailover(String liveNodeID) throws ActiveMQException { + //HornetQ doesn't support CheckFailoverMessage packet + return true; + } + + @Override + public void updateTransportConfiguration(TransportConfiguration connector) { + String factoryClassName = connector.getFactoryClassName(); + if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { + connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + } + } + } diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy new file mode 100644 index 0000000000..c179a1b7ec --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy @@ -0,0 +1,37 @@ +package clients +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Create a client connection factory + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; + +println("serverType " + serverArg[0]); + +if (serverArg[0].startsWith("HORNETQ")) { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} else { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} + + +GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); +GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); +GroovyRun.assertTrue(cf.getServerLocator().isHA()); +GroovyRun.assertEquals(10, cf.getServerLocator().getReconnectAttempts()); + diff --git a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy new file mode 100644 index 0000000000..0dedb962fa --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy @@ -0,0 +1,85 @@ +package meshTest + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +String serverType = arg[0]; +String clientType = arg[1]; +String operation = arg[2]; + + +String queueName = "queue"; + + +String textBody = "a rapadura e doce mas nao e mole nao"; + +println("clientType " + clientType); +println("serverType " + serverType); + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisHQClientHA.groovy", "serverArg", serverType); +} else { + throw new RuntimeException("The test is not meant for client type: " + clientType); +} + + +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Queue queue = session.createQueue(queueName); + +if (operation.equals("failoverTestSend")) { + + CountDownLatch latch = new CountDownLatch(10); + + CompletionListener completionListener = new CompletionListener() { + @Override + void onCompletion(Message message) { + latch.countDown(); + } + + @Override + void onException(Message message, Exception exception) { + + } + } + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i), completionListener); + } + + GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + System.out.println("Message sent"); + + return connection; +} else { + throw new RuntimeException("Invalid operation " + operation); +} + + + + diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy new file mode 100644 index 0000000000..6460850e30 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy @@ -0,0 +1,89 @@ +package servers + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts a clustered live hornetq server +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.BroadcastGroupConfiguration +import org.hornetq.api.core.UDPBroadcastGroupConfiguration +import org.hornetq.api.core.DiscoveryGroupConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.* +import org.hornetq.jms.server.embedded.EmbeddedJMS +import org.hornetq.core.config.ClusterConnectionConfiguration + +String folder = arg[0]; +String id = arg[1]; +String offset = arg[2]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(true); +configuration.setFailoverOnServerShutdown(true); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); +configuration.getConnectorConfigurations().put("netty", connectorConfig); + +ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, + true, + true, + 1, + 1024, + "dg"); +configuration.getClusterConfigurations().add(cc); + +UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); +List connectors = new ArrayList<>(); +connectors.add("netty"); +BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, + connectors, + endpoint); + +configuration.getBroadcastGroupConfigurations().add(bcConfig); + +DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); +configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); + + +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +jmsConfiguration.getTopicConfigurations().add(topicConfiguration); +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy new file mode 100644 index 0000000000..f75f185e17 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy @@ -0,0 +1,91 @@ +package servers + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts a clustered backup hornetq server +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.BroadcastGroupConfiguration +import org.hornetq.api.core.UDPBroadcastGroupConfiguration +import org.hornetq.api.core.DiscoveryGroupConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.* +import org.hornetq.jms.server.embedded.EmbeddedJMS +import org.hornetq.core.config.ClusterConnectionConfiguration + +String folder = arg[0]; +String id = arg[1]; +String offset = arg[2]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(true); + +configuration.setFailoverOnServerShutdown(true); +configuration.setBackup(true); +configuration.setSharedStore(true); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); +configuration.getConnectorConfigurations().put("netty", connectorConfig); + +ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, + true, + true, + 1, + 1024, + "dg"); +configuration.getClusterConfigurations().add(cc); + +UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); +List connectors = new ArrayList<>(); +connectors.add("netty"); +BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, + connectors, + endpoint); + +configuration.getBroadcastGroupConfigurations().add(bcConfig); + +DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); +configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); + + +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +jmsConfiguration.getTopicConfigurations().add(topicConfiguration); +backupServer = new EmbeddedJMS(); +backupServer.setConfiguration(configuration); +backupServer.setJmsConfiguration(jmsConfiguration); +backupServer.start(); diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java new file mode 100644 index 0000000000..55e03315d9 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; + +public class FailoverServerBaseTest extends VersionedBaseTest { + + protected boolean stopServers = true; + + public FailoverServerBaseTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + setVariable(serverClassloader, "persistent", Boolean.FALSE); + startServerWithBackup(serverFolder.getRoot(), serverClassloader, "live"); + } + + @After + public void tearDown() throws Throwable { + if (stopServers) { + stopServerWithBackup(serverClassloader); + } + } +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java new file mode 100644 index 0000000000..1e7e8de324 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class HQClientProtocolTest extends FailoverServerBaseTest { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + + combinations.add(new Object[]{HORNETQ_235, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public HQClientProtocolTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void FailoverTest() throws Throwable { + ActiveMQConnection conn = (ActiveMQConnection) evaluate(senderClassloader, "hqclientProtocolTest/failoverTest.groovy", server, sender, "failoverTestSend"); + CountDownLatch latch = new CountDownLatch(1); + conn.setFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + }); + + stopServer(serverClassloader); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Queue queue = session.createQueue("queue"); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(5000); + assertNotNull(msg); + } + assertNull(consumer.receiveNoWait()); + + stopBackup(serverClassloader); + stopServers = false; + } +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java index 8dc3302e42..8f80591444 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -203,7 +203,38 @@ public abstract class VersionedBaseTest { evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); } + public void startServerWithBackup(File folder, ClassLoader loader, String serverName) throws Throwable { + folder.mkdirs(); + + System.out.println("Folder::" + folder); + + String liveScript; + String backupScript; + String topologyScript; + + if (server.startsWith("ARTEMIS")) { + liveScript = "servers/node/artemisServer.groovy"; + backupScript = "servers/node/artemisServer_backup.groovy"; + topologyScript = null; + } else { + liveScript = "servers/node/hornetqServer.groovy"; + backupScript = "servers/node/hornetqServer_backup.groovy"; + } + + evaluate(loader, liveScript, folder.getAbsolutePath(), serverName, "0"); + evaluate(loader, backupScript, folder.getAbsolutePath(), serverName, "1"); + } + + public void stopServerWithBackup(ClassLoader loader) throws Throwable { + execute(loader, "backupServer.stop()"); + execute(loader, "server.stop()"); + } + public void stopServer(ClassLoader loader) throws Throwable { execute(loader, "server.stop()"); } + + public void stopBackup(ClassLoader loader) throws Throwable { + execute(loader, "backupServer.stop()"); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java new file mode 100644 index 0000000000..7bb070aedb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.remoting.compat; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class HQClientProtocolManagerTest extends ActiveMQTestBase { + + @Test + public void testNoCheckFailoverMessage() throws Exception { + final int pingPeriod = 1000; + + ActiveMQServer server = createServer(false, true); + + server.start(); + + ClientSessionInternal session = null; + + try { + ServerLocator locator = createFactory(true).setClientFailureCheckPeriod(pingPeriod).setRetryInterval(500).setRetryIntervalMultiplier(1d).setReconnectAttempts(-1).setConfirmationWindowSize(1024 * 1024); + locator.setProtocolManagerFactory(new HornetQClientProtocolManagerFactory()); + ClientSessionFactory factory = createSessionFactory(locator); + + session = (ClientSessionInternal) factory.createSession(); + + server.stop(); + + Thread.sleep((pingPeriod * 2)); + + List incomings = server.getConfiguration().getIncomingInterceptorClassNames(); + incomings.add(UnsupportedPacketInterceptor.class.getName()); + + server.start(); + + //issue a query to make sure session is reconnected. + ClientSession.QueueQuery query = session.queueQuery(new SimpleString("anyvalue")); + assertFalse(query.isExists()); + + locator.close(); + + UnsupportedPacketInterceptor.checkReceivedTypes(); + } finally { + try { + session.close(); + } catch (Throwable e) { + } + + server.stop(); + } + } + + + public static class UnsupportedPacketInterceptor implements Interceptor { + + private static Set receivedTypes = new HashSet<>(); + private static Set unsupportedTypes = new HashSet<>(); + static { + unsupportedTypes.add(PacketImpl.CHECK_FOR_FAILOVER); + } + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + receivedTypes.add(packet.getType()); + return true; + } + + public static void checkReceivedTypes() throws Exception { + for (Byte type : receivedTypes) { + assertFalse("Received unsupported type: " + type, unsupportedTypes.contains(type)); + } + } + } +} From 5653ec9980a8ba3c1f7f3fc5876bfe8adfa00aba Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 31 Jan 2018 17:22:06 -0500 Subject: [PATCH 2/2] ARTEMIS-1639 Simplifying/fixing test and improving update topology --- .../artemis/cli/commands/tools/DBOption.java | 2 + .../client/impl/ClientSessionFactoryImpl.java | 4 - .../impl/ActiveMQClientProtocolManager.java | 14 +- .../core/remoting/ClientProtocolManager.java | 3 - .../client/HornetQClientProtocolManager.java | 18 +- .../clients/artemisHQClientHA.groovy | 37 -- .../hqclientProtocolTest/failoverTest.groovy | 85 ---- .../hqfailovertest/hornetqServer.groovy | 369 ++++++++++++++++++ .../servers/node/hornetqServer.groovy | 89 ----- .../servers/node/hornetqServer_backup.groovy | 91 ----- .../compatibility/FailoverServerBaseTest.java | 45 --- ...tProtocolTest.java => HQFailoverTest.java} | 61 ++- .../compatibility/VersionedBaseTest.java | 31 -- 13 files changed, 441 insertions(+), 408 deletions(-) delete mode 100644 tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy delete mode 100644 tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy create mode 100644 tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy delete mode 100644 tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy delete mode 100644 tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy delete mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java rename tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/{HQClientProtocolTest.java => HQFailoverTest.java} (58%) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java index 044d179fdf..76b993c69a 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; @@ -195,6 +196,7 @@ public class DBOption extends OptionalLocking { configuration.setJournalDirectory(getJournal()); configuration.setPagingDirectory(getPaging()); configuration.setLargeMessagesDirectory(getLargeMessages()); + configuration.setJournalType(JournalType.NIO); } return configuration; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index e2bfd0fdc5..e8ac8f8d46 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -256,10 +256,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) { Connector localConnector = connector; - if (backUp != null) { - this.clientProtocolManager.updateTransportConfiguration(backUp); - } - // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need // to create a connector just to validate if the parameters are ok. // so this will create the instance to be used on the isEquivalent check diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index c58a0bde6b..47401c5043 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -429,6 +429,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + + protected ClusterTopologyChangeMessage updateTransportConfiguration(final ClusterTopologyChangeMessage topMessage) { + return topMessage; + } + + private class Channel0Handler implements ChannelHandler { private final CoreRemotingConnection conn; @@ -456,13 +462,13 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); } else if (type == PacketImpl.CLUSTER_TOPOLOGY) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) { ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) { System.out.println("Channel0Handler.handlePacket"); } @@ -471,7 +477,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { /** * @param topMessage */ - private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { + protected void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { final long eventUID; final String backupGroupName; final String scaleDownGroupName; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java index 9788e1e4cb..e2c9fc1ee2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java @@ -22,7 +22,6 @@ import java.util.concurrent.locks.Lock; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -79,6 +78,4 @@ public interface ClientProtocolManager { String getName(); - default void updateTransportConfiguration(TransportConfiguration backUp) { - } } diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index f0010eab40..273038362c 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; @@ -73,11 +74,20 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager return true; } + @Override - public void updateTransportConfiguration(TransportConfiguration connector) { - String factoryClassName = connector.getFactoryClassName(); - if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { - connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + protected ClusterTopologyChangeMessage updateTransportConfiguration(final ClusterTopologyChangeMessage topMessage) { + updateTransportConfiguration(topMessage.getPair().getA()); + updateTransportConfiguration(topMessage.getPair().getB()); + return super.updateTransportConfiguration(topMessage); + } + + private void updateTransportConfiguration(TransportConfiguration connector) { + if (connector != null) { + String factoryClassName = connector.getFactoryClassName(); + if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { + connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + } } } diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy deleted file mode 100644 index c179a1b7ec..0000000000 --- a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy +++ /dev/null @@ -1,37 +0,0 @@ -package clients -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Create a client connection factory - -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory -import org.apache.activemq.artemis.tests.compatibility.GroovyRun; - -println("serverType " + serverArg[0]); - -if (serverArg[0].startsWith("HORNETQ")) { - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); -} else { - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&confirmationWindowSize=1048576&blockOnDurableSend=false"); -} - - -GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); -GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); -GroovyRun.assertTrue(cf.getServerLocator().isHA()); -GroovyRun.assertEquals(10, cf.getServerLocator().getReconnectAttempts()); - diff --git a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy deleted file mode 100644 index 0dedb962fa..0000000000 --- a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy +++ /dev/null @@ -1,85 +0,0 @@ -package meshTest - -import org.apache.activemq.artemis.tests.compatibility.GroovyRun - -import javax.jms.* -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// starts an artemis server -String serverType = arg[0]; -String clientType = arg[1]; -String operation = arg[2]; - - -String queueName = "queue"; - - -String textBody = "a rapadura e doce mas nao e mole nao"; - -println("clientType " + clientType); -println("serverType " + serverType); - -if (clientType.startsWith("ARTEMIS")) { - // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq - GroovyRun.evaluate("clients/artemisHQClientHA.groovy", "serverArg", serverType); -} else { - throw new RuntimeException("The test is not meant for client type: " + clientType); -} - - -Connection connection = cf.createConnection(); -Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); -Queue queue = session.createQueue(queueName); - -if (operation.equals("failoverTestSend")) { - - CountDownLatch latch = new CountDownLatch(10); - - CompletionListener completionListener = new CompletionListener() { - @Override - void onCompletion(Message message) { - latch.countDown(); - } - - @Override - void onException(Message message, Exception exception) { - - } - } - - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < 10; i++) { - producer.send(session.createTextMessage(textBody + i), completionListener); - } - - GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); - - System.out.println("Message sent"); - - return connection; -} else { - throw new RuntimeException("Invalid operation " + operation); -} - - - - diff --git a/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy new file mode 100644 index 0000000000..83cbe229ba --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy @@ -0,0 +1,369 @@ +package hqfailovertest + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +// This is a failover test with a hornetq server. +// these configurations were taken out of hornetq's testsuite.. more specifically FailoverTestBase +// sorry for the mess of copy & paste here but i couldn't depend directly into hornetq's test jar +// and I coudn't either spawn a new VM here. +// it's not possible to do FileLock without spawning a new VM.. we had InVMNodeManagers that were part of the testsuite +// + + + +import org.hornetq.api.core.* +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.core.config.* +import org.hornetq.core.config.impl.* + +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.server.* +import org.hornetq.core.server.impl.* +import org.hornetq.core.settings.impl.* +import org.hornetq.jms.server.config.impl.* +import org.hornetq.spi.core.security.* +import org.hornetq.core.remoting.impl.invm.* +import org.hornetq.core.remoting.impl.netty.* +import org.hornetq.utils.* + +import javax.management.MBeanServer +import java.lang.management.ManagementFactory +import java.util.concurrent.Semaphore + + + +folder = arg[0]; +nodeManager = new InVMNodeManager(false, folder); + +backupConfig = createDefaultConfig(); +backupConfig.getAcceptorConfigurations().clear(); +backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); +backupConfig.setSharedStore(true); +backupConfig.setBackup(true); +backupConfig.setFailbackDelay(1000); + +TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); +TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); +backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); +backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); +basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); +backupServer = createTestableServer(backupConfig, nodeManager); + +liveConfig = createDefaultConfig(); +liveConfig.getAcceptorConfigurations().clear(); +liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); +liveConfig.setSharedStore(true); +liveConfig.setFailbackDelay(1000); + +basicClusterConnectionConfig(liveConfig, liveConnector.getName()); +liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); +liveServer = createTestableServer(liveConfig, nodeManager); + +liveServer.start(); +backupServer.start(); + +liveServer.createQueue(SimpleString.toSimpleString("jms.queue.queue"), SimpleString.toSimpleString("jms.queue.queue"), null, true, false); + + + + +protected Configuration createDefaultConfig() throws Exception { + return createDefaultConfig(new HashMap(), StaticProperties.INVM_ACCEPTOR_FACTORY, StaticProperties.NETTY_ACCEPTOR_FACTORY); +} + + +HornetQServer createInVMFailoverServer(final boolean realFiles, + final Configuration configuration, + NodeManager nodeManager, + final int id) { + HornetQServer server; + HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl(); + configuration.setPersistenceEnabled(realFiles); + server = new InVMNodeManagerServer(configuration, + ManagementFactory.getPlatformMBeanServer(), + securityManager, + nodeManager); + + server.setIdentity("Server " + id); + + AddressSettings defaultSetting = new AddressSettings(); + defaultSetting.setPageSizeBytes(10 * 1024 * 1024); + defaultSetting.setMaxSizeBytes(1024 * 1024 * 1024); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + return server; +} + +Configuration createDefaultConfig(final Map params, final String... acceptors) throws Exception { + ConfigurationImpl configuration = createBasicConfig(-1); + + configuration.setFileDeploymentEnabled(false); + configuration.setJMXManagementEnabled(false); + + configuration.getAcceptorConfigurations().clear(); + + for (String acceptor : acceptors) { + TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params); + configuration.getAcceptorConfigurations().add(transportConfig); + } + return configuration; +} + +protected ConfigurationImpl createBasicConfig(final int serverID) +{ + ConfigurationImpl configuration = new ConfigurationImpl(); + configuration.setSecurityEnabled(false); + configuration.setJournalMinFiles(2); + configuration.setJournalFileSize(100 * 1024); + + configuration.setJournalType(JournalType.NIO); + + configuration.setJournalDirectory(folder) + configuration.setBindingsDirectory(folder + "/bindings") + configuration.setPagingDirectory(folder + "/paging") + configuration.setLargeMessagesDirectory(folder + "/largemessages") + + configuration.setJournalCompactMinFiles(0); + configuration.setJournalCompactPercentage(0); + configuration.setClusterPassword("changeme"); + return configuration; +} + + + +protected static final void basicClusterConnectionConfig(Configuration mainConfig, String connectorName, + String... connectors) { + ArrayList connectors0 = new ArrayList(); + for (String c : connectors) { + connectors0.add(c); + } + basicClusterConnectionConfig(mainConfig, connectorName, connectors0); +} + +protected static final void basicClusterConnectionConfig(Configuration mainConfig, + String connectorName, + List connectors) { + ClusterConnectionConfiguration ccc = null; + + ccc = new ClusterConnectionConfiguration("cluster1", "jms", connectorName, 10, false, false, 1, 1, connectors, + false); + + mainConfig.getClusterConfigurations().add(ccc); +} + +public final class InVMNodeManager extends NodeManager +{ + + private final Semaphore liveLock; + + private final Semaphore backupLock; + + public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED} + + public State state = State.NOT_STARTED; + + public long failoverPause = 0l; + + public InVMNodeManager(boolean replicatedBackup) + { + this(replicatedBackup, null); + if (replicatedBackup) + throw new RuntimeException("if replicated-backup, we need its journal directory"); + } + + public InVMNodeManager(boolean replicatedBackup, String directory) + { + super(replicatedBackup, directory); + liveLock = new Semaphore(1); + backupLock = new Semaphore(1); + setUUID(UUIDGenerator.getInstance().generateUUID()); + } + + public void awaitLiveNode() throws Exception + { + while (true) + { + while (state == State.NOT_STARTED) + { + Thread.sleep(2000); + } + + liveLock.acquire(); + + if (state == State.PAUSED) + { + liveLock.release(); + Thread.sleep(2000); + } + else if (state == State.FAILING_BACK) + { + liveLock.release(); + Thread.sleep(2000); + } + else if (state == State.LIVE) + { + break; + } + } + if(failoverPause > 0l) + { + Thread.sleep(failoverPause); + } + } + + public void startBackup() throws Exception + { + backupLock.acquire(); + } + + public void startLiveNode() throws Exception + { + state = State.FAILING_BACK; + liveLock.acquire(); + state = State.LIVE; + } + + public void pauseLiveServer() throws Exception + { + state = State.PAUSED; + liveLock.release(); + } + + public void crashLiveServer() throws Exception + { + //overkill as already set to live + state = State.LIVE; + liveLock.release(); + } + + public boolean isAwaitingFailback() throws Exception + { + return state == State.FAILING_BACK; + } + + public boolean isBackupLive() throws Exception + { + return liveLock.availablePermits() == 0; + } + + public void interrupt() + { + // + } + + public void releaseBackup() + { + if (backupLock != null) + { + backupLock.release(); + } + } + + public SimpleString readNodeId() throws HornetQIllegalStateException, IOException + { + return getNodeId(); + } +} + +final class InVMNodeManagerServer extends HornetQServerImpl { + final NodeManager nodeManager; + + public InVMNodeManagerServer(final NodeManager nodeManager) { + super(); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, final NodeManager nodeManager) { + super(configuration); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final MBeanServer mbeanServer, + final NodeManager nodeManager) { + super(configuration, mbeanServer); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final HornetQSecurityManager securityManager, + final NodeManager nodeManager) { + super(configuration, securityManager); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final MBeanServer mbeanServer, + final HornetQSecurityManager securityManager, + final NodeManager nodeManager) { + super(configuration, mbeanServer, securityManager); + this.nodeManager = nodeManager; + } + + @Override + protected NodeManager createNodeManager( + final String directory, final String nodeGroupName, boolean replicatingBackup) { + nodeManager.setNodeGroupName(nodeGroupName); + return nodeManager; + } + +} + + +private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) +{ + if (classname.contains("netty")) + { + Map serverParams = new HashMap(); + Integer port = live ? 5445 : 5545; + serverParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); + return new TransportConfiguration(classname, serverParams); + } + + Map serverParams = new HashMap(); + serverParams.put(TransportConstants.SERVER_ID_PROP_NAME, live ? server : server + 100); + return new TransportConfiguration(classname, serverParams); +} + + + +protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) +{ + return transportConfiguration(StaticProperties.NETTY_ACCEPTOR_FACTORY, live, 1); +} + +protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) +{ + return transportConfiguration(StaticProperties.NETTY_CONNECTOR_FACTORY, live, 1); +} + +protected HornetQServer createTestableServer(Configuration config, NodeManager nodeManager) +{ + return createInVMFailoverServer(true, config, nodeManager, 1) +} + + +class StaticProperties { + static String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName(); + static String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName(); + static String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName(); + static String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName(); +} diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy deleted file mode 100644 index 6460850e30..0000000000 --- a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy +++ /dev/null @@ -1,89 +0,0 @@ -package servers - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// starts a clustered live hornetq server -import org.hornetq.api.core.TransportConfiguration -import org.hornetq.api.core.BroadcastGroupConfiguration -import org.hornetq.api.core.UDPBroadcastGroupConfiguration -import org.hornetq.api.core.DiscoveryGroupConfiguration -import org.hornetq.core.config.impl.ConfigurationImpl -import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory -import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory -import org.hornetq.core.remoting.impl.netty.TransportConstants -import org.hornetq.jms.server.config.impl.JMSConfigurationImpl -import org.hornetq.jms.server.config.impl.* -import org.hornetq.jms.server.embedded.EmbeddedJMS -import org.hornetq.core.config.ClusterConnectionConfiguration - -String folder = arg[0]; -String id = arg[1]; -String offset = arg[2]; - -configuration = new ConfigurationImpl(); -configuration.setSecurityEnabled(false); -configuration.setJournalDirectory(folder + "/" + id + "/journal"); -configuration.setBindingsDirectory(folder + "/" + id + "/binding"); -configuration.setPagingDirectory(folder + "/" + id + "/paging"); -configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); -configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); -configuration.setPersistenceEnabled(true); -configuration.setFailoverOnServerShutdown(true); - -HashMap map = new HashMap(); -map.put(TransportConstants.HOST_PROP_NAME, "localhost"); -map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); -TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); -configuration.getAcceptorConfigurations().add(tpc); - -TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); -configuration.getConnectorConfigurations().put("netty", connectorConfig); - -ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, - true, - true, - 1, - 1024, - "dg"); -configuration.getClusterConfigurations().add(cc); - -UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); -List connectors = new ArrayList<>(); -connectors.add("netty"); -BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, - connectors, - endpoint); - -configuration.getBroadcastGroupConfigurations().add(bcConfig); - -DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); -configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); - -jmsConfiguration = new JMSConfigurationImpl(); - -JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); -TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); - - -jmsConfiguration.getQueueConfigurations().add(queueConfiguration); -jmsConfiguration.getTopicConfigurations().add(topicConfiguration); -server = new EmbeddedJMS(); -server.setConfiguration(configuration); -server.setJmsConfiguration(jmsConfiguration); -server.start(); - diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy deleted file mode 100644 index f75f185e17..0000000000 --- a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy +++ /dev/null @@ -1,91 +0,0 @@ -package servers - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// starts a clustered backup hornetq server -import org.hornetq.api.core.TransportConfiguration -import org.hornetq.api.core.BroadcastGroupConfiguration -import org.hornetq.api.core.UDPBroadcastGroupConfiguration -import org.hornetq.api.core.DiscoveryGroupConfiguration -import org.hornetq.core.config.impl.ConfigurationImpl -import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory -import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory -import org.hornetq.core.remoting.impl.netty.TransportConstants -import org.hornetq.jms.server.config.impl.JMSConfigurationImpl -import org.hornetq.jms.server.config.impl.* -import org.hornetq.jms.server.embedded.EmbeddedJMS -import org.hornetq.core.config.ClusterConnectionConfiguration - -String folder = arg[0]; -String id = arg[1]; -String offset = arg[2]; - -configuration = new ConfigurationImpl(); -configuration.setSecurityEnabled(false); -configuration.setJournalDirectory(folder + "/" + id + "/journal"); -configuration.setBindingsDirectory(folder + "/" + id + "/binding"); -configuration.setPagingDirectory(folder + "/" + id + "/paging"); -configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); -configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); -configuration.setPersistenceEnabled(true); - -configuration.setFailoverOnServerShutdown(true); -configuration.setBackup(true); -configuration.setSharedStore(true); - -HashMap map = new HashMap(); -map.put(TransportConstants.HOST_PROP_NAME, "localhost"); -map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); -TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); -configuration.getAcceptorConfigurations().add(tpc); - -TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); -configuration.getConnectorConfigurations().put("netty", connectorConfig); - -ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, - true, - true, - 1, - 1024, - "dg"); -configuration.getClusterConfigurations().add(cc); - -UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); -List connectors = new ArrayList<>(); -connectors.add("netty"); -BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, - connectors, - endpoint); - -configuration.getBroadcastGroupConfigurations().add(bcConfig); - -DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); -configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); - -jmsConfiguration = new JMSConfigurationImpl(); - -JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); -TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); - - -jmsConfiguration.getQueueConfigurations().add(queueConfiguration); -jmsConfiguration.getTopicConfigurations().add(topicConfiguration); -backupServer = new EmbeddedJMS(); -backupServer.setConfiguration(configuration); -backupServer.setJmsConfiguration(jmsConfiguration); -backupServer.start(); diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java deleted file mode 100644 index 55e03315d9..0000000000 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.tests.compatibility; - -import org.apache.activemq.artemis.utils.FileUtil; -import org.junit.After; -import org.junit.Before; - -public class FailoverServerBaseTest extends VersionedBaseTest { - - protected boolean stopServers = true; - - public FailoverServerBaseTest(String server, String sender, String receiver) throws Exception { - super(server, sender, receiver); - } - - @Before - public void setUp() throws Throwable { - FileUtil.deleteDirectory(serverFolder.getRoot()); - setVariable(serverClassloader, "persistent", Boolean.FALSE); - startServerWithBackup(serverFolder.getRoot(), serverClassloader, "live"); - } - - @After - public void tearDown() throws Throwable { - if (stopServers) { - stopServerWithBackup(serverClassloader); - } - } -} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java similarity index 58% rename from tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java rename to tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java index 1e7e8de324..6bdf664b99 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java @@ -17,14 +17,10 @@ package org.apache.activemq.artemis.tests.compatibility; -import org.apache.activemq.artemis.api.core.client.FailoverEventType; -import org.apache.activemq.artemis.jms.client.ActiveMQConnection; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - +import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import java.util.ArrayList; @@ -33,14 +29,26 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +/** This test will run a hornetq server with artemis clients + * and it will make sure that failover happens without any problems. */ @RunWith(Parameterized.class) -public class HQClientProtocolTest extends FailoverServerBaseTest { +public class HQFailoverTest extends VersionedBaseTest { @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") public static Collection getParameters() { @@ -50,13 +58,38 @@ public class HQClientProtocolTest extends FailoverServerBaseTest { return combinations; } - public HQClientProtocolTest(String server, String sender, String receiver) throws Exception { + public HQFailoverTest(String server, String sender, String receiver) throws Exception { super(server, sender, receiver); } + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + evaluate(serverClassloader, "hqfailovertest/hornetqServer.groovy", serverFolder.getRoot().getAbsolutePath()); + } + + @After + public void tearDown() throws Throwable { + execute(serverClassloader, "backupServer.stop(); liveServer.stop();"); + + } + @Test - public void FailoverTest() throws Throwable { - ActiveMQConnection conn = (ActiveMQConnection) evaluate(senderClassloader, "hqclientProtocolTest/failoverTest.groovy", server, sender, "failoverTestSend"); + public void failoverTest() throws Throwable { + String textBody = "a rapadura e doce mas nao e mole nao"; + String queueName = "queue"; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5445?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); + + ActiveMQConnection conn = (ActiveMQConnection) cf.createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i)); + } + CountDownLatch latch = new CountDownLatch(1); conn.setFailoverListener(eventType -> { if (eventType == FailoverEventType.FAILOVER_COMPLETED) { @@ -64,13 +97,13 @@ public class HQClientProtocolTest extends FailoverServerBaseTest { } }); - stopServer(serverClassloader); + execute(serverClassloader, "liveServer.stop(true)"); assertTrue(latch.await(10, TimeUnit.SECONDS)); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); - Queue queue = session.createQueue("queue"); + queue = session.createQueue("queue"); MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < 10; i++) { Message msg = consumer.receive(5000); @@ -78,7 +111,5 @@ public class HQClientProtocolTest extends FailoverServerBaseTest { } assertNull(consumer.receiveNoWait()); - stopBackup(serverClassloader); - stopServers = false; } } diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java index 8f80591444..8dc3302e42 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -203,38 +203,7 @@ public abstract class VersionedBaseTest { evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); } - public void startServerWithBackup(File folder, ClassLoader loader, String serverName) throws Throwable { - folder.mkdirs(); - - System.out.println("Folder::" + folder); - - String liveScript; - String backupScript; - String topologyScript; - - if (server.startsWith("ARTEMIS")) { - liveScript = "servers/node/artemisServer.groovy"; - backupScript = "servers/node/artemisServer_backup.groovy"; - topologyScript = null; - } else { - liveScript = "servers/node/hornetqServer.groovy"; - backupScript = "servers/node/hornetqServer_backup.groovy"; - } - - evaluate(loader, liveScript, folder.getAbsolutePath(), serverName, "0"); - evaluate(loader, backupScript, folder.getAbsolutePath(), serverName, "1"); - } - - public void stopServerWithBackup(ClassLoader loader) throws Throwable { - execute(loader, "backupServer.stop()"); - execute(loader, "server.stop()"); - } - public void stopServer(ClassLoader loader) throws Throwable { execute(loader, "server.stop()"); } - - public void stopBackup(ClassLoader loader) throws Throwable { - execute(loader, "backupServer.stop()"); - } }