This commit is contained in:
Clebert Suconic 2019-09-10 15:04:19 -04:00
commit 9002bc533f
7 changed files with 390 additions and 2 deletions

View File

@ -95,6 +95,7 @@ public class Create extends InputAbstract {
public static final String ETC_SHARED_STORE_SETTINGS_TXT = "etc/shared-store-settings.txt";
public static final String ETC_CLUSTER_SECURITY_SETTINGS_TXT = "etc/cluster-security-settings.txt";
public static final String ETC_CLUSTER_SETTINGS_TXT = "etc/cluster-settings.txt";
public static final String ETC_CLUSTER_STATIC_SETTINGS_TXT = "etc/cluster-static-settings.txt";
public static final String ETC_CONNECTOR_SETTINGS_TXT = "etc/connector-settings.txt";
public static final String ETC_BOOTSTRAP_WEB_SETTINGS_TXT = "etc/bootstrap-web-settings.txt";
public static final String ETC_JOURNAL_BUFFER_SETTINGS = "etc/journal-buffer-settings.txt";
@ -275,6 +276,17 @@ public class Create extends InputAbstract {
@Option(name = "--jdbc", description = "It will activate jdbc")
boolean jdbc;
@Option(name = "--staticCluster", description = "Cluster node connectors list, separated by comma: Example \"tcp://server:61616,tcp://server2:61616,tcp://server3:61616\"")
String staticNode;
public String[] getStaticNodes() {
if (staticNode == null) {
return new String[0];
} else {
return staticNode.split(",");
}
}
@Option(name = "--jdbc-bindings-table-name", description = "Name of the jdbc bindigns table")
private String jdbcBindings = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
@ -571,6 +583,10 @@ public class Create extends InputAbstract {
filters.put("${ping-config.settings}", readTextFile(ETC_COMMENTED_PING_TXT, filters));
}
if (staticNode != null) {
clustered = true;
}
if (replicated) {
clustered = true;
filters.put("${replicated.settings}", readTextFile(ETC_REPLICATED_SETTINGS_TXT, filters));
@ -650,12 +666,27 @@ public class Create extends InputAbstract {
if (name == null) {
name = getHostForClustered();
}
String connectorSettings = readTextFile(ETC_CONNECTOR_SETTINGS_TXT, filters);
String connectorSettings = getConnectors(filters);
filters.put("${name}", name);
filters.put("${connector-config.settings}", connectorSettings);
filters.put("${cluster-security.settings}", readTextFile(ETC_CLUSTER_SECURITY_SETTINGS_TXT, filters));
filters.put("${cluster.settings}", readTextFile(ETC_CLUSTER_SETTINGS_TXT, filters));
if (staticNode != null) {
String staticCluster = readTextFile(ETC_CLUSTER_STATIC_SETTINGS_TXT, filters);
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
int countCluster = getStaticNodes().length;
for (int i = 0; i < countCluster; i++) {
printWriter.println(" <connector-ref>node" + i + "</connector-ref>");
}
filters.put("${connectors-list}", stringWriter.toString());
staticCluster = applyFilters(staticCluster, filters);
filters.put("${cluster.settings}", staticCluster);
} else {
filters.put("${cluster.settings}", readTextFile(ETC_CLUSTER_SETTINGS_TXT, filters));
}
filters.put("${cluster-user}", getClusterUser());
filters.put("${cluster-password}", getClusterPassword());
} else {
@ -828,6 +859,24 @@ public class Create extends InputAbstract {
return null;
}
private String getConnectors(HashMap<String, String> filters) throws IOException {
if (staticNode != null) {
StringWriter stringWriter = new StringWriter();
PrintWriter printer = new PrintWriter(stringWriter);
printer.println(" <connectors>");
printer.println(" <!-- Connector used to be announced through cluster connections and notifications -->");
printer.println(applyFilters(" <connector name=\"artemis\">tcp://${host}:${default.port}</connector>", filters));
int counter = 0;
for (String node: getStaticNodes()) {
printer.println(" <connector name = \"node" + (counter++) + "\">" + node + "</connector>");
}
printer.println(" </connectors>");
return stringWriter.toString();
} else {
return readTextFile(ETC_CONNECTOR_SETTINGS_TXT, filters);
}
}
private void printStar(String message) {
int size = Math.min(message.length(), 80);
StringBuffer buffer = new StringBuffer(size);

View File

@ -0,0 +1,11 @@
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>artemis</connector-ref>
<message-load-balancing>${message-load-balancing}</message-load-balancing>
<max-hops>${max-hops}</max-hops>
<static-connectors>
${connectors-list}
</static-connectors>
</cluster-connection>
</cluster-connections>

View File

@ -49,6 +49,7 @@ public class StreamClassPathTest {
testStream(Create.class, Create.ETC_SHARED_STORE_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CLUSTER_SECURITY_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CLUSTER_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CLUSTER_STATIC_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CONNECTOR_SETTINGS_TXT);
testStream(Create.class, Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT);
testStream(Create.class, Create.ETC_JOURNAL_BUFFER_SETTINGS);

View File

@ -102,6 +102,9 @@ public class ArtemisCreatePlugin extends ArtemisAbstractPlugin {
@Parameter(defaultValue = "false")
private boolean slave;
@Parameter
private String staticCluster;
@Parameter(defaultValue = "./data")
String dataFolder;
@ -192,6 +195,10 @@ public class ArtemisCreatePlugin extends ArtemisAbstractPlugin {
add(listCommands, "--require-login");
}
if (staticCluster != null) {
add(listCommands, "--staticCluster", staticCluster);
}
if (!javaOptions.isEmpty()) {
add(listCommands, "--java-options", javaOptions);
}

View File

@ -60,6 +60,7 @@ under the License.
<module>replicated-multiple-failover</module>
<module>replicated-transaction-failover</module>
<module>scale-down</module>
<module>shared-storage-static-cluster</module>
<module>stop-server-failover</module>
<module>transaction-failover</module>
</modules>
@ -80,6 +81,7 @@ under the License.
<module>replicated-multiple-failover</module>
<module>replicated-transaction-failover</module>
<module>scale-down</module>
<module>shared-storage-static-cluster</module>
<module>transaction-failover</module>
</modules>
</profile>

View File

@ -0,0 +1,161 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.failover</groupId>
<artifactId>broker-failover</artifactId>
<version>2.11.0-SNAPSHOT</version>
</parent>
<artifactId>shared-storage-static-cluster</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Client Side Failover Listener Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/server0</instance>
<dataFolder>../shared</dataFolder>
<sharedStore>true</sharedStore>
<slave>false</slave>
<failoverOnShutdown>true</failoverOnShutdown>
<staticCluster>tcp://localhost:61617,tcp://localhost:61618</staticCluster>
<args>
<arg>--queues</arg>
<arg>exampleQueue</arg>
</args>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/server1</instance>
<dataFolder>../shared</dataFolder>
<sharedStore>true</sharedStore>
<slave>true</slave>
<portOffset>1</portOffset>
<failoverOnShutdown>true</failoverOnShutdown>
<staticCluster>tcp://localhost:61616,tcp://localhost:61618</staticCluster>
<args>
<arg>--queues</arg>
<arg>exampleQueue</arg>
</args>
</configuration>
</execution>
<execution>
<id>create2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/server2</instance>
<dataFolder>../shared</dataFolder>
<sharedStore>true</sharedStore>
<slave>true</slave>
<portOffset>2</portOffset>
<failoverOnShutdown>true</failoverOnShutdown>
<staticCluster>tcp://localhost:61616,tcp://localhost:61617</staticCluster>
<args>
<arg>--queues</arg>
<arg>exampleQueue</arg>
</args>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.SharedStorageStaticCluster</clientClass>
<args>
<param>${basedir}/target/server0</param>
<param>${basedir}/target/server1</param>
<param>${basedir}/target/server2</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.failover</groupId>
<artifactId>shared-storage-static-cluster</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>com.vladsch.flexmark</groupId>
<artifactId>markdown-page-generator-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.util.ServerUtil;
/**
* This example demonstrates how you can listen on failover event on the client side
* <p/>
* In this example there are two nodes running in a cluster, both server will be running for start,
* but after a while the first server will crash. This will trigger a fail-over event
*/
public class SharedStorageStaticCluster {
private static Process server0;
private static Process server1;
private static Process server2;
public static void main(final String[] args) throws Exception {
for (String x : args) {
System.out.println("::" + x);
}
InitialContext initialContext = null;
Connection connection = null;
try {
server0 = ServerUtil.startServer(args[0], SharedStorageStaticCluster.class.getSimpleName() + "0", 0, 5000);
server1 = ServerUtil.startServer(args[1], SharedStorageStaticCluster.class.getSimpleName() + "1", 1, 0);
// Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
//ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://(localhost:61616,localhost:61617,localhost:61618)");
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
// Step 4. We create 1 JMS connections from the same connection factory.
// Wait a little while to make sure broadcasts from all nodes have reached the client
Thread.sleep(5000);
connection = connectionFactory.createConnection();
((ActiveMQConnection) connection).setFailoverListener(new FailoverListenerImpl());
// Step 5. We create JMS Sessions
final Session sessionA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) sessionA.createQueue("exampleQueue");
// Step 6. We create JMS MessageProducer objects on the sessions
MessageProducer producerA = sessionA.createProducer(queue);
// Step 7. We send some messages on each producer
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage messageA = sessionA.createTextMessage("A:This is text message " + i);
producerA.send(messageA);
System.out.println("Sent message: " + messageA.getText());
}
// Step 8. We start the connection to consume messages
connection.start();
MessageConsumer consumer = sessionA.createConsumer(queue);
for (int i = 0; i < numMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(2000);
System.out.println("Got message: " + message.getText() + " from node A");
if (i == 5) {
ServerUtil.killServer(server0);
}
}
System.out.println("receive other message from node B: " + consumer.receive(2000));
consumer.close();
server2 = ServerUtil.startServer(args[2], SharedStorageStaticCluster.class.getSimpleName() + "2", 2, 0);
Thread.sleep(5000);
server1.destroy();
Thread.sleep(5000);
for (int i = 0; i < numMessages; i++) {
try {
TextMessage messageA = sessionA.createTextMessage("A:This is text message after kill " + i);
producerA.send(messageA);
System.out.println("Sent message: " + messageA.getText());
} catch (Exception e) {
// it can happen
e.printStackTrace();
}
}
connection.start();
consumer = sessionA.createConsumer(queue);
for (int i = 0; i < numMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(2000);
System.out.println("Got message: " + message.getText());
}
consumer.close();
} finally {
// Step 10. Be sure to close our resources!
if (connection != null) {
connection.close();
}
if (initialContext != null) {
initialContext.close();
}
ServerUtil.killServer(server0);
ServerUtil.killServer(server1);
}
}
private static class FailoverListenerImpl implements FailoverEventListener {
@Override
public void failoverEvent(FailoverEventType eventType) {
System.out.println("Failover event triggered :" + eventType.toString());
}
}
}