From 1928180a7c3dca6322e8888c7fc1a32cd3ae0133 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 29 Aug 2019 18:13:19 -0400 Subject: [PATCH] ARTEMIS-2467 Allowing create of static clusters through CLI --- .../activemq/artemis/cli/commands/Create.java | 53 +++++- .../commands/etc/cluster-static-settings.txt | 11 ++ .../cli/test/StreamClassPathTest.java | 1 + .../artemis/maven/ArtemisCreatePlugin.java | 7 + examples/features/ha/pom.xml | 2 + .../ha/shared-storage-static-cluster/pom.xml | 161 ++++++++++++++++++ .../example/SharedStorageStaticCluster.java | 157 +++++++++++++++++ 7 files changed, 390 insertions(+), 2 deletions(-) create mode 100644 artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/cluster-static-settings.txt create mode 100644 examples/features/ha/shared-storage-static-cluster/pom.xml create mode 100644 examples/features/ha/shared-storage-static-cluster/src/main/java/org/apache/activemq/artemis/jms/example/SharedStorageStaticCluster.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 6318d0b40d..36d0c3a75e 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -95,6 +95,7 @@ public class Create extends InputAbstract { public static final String ETC_SHARED_STORE_SETTINGS_TXT = "etc/shared-store-settings.txt"; public static final String ETC_CLUSTER_SECURITY_SETTINGS_TXT = "etc/cluster-security-settings.txt"; public static final String ETC_CLUSTER_SETTINGS_TXT = "etc/cluster-settings.txt"; + public static final String ETC_CLUSTER_STATIC_SETTINGS_TXT = "etc/cluster-static-settings.txt"; public static final String ETC_CONNECTOR_SETTINGS_TXT = "etc/connector-settings.txt"; public static final String ETC_BOOTSTRAP_WEB_SETTINGS_TXT = "etc/bootstrap-web-settings.txt"; public static final String ETC_JOURNAL_BUFFER_SETTINGS = "etc/journal-buffer-settings.txt"; @@ -275,6 +276,17 @@ public class Create extends InputAbstract { @Option(name = "--jdbc", description = "It will activate jdbc") boolean jdbc; + @Option(name = "--staticCluster", description = "Cluster node connectors list, separated by comma: Example \"tcp://server:61616,tcp://server2:61616,tcp://server3:61616\"") + String staticNode; + + public String[] getStaticNodes() { + if (staticNode == null) { + return new String[0]; + } else { + return staticNode.split(","); + } + } + @Option(name = "--jdbc-bindings-table-name", description = "Name of the jdbc bindigns table") private String jdbcBindings = ActiveMQDefaultConfiguration.getDefaultBindingsTableName(); @@ -571,6 +583,10 @@ public class Create extends InputAbstract { filters.put("${ping-config.settings}", readTextFile(ETC_COMMENTED_PING_TXT, filters)); } + if (staticNode != null) { + clustered = true; + } + if (replicated) { clustered = true; filters.put("${replicated.settings}", readTextFile(ETC_REPLICATED_SETTINGS_TXT, filters)); @@ -650,12 +666,27 @@ public class Create extends InputAbstract { if (name == null) { name = getHostForClustered(); } - String connectorSettings = readTextFile(ETC_CONNECTOR_SETTINGS_TXT, filters); + String connectorSettings = getConnectors(filters); filters.put("${name}", name); filters.put("${connector-config.settings}", connectorSettings); filters.put("${cluster-security.settings}", readTextFile(ETC_CLUSTER_SECURITY_SETTINGS_TXT, filters)); - filters.put("${cluster.settings}", readTextFile(ETC_CLUSTER_SETTINGS_TXT, filters)); + if (staticNode != null) { + + String staticCluster = readTextFile(ETC_CLUSTER_STATIC_SETTINGS_TXT, filters); + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + int countCluster = getStaticNodes().length; + for (int i = 0; i < countCluster; i++) { + printWriter.println(" node" + i + ""); + } + filters.put("${connectors-list}", stringWriter.toString()); + + staticCluster = applyFilters(staticCluster, filters); + filters.put("${cluster.settings}", staticCluster); + } else { + filters.put("${cluster.settings}", readTextFile(ETC_CLUSTER_SETTINGS_TXT, filters)); + } filters.put("${cluster-user}", getClusterUser()); filters.put("${cluster-password}", getClusterPassword()); } else { @@ -828,6 +859,24 @@ public class Create extends InputAbstract { return null; } + private String getConnectors(HashMap filters) throws IOException { + if (staticNode != null) { + StringWriter stringWriter = new StringWriter(); + PrintWriter printer = new PrintWriter(stringWriter); + printer.println(" "); + printer.println(" "); + printer.println(applyFilters(" tcp://${host}:${default.port}", filters)); + int counter = 0; + for (String node: getStaticNodes()) { + printer.println(" " + node + ""); + } + printer.println(" "); + return stringWriter.toString(); + } else { + return readTextFile(ETC_CONNECTOR_SETTINGS_TXT, filters); + } + } + private void printStar(String message) { int size = Math.min(message.length(), 80); StringBuffer buffer = new StringBuffer(size); diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/cluster-static-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/cluster-static-settings.txt new file mode 100644 index 0000000000..499b1affad --- /dev/null +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/cluster-static-settings.txt @@ -0,0 +1,11 @@ + + + artemis + ${message-load-balancing} + ${max-hops} + +${connectors-list} + + + + diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java index c348476951..a950ac1c64 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java @@ -49,6 +49,7 @@ public class StreamClassPathTest { testStream(Create.class, Create.ETC_SHARED_STORE_SETTINGS_TXT); testStream(Create.class, Create.ETC_CLUSTER_SECURITY_SETTINGS_TXT); testStream(Create.class, Create.ETC_CLUSTER_SETTINGS_TXT); + testStream(Create.class, Create.ETC_CLUSTER_STATIC_SETTINGS_TXT); testStream(Create.class, Create.ETC_CONNECTOR_SETTINGS_TXT); testStream(Create.class, Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT); testStream(Create.class, Create.ETC_JOURNAL_BUFFER_SETTINGS); diff --git a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisCreatePlugin.java b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisCreatePlugin.java index 191bb4493f..161d3df484 100644 --- a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisCreatePlugin.java +++ b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisCreatePlugin.java @@ -102,6 +102,9 @@ public class ArtemisCreatePlugin extends ArtemisAbstractPlugin { @Parameter(defaultValue = "false") private boolean slave; + @Parameter + private String staticCluster; + @Parameter(defaultValue = "./data") String dataFolder; @@ -192,6 +195,10 @@ public class ArtemisCreatePlugin extends ArtemisAbstractPlugin { add(listCommands, "--require-login"); } + if (staticCluster != null) { + add(listCommands, "--staticCluster", staticCluster); + } + if (!javaOptions.isEmpty()) { add(listCommands, "--java-options", javaOptions); } diff --git a/examples/features/ha/pom.xml b/examples/features/ha/pom.xml index b3afd99f24..ea46436751 100644 --- a/examples/features/ha/pom.xml +++ b/examples/features/ha/pom.xml @@ -60,6 +60,7 @@ under the License. replicated-multiple-failover replicated-transaction-failover scale-down + shared-storage-static-cluster stop-server-failover transaction-failover @@ -80,6 +81,7 @@ under the License. replicated-multiple-failover replicated-transaction-failover scale-down + shared-storage-static-cluster transaction-failover diff --git a/examples/features/ha/shared-storage-static-cluster/pom.xml b/examples/features/ha/shared-storage-static-cluster/pom.xml new file mode 100644 index 0000000000..7853633a41 --- /dev/null +++ b/examples/features/ha/shared-storage-static-cluster/pom.xml @@ -0,0 +1,161 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.failover + broker-failover + 2.11.0-SNAPSHOT + + + shared-storage-static-cluster + jar + ActiveMQ Artemis JMS Client Side Failover Listener Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-cli + ${project.version} + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + + -Djava.net.preferIPv4Stack=true + ${basedir}/target/server0 + ../shared + true + false + true + tcp://localhost:61617,tcp://localhost:61618 + + --queues + exampleQueue + + + + + create1 + + create + + + + -Djava.net.preferIPv4Stack=true + ${basedir}/target/server1 + ../shared + true + true + 1 + true + tcp://localhost:61616,tcp://localhost:61618 + + --queues + exampleQueue + + + + + create2 + + create + + + + -Djava.net.preferIPv4Stack=true + ${basedir}/target/server2 + ../shared + true + true + 2 + true + tcp://localhost:61616,tcp://localhost:61617 + + --queues + exampleQueue + + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.SharedStorageStaticCluster + + ${basedir}/target/server0 + ${basedir}/target/server1 + ${basedir}/target/server2 + + + + + + + org.apache.activemq.examples.failover + shared-storage-static-cluster + ${project.version} + + + + + org.apache.maven.plugins + maven-clean-plugin + + + + + + release + + + + com.vladsch.flexmark + markdown-page-generator-plugin + + + + + + \ No newline at end of file diff --git a/examples/features/ha/shared-storage-static-cluster/src/main/java/org/apache/activemq/artemis/jms/example/SharedStorageStaticCluster.java b/examples/features/ha/shared-storage-static-cluster/src/main/java/org/apache/activemq/artemis/jms/example/SharedStorageStaticCluster.java new file mode 100644 index 0000000000..32660e4178 --- /dev/null +++ b/examples/features/ha/shared-storage-static-cluster/src/main/java/org/apache/activemq/artemis/jms/example/SharedStorageStaticCluster.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; + +import org.apache.activemq.artemis.api.core.client.FailoverEventListener; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.util.ServerUtil; + +/** + * This example demonstrates how you can listen on failover event on the client side + *

+ * In this example there are two nodes running in a cluster, both server will be running for start, + * but after a while the first server will crash. This will trigger a fail-over event + */ +public class SharedStorageStaticCluster { + + private static Process server0; + + private static Process server1; + + private static Process server2; + + public static void main(final String[] args) throws Exception { + + for (String x : args) { + System.out.println("::" + x); + } + InitialContext initialContext = null; + + Connection connection = null; + + try { + server0 = ServerUtil.startServer(args[0], SharedStorageStaticCluster.class.getSimpleName() + "0", 0, 5000); + server1 = ServerUtil.startServer(args[1], SharedStorageStaticCluster.class.getSimpleName() + "1", 1, 0); + + // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0 + //ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://(localhost:61616,localhost:61617,localhost:61618)"); + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1"); + + // Step 4. We create 1 JMS connections from the same connection factory. + // Wait a little while to make sure broadcasts from all nodes have reached the client + Thread.sleep(5000); + connection = connectionFactory.createConnection(); + ((ActiveMQConnection) connection).setFailoverListener(new FailoverListenerImpl()); + + // Step 5. We create JMS Sessions + final Session sessionA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = (Queue) sessionA.createQueue("exampleQueue"); + + // Step 6. We create JMS MessageProducer objects on the sessions + MessageProducer producerA = sessionA.createProducer(queue); + + // Step 7. We send some messages on each producer + final int numMessages = 10; + + for (int i = 0; i < numMessages; i++) { + TextMessage messageA = sessionA.createTextMessage("A:This is text message " + i); + producerA.send(messageA); + System.out.println("Sent message: " + messageA.getText()); + + } + + // Step 8. We start the connection to consume messages + connection.start(); + + MessageConsumer consumer = sessionA.createConsumer(queue); + + for (int i = 0; i < numMessages; i++) { + TextMessage message = (TextMessage) consumer.receive(2000); + System.out.println("Got message: " + message.getText() + " from node A"); + if (i == 5) { + ServerUtil.killServer(server0); + } + } + + System.out.println("receive other message from node B: " + consumer.receive(2000)); + + consumer.close(); + + server2 = ServerUtil.startServer(args[2], SharedStorageStaticCluster.class.getSimpleName() + "2", 2, 0); + + Thread.sleep(5000); + + server1.destroy(); + + Thread.sleep(5000); + + for (int i = 0; i < numMessages; i++) { + try { + TextMessage messageA = sessionA.createTextMessage("A:This is text message after kill " + i); + producerA.send(messageA); + System.out.println("Sent message: " + messageA.getText()); + } catch (Exception e) { + // it can happen + e.printStackTrace(); + } + } + + connection.start(); + consumer = sessionA.createConsumer(queue); + + for (int i = 0; i < numMessages; i++) { + TextMessage message = (TextMessage) consumer.receive(2000); + System.out.println("Got message: " + message.getText()); + } + + consumer.close(); + + } finally { + // Step 10. Be sure to close our resources! + + if (connection != null) { + connection.close(); + } + + if (initialContext != null) { + initialContext.close(); + } + + ServerUtil.killServer(server0); + ServerUtil.killServer(server1); + } + } + + private static class FailoverListenerImpl implements FailoverEventListener { + + @Override + public void failoverEvent(FailoverEventType eventType) { + System.out.println("Failover event triggered :" + eventType.toString()); + } + } +}