diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java index 044d179fdf..76b993c69a 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; @@ -195,6 +196,7 @@ public class DBOption extends OptionalLocking { configuration.setJournalDirectory(getJournal()); configuration.setPagingDirectory(getPaging()); configuration.setLargeMessagesDirectory(getLargeMessages()); + configuration.setJournalType(JournalType.NIO); } return configuration; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index e2bfd0fdc5..e8ac8f8d46 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -256,10 +256,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) { Connector localConnector = connector; - if (backUp != null) { - this.clientProtocolManager.updateTransportConfiguration(backUp); - } - // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need // to create a connector just to validate if the parameters are ok. // so this will create the instance to be used on the isEquivalent check diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index c58a0bde6b..47401c5043 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -429,6 +429,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + + protected ClusterTopologyChangeMessage updateTransportConfiguration(final ClusterTopologyChangeMessage topMessage) { + return topMessage; + } + + private class Channel0Handler implements ChannelHandler { private final CoreRemotingConnection conn; @@ -456,13 +462,13 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); } else if (type == PacketImpl.CLUSTER_TOPOLOGY) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) { ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); + notifyTopologyChange(updateTransportConfiguration(topMessage)); } else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) { System.out.println("Channel0Handler.handlePacket"); } @@ -471,7 +477,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { /** * @param topMessage */ - private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { + protected void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { final long eventUID; final String backupGroupName; final String scaleDownGroupName; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java index 9788e1e4cb..e2c9fc1ee2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java @@ -22,7 +22,6 @@ import java.util.concurrent.locks.Lock; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -79,6 +78,4 @@ public interface ClientProtocolManager { String getName(); - default void updateTransportConfiguration(TransportConfiguration backUp) { - } } diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index f0010eab40..273038362c 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; @@ -73,11 +74,20 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager return true; } + @Override - public void updateTransportConfiguration(TransportConfiguration connector) { - String factoryClassName = connector.getFactoryClassName(); - if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { - connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + protected ClusterTopologyChangeMessage updateTransportConfiguration(final ClusterTopologyChangeMessage topMessage) { + updateTransportConfiguration(topMessage.getPair().getA()); + updateTransportConfiguration(topMessage.getPair().getB()); + return super.updateTransportConfiguration(topMessage); + } + + private void updateTransportConfiguration(TransportConfiguration connector) { + if (connector != null) { + String factoryClassName = connector.getFactoryClassName(); + if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { + connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + } } } diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy deleted file mode 100644 index c179a1b7ec..0000000000 --- a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy +++ /dev/null @@ -1,37 +0,0 @@ -package clients -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Create a client connection factory - -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory -import org.apache.activemq.artemis.tests.compatibility.GroovyRun; - -println("serverType " + serverArg[0]); - -if (serverArg[0].startsWith("HORNETQ")) { - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); -} else { - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&confirmationWindowSize=1048576&blockOnDurableSend=false"); -} - - -GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); -GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); -GroovyRun.assertTrue(cf.getServerLocator().isHA()); -GroovyRun.assertEquals(10, cf.getServerLocator().getReconnectAttempts()); - diff --git a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy deleted file mode 100644 index 0dedb962fa..0000000000 --- a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy +++ /dev/null @@ -1,85 +0,0 @@ -package meshTest - -import org.apache.activemq.artemis.tests.compatibility.GroovyRun - -import javax.jms.* -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// starts an artemis server -String serverType = arg[0]; -String clientType = arg[1]; -String operation = arg[2]; - - -String queueName = "queue"; - - -String textBody = "a rapadura e doce mas nao e mole nao"; - -println("clientType " + clientType); -println("serverType " + serverType); - -if (clientType.startsWith("ARTEMIS")) { - // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq - GroovyRun.evaluate("clients/artemisHQClientHA.groovy", "serverArg", serverType); -} else { - throw new RuntimeException("The test is not meant for client type: " + clientType); -} - - -Connection connection = cf.createConnection(); -Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); -Queue queue = session.createQueue(queueName); - -if (operation.equals("failoverTestSend")) { - - CountDownLatch latch = new CountDownLatch(10); - - CompletionListener completionListener = new CompletionListener() { - @Override - void onCompletion(Message message) { - latch.countDown(); - } - - @Override - void onException(Message message, Exception exception) { - - } - } - - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < 10; i++) { - producer.send(session.createTextMessage(textBody + i), completionListener); - } - - GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); - - System.out.println("Message sent"); - - return connection; -} else { - throw new RuntimeException("Invalid operation " + operation); -} - - - - diff --git a/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy new file mode 100644 index 0000000000..83cbe229ba --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqfailovertest/hornetqServer.groovy @@ -0,0 +1,369 @@ +package hqfailovertest + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +// This is a failover test with a hornetq server. +// these configurations were taken out of hornetq's testsuite.. more specifically FailoverTestBase +// sorry for the mess of copy & paste here but i couldn't depend directly into hornetq's test jar +// and I coudn't either spawn a new VM here. +// it's not possible to do FileLock without spawning a new VM.. we had InVMNodeManagers that were part of the testsuite +// + + + +import org.hornetq.api.core.* +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.core.config.* +import org.hornetq.core.config.impl.* + +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.server.* +import org.hornetq.core.server.impl.* +import org.hornetq.core.settings.impl.* +import org.hornetq.jms.server.config.impl.* +import org.hornetq.spi.core.security.* +import org.hornetq.core.remoting.impl.invm.* +import org.hornetq.core.remoting.impl.netty.* +import org.hornetq.utils.* + +import javax.management.MBeanServer +import java.lang.management.ManagementFactory +import java.util.concurrent.Semaphore + + + +folder = arg[0]; +nodeManager = new InVMNodeManager(false, folder); + +backupConfig = createDefaultConfig(); +backupConfig.getAcceptorConfigurations().clear(); +backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false)); +backupConfig.setSharedStore(true); +backupConfig.setBackup(true); +backupConfig.setFailbackDelay(1000); + +TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); +TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); +backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); +backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); +basicClusterConnectionConfig(backupConfig, backupConnector.getName(), liveConnector.getName()); +backupServer = createTestableServer(backupConfig, nodeManager); + +liveConfig = createDefaultConfig(); +liveConfig.getAcceptorConfigurations().clear(); +liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true)); +liveConfig.setSharedStore(true); +liveConfig.setFailbackDelay(1000); + +basicClusterConnectionConfig(liveConfig, liveConnector.getName()); +liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector); +liveServer = createTestableServer(liveConfig, nodeManager); + +liveServer.start(); +backupServer.start(); + +liveServer.createQueue(SimpleString.toSimpleString("jms.queue.queue"), SimpleString.toSimpleString("jms.queue.queue"), null, true, false); + + + + +protected Configuration createDefaultConfig() throws Exception { + return createDefaultConfig(new HashMap(), StaticProperties.INVM_ACCEPTOR_FACTORY, StaticProperties.NETTY_ACCEPTOR_FACTORY); +} + + +HornetQServer createInVMFailoverServer(final boolean realFiles, + final Configuration configuration, + NodeManager nodeManager, + final int id) { + HornetQServer server; + HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl(); + configuration.setPersistenceEnabled(realFiles); + server = new InVMNodeManagerServer(configuration, + ManagementFactory.getPlatformMBeanServer(), + securityManager, + nodeManager); + + server.setIdentity("Server " + id); + + AddressSettings defaultSetting = new AddressSettings(); + defaultSetting.setPageSizeBytes(10 * 1024 * 1024); + defaultSetting.setMaxSizeBytes(1024 * 1024 * 1024); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + return server; +} + +Configuration createDefaultConfig(final Map params, final String... acceptors) throws Exception { + ConfigurationImpl configuration = createBasicConfig(-1); + + configuration.setFileDeploymentEnabled(false); + configuration.setJMXManagementEnabled(false); + + configuration.getAcceptorConfigurations().clear(); + + for (String acceptor : acceptors) { + TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params); + configuration.getAcceptorConfigurations().add(transportConfig); + } + return configuration; +} + +protected ConfigurationImpl createBasicConfig(final int serverID) +{ + ConfigurationImpl configuration = new ConfigurationImpl(); + configuration.setSecurityEnabled(false); + configuration.setJournalMinFiles(2); + configuration.setJournalFileSize(100 * 1024); + + configuration.setJournalType(JournalType.NIO); + + configuration.setJournalDirectory(folder) + configuration.setBindingsDirectory(folder + "/bindings") + configuration.setPagingDirectory(folder + "/paging") + configuration.setLargeMessagesDirectory(folder + "/largemessages") + + configuration.setJournalCompactMinFiles(0); + configuration.setJournalCompactPercentage(0); + configuration.setClusterPassword("changeme"); + return configuration; +} + + + +protected static final void basicClusterConnectionConfig(Configuration mainConfig, String connectorName, + String... connectors) { + ArrayList connectors0 = new ArrayList(); + for (String c : connectors) { + connectors0.add(c); + } + basicClusterConnectionConfig(mainConfig, connectorName, connectors0); +} + +protected static final void basicClusterConnectionConfig(Configuration mainConfig, + String connectorName, + List connectors) { + ClusterConnectionConfiguration ccc = null; + + ccc = new ClusterConnectionConfiguration("cluster1", "jms", connectorName, 10, false, false, 1, 1, connectors, + false); + + mainConfig.getClusterConfigurations().add(ccc); +} + +public final class InVMNodeManager extends NodeManager +{ + + private final Semaphore liveLock; + + private final Semaphore backupLock; + + public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED} + + public State state = State.NOT_STARTED; + + public long failoverPause = 0l; + + public InVMNodeManager(boolean replicatedBackup) + { + this(replicatedBackup, null); + if (replicatedBackup) + throw new RuntimeException("if replicated-backup, we need its journal directory"); + } + + public InVMNodeManager(boolean replicatedBackup, String directory) + { + super(replicatedBackup, directory); + liveLock = new Semaphore(1); + backupLock = new Semaphore(1); + setUUID(UUIDGenerator.getInstance().generateUUID()); + } + + public void awaitLiveNode() throws Exception + { + while (true) + { + while (state == State.NOT_STARTED) + { + Thread.sleep(2000); + } + + liveLock.acquire(); + + if (state == State.PAUSED) + { + liveLock.release(); + Thread.sleep(2000); + } + else if (state == State.FAILING_BACK) + { + liveLock.release(); + Thread.sleep(2000); + } + else if (state == State.LIVE) + { + break; + } + } + if(failoverPause > 0l) + { + Thread.sleep(failoverPause); + } + } + + public void startBackup() throws Exception + { + backupLock.acquire(); + } + + public void startLiveNode() throws Exception + { + state = State.FAILING_BACK; + liveLock.acquire(); + state = State.LIVE; + } + + public void pauseLiveServer() throws Exception + { + state = State.PAUSED; + liveLock.release(); + } + + public void crashLiveServer() throws Exception + { + //overkill as already set to live + state = State.LIVE; + liveLock.release(); + } + + public boolean isAwaitingFailback() throws Exception + { + return state == State.FAILING_BACK; + } + + public boolean isBackupLive() throws Exception + { + return liveLock.availablePermits() == 0; + } + + public void interrupt() + { + // + } + + public void releaseBackup() + { + if (backupLock != null) + { + backupLock.release(); + } + } + + public SimpleString readNodeId() throws HornetQIllegalStateException, IOException + { + return getNodeId(); + } +} + +final class InVMNodeManagerServer extends HornetQServerImpl { + final NodeManager nodeManager; + + public InVMNodeManagerServer(final NodeManager nodeManager) { + super(); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, final NodeManager nodeManager) { + super(configuration); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final MBeanServer mbeanServer, + final NodeManager nodeManager) { + super(configuration, mbeanServer); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final HornetQSecurityManager securityManager, + final NodeManager nodeManager) { + super(configuration, securityManager); + this.nodeManager = nodeManager; + } + + public InVMNodeManagerServer(final Configuration configuration, + final MBeanServer mbeanServer, + final HornetQSecurityManager securityManager, + final NodeManager nodeManager) { + super(configuration, mbeanServer, securityManager); + this.nodeManager = nodeManager; + } + + @Override + protected NodeManager createNodeManager( + final String directory, final String nodeGroupName, boolean replicatingBackup) { + nodeManager.setNodeGroupName(nodeGroupName); + return nodeManager; + } + +} + + +private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) +{ + if (classname.contains("netty")) + { + Map serverParams = new HashMap(); + Integer port = live ? 5445 : 5545; + serverParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); + return new TransportConfiguration(classname, serverParams); + } + + Map serverParams = new HashMap(); + serverParams.put(TransportConstants.SERVER_ID_PROP_NAME, live ? server : server + 100); + return new TransportConfiguration(classname, serverParams); +} + + + +protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) +{ + return transportConfiguration(StaticProperties.NETTY_ACCEPTOR_FACTORY, live, 1); +} + +protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) +{ + return transportConfiguration(StaticProperties.NETTY_CONNECTOR_FACTORY, live, 1); +} + +protected HornetQServer createTestableServer(Configuration config, NodeManager nodeManager) +{ + return createInVMFailoverServer(true, config, nodeManager, 1) +} + + +class StaticProperties { + static String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName(); + static String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName(); + static String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName(); + static String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName(); +} diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy deleted file mode 100644 index 6460850e30..0000000000 --- a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy +++ /dev/null @@ -1,89 +0,0 @@ -package servers - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// starts a clustered live hornetq server -import org.hornetq.api.core.TransportConfiguration -import org.hornetq.api.core.BroadcastGroupConfiguration -import org.hornetq.api.core.UDPBroadcastGroupConfiguration -import org.hornetq.api.core.DiscoveryGroupConfiguration -import org.hornetq.core.config.impl.ConfigurationImpl -import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory -import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory -import org.hornetq.core.remoting.impl.netty.TransportConstants -import org.hornetq.jms.server.config.impl.JMSConfigurationImpl -import org.hornetq.jms.server.config.impl.* -import org.hornetq.jms.server.embedded.EmbeddedJMS -import org.hornetq.core.config.ClusterConnectionConfiguration - -String folder = arg[0]; -String id = arg[1]; -String offset = arg[2]; - -configuration = new ConfigurationImpl(); -configuration.setSecurityEnabled(false); -configuration.setJournalDirectory(folder + "/" + id + "/journal"); -configuration.setBindingsDirectory(folder + "/" + id + "/binding"); -configuration.setPagingDirectory(folder + "/" + id + "/paging"); -configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); -configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); -configuration.setPersistenceEnabled(true); -configuration.setFailoverOnServerShutdown(true); - -HashMap map = new HashMap(); -map.put(TransportConstants.HOST_PROP_NAME, "localhost"); -map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); -TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); -configuration.getAcceptorConfigurations().add(tpc); - -TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); -configuration.getConnectorConfigurations().put("netty", connectorConfig); - -ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, - true, - true, - 1, - 1024, - "dg"); -configuration.getClusterConfigurations().add(cc); - -UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); -List connectors = new ArrayList<>(); -connectors.add("netty"); -BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, - connectors, - endpoint); - -configuration.getBroadcastGroupConfigurations().add(bcConfig); - -DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); -configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); - -jmsConfiguration = new JMSConfigurationImpl(); - -JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); -TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); - - -jmsConfiguration.getQueueConfigurations().add(queueConfiguration); -jmsConfiguration.getTopicConfigurations().add(topicConfiguration); -server = new EmbeddedJMS(); -server.setConfiguration(configuration); -server.setJmsConfiguration(jmsConfiguration); -server.start(); - diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy deleted file mode 100644 index f75f185e17..0000000000 --- a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy +++ /dev/null @@ -1,91 +0,0 @@ -package servers - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// starts a clustered backup hornetq server -import org.hornetq.api.core.TransportConfiguration -import org.hornetq.api.core.BroadcastGroupConfiguration -import org.hornetq.api.core.UDPBroadcastGroupConfiguration -import org.hornetq.api.core.DiscoveryGroupConfiguration -import org.hornetq.core.config.impl.ConfigurationImpl -import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory -import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory -import org.hornetq.core.remoting.impl.netty.TransportConstants -import org.hornetq.jms.server.config.impl.JMSConfigurationImpl -import org.hornetq.jms.server.config.impl.* -import org.hornetq.jms.server.embedded.EmbeddedJMS -import org.hornetq.core.config.ClusterConnectionConfiguration - -String folder = arg[0]; -String id = arg[1]; -String offset = arg[2]; - -configuration = new ConfigurationImpl(); -configuration.setSecurityEnabled(false); -configuration.setJournalDirectory(folder + "/" + id + "/journal"); -configuration.setBindingsDirectory(folder + "/" + id + "/binding"); -configuration.setPagingDirectory(folder + "/" + id + "/paging"); -configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); -configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); -configuration.setPersistenceEnabled(true); - -configuration.setFailoverOnServerShutdown(true); -configuration.setBackup(true); -configuration.setSharedStore(true); - -HashMap map = new HashMap(); -map.put(TransportConstants.HOST_PROP_NAME, "localhost"); -map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); -TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); -configuration.getAcceptorConfigurations().add(tpc); - -TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); -configuration.getConnectorConfigurations().put("netty", connectorConfig); - -ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, - true, - true, - 1, - 1024, - "dg"); -configuration.getClusterConfigurations().add(cc); - -UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); -List connectors = new ArrayList<>(); -connectors.add("netty"); -BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, - connectors, - endpoint); - -configuration.getBroadcastGroupConfigurations().add(bcConfig); - -DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); -configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); - -jmsConfiguration = new JMSConfigurationImpl(); - -JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); -TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); - - -jmsConfiguration.getQueueConfigurations().add(queueConfiguration); -jmsConfiguration.getTopicConfigurations().add(topicConfiguration); -backupServer = new EmbeddedJMS(); -backupServer.setConfiguration(configuration); -backupServer.setJmsConfiguration(jmsConfiguration); -backupServer.start(); diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java deleted file mode 100644 index 55e03315d9..0000000000 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.tests.compatibility; - -import org.apache.activemq.artemis.utils.FileUtil; -import org.junit.After; -import org.junit.Before; - -public class FailoverServerBaseTest extends VersionedBaseTest { - - protected boolean stopServers = true; - - public FailoverServerBaseTest(String server, String sender, String receiver) throws Exception { - super(server, sender, receiver); - } - - @Before - public void setUp() throws Throwable { - FileUtil.deleteDirectory(serverFolder.getRoot()); - setVariable(serverClassloader, "persistent", Boolean.FALSE); - startServerWithBackup(serverFolder.getRoot(), serverClassloader, "live"); - } - - @After - public void tearDown() throws Throwable { - if (stopServers) { - stopServerWithBackup(serverClassloader); - } - } -} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java similarity index 58% rename from tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java rename to tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java index 1e7e8de324..6bdf664b99 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java @@ -17,14 +17,10 @@ package org.apache.activemq.artemis.tests.compatibility; -import org.apache.activemq.artemis.api.core.client.FailoverEventType; -import org.apache.activemq.artemis.jms.client.ActiveMQConnection; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - +import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import java.util.ArrayList; @@ -33,14 +29,26 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +/** This test will run a hornetq server with artemis clients + * and it will make sure that failover happens without any problems. */ @RunWith(Parameterized.class) -public class HQClientProtocolTest extends FailoverServerBaseTest { +public class HQFailoverTest extends VersionedBaseTest { @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") public static Collection getParameters() { @@ -50,13 +58,38 @@ public class HQClientProtocolTest extends FailoverServerBaseTest { return combinations; } - public HQClientProtocolTest(String server, String sender, String receiver) throws Exception { + public HQFailoverTest(String server, String sender, String receiver) throws Exception { super(server, sender, receiver); } + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + evaluate(serverClassloader, "hqfailovertest/hornetqServer.groovy", serverFolder.getRoot().getAbsolutePath()); + } + + @After + public void tearDown() throws Throwable { + execute(serverClassloader, "backupServer.stop(); liveServer.stop();"); + + } + @Test - public void FailoverTest() throws Throwable { - ActiveMQConnection conn = (ActiveMQConnection) evaluate(senderClassloader, "hqclientProtocolTest/failoverTest.groovy", server, sender, "failoverTestSend"); + public void failoverTest() throws Throwable { + String textBody = "a rapadura e doce mas nao e mole nao"; + String queueName = "queue"; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5445?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); + + ActiveMQConnection conn = (ActiveMQConnection) cf.createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i)); + } + CountDownLatch latch = new CountDownLatch(1); conn.setFailoverListener(eventType -> { if (eventType == FailoverEventType.FAILOVER_COMPLETED) { @@ -64,13 +97,13 @@ public class HQClientProtocolTest extends FailoverServerBaseTest { } }); - stopServer(serverClassloader); + execute(serverClassloader, "liveServer.stop(true)"); assertTrue(latch.await(10, TimeUnit.SECONDS)); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); - Queue queue = session.createQueue("queue"); + queue = session.createQueue("queue"); MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < 10; i++) { Message msg = consumer.receive(5000); @@ -78,7 +111,5 @@ public class HQClientProtocolTest extends FailoverServerBaseTest { } assertNull(consumer.receiveNoWait()); - stopBackup(serverClassloader); - stopServers = false; } } diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java index 8f80591444..8dc3302e42 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -203,38 +203,7 @@ public abstract class VersionedBaseTest { evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); } - public void startServerWithBackup(File folder, ClassLoader loader, String serverName) throws Throwable { - folder.mkdirs(); - - System.out.println("Folder::" + folder); - - String liveScript; - String backupScript; - String topologyScript; - - if (server.startsWith("ARTEMIS")) { - liveScript = "servers/node/artemisServer.groovy"; - backupScript = "servers/node/artemisServer_backup.groovy"; - topologyScript = null; - } else { - liveScript = "servers/node/hornetqServer.groovy"; - backupScript = "servers/node/hornetqServer_backup.groovy"; - } - - evaluate(loader, liveScript, folder.getAbsolutePath(), serverName, "0"); - evaluate(loader, backupScript, folder.getAbsolutePath(), serverName, "1"); - } - - public void stopServerWithBackup(ClassLoader loader) throws Throwable { - execute(loader, "backupServer.stop()"); - execute(loader, "server.stop()"); - } - public void stopServer(ClassLoader loader) throws Throwable { execute(loader, "server.stop()"); } - - public void stopBackup(ClassLoader loader) throws Throwable { - execute(loader, "backupServer.stop()"); - } }