ARTEMIS-4384 cluster verify CLI command
ARTEMIS-4385 Expand StatQueue to visualize --clustered
This commit is contained in:
parent
444d5da72b
commit
064018a3e9
|
@ -51,6 +51,7 @@ import org.apache.activemq.artemis.cli.commands.messages.Transfer;
|
|||
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfGroup;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.QueueGroup;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.DataGroup;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.cluster.ClusterGroup;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.journal.PerfJournal;
|
||||
import org.apache.activemq.artemis.cli.commands.user.UserGroup;
|
||||
import org.apache.activemq.artemis.dto.ManagementContextDTO;
|
||||
|
@ -300,6 +301,8 @@ public class Artemis implements Runnable {
|
|||
commandLine.addSubcommand(new Upgrade());
|
||||
}
|
||||
|
||||
commandLine.addSubcommand(new ClusterGroup(commandLine));
|
||||
|
||||
return commandLine;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,11 +51,6 @@ public class Shell implements Runnable {
|
|||
@CommandLine.Option(names = "--password", description = "It will be used for an initial connection if set.")
|
||||
protected String password;
|
||||
|
||||
|
||||
private static String RED_UNICODE = "\u001B[31m";
|
||||
private static String YELLOW_UNICODE = "\u001B[33m";
|
||||
private static String CLEAR_UNICODE = "\u001B[0m";
|
||||
|
||||
public Shell(CommandLine commandLine) {
|
||||
}
|
||||
|
||||
|
@ -109,15 +104,15 @@ public class Shell implements Runnable {
|
|||
.build();
|
||||
factory.setTerminal(terminal);
|
||||
|
||||
String prompt = YELLOW_UNICODE + Artemis.getNameFromBanner() + " > " + CLEAR_UNICODE;
|
||||
String prompt = org.apache.activemq.artemis.cli.Terminal.YELLOW_UNICODE + Artemis.getNameFromBanner() + " > " + org.apache.activemq.artemis.cli.Terminal.CLEAR_UNICODE;
|
||||
String rightPrompt = null;
|
||||
|
||||
if (printBanner) {
|
||||
printBanner();
|
||||
}
|
||||
|
||||
System.out.println("For a list of commands, type " + RED_UNICODE + "help" + CLEAR_UNICODE + " or press " + RED_UNICODE + "<TAB>" + CLEAR_UNICODE + ":");
|
||||
System.out.println("Type " + RED_UNICODE + "exit" + CLEAR_UNICODE + " or press " + RED_UNICODE + "<CTRL-D>" + CLEAR_UNICODE + " to leave the session:");
|
||||
System.out.println("For a list of commands, type " + org.apache.activemq.artemis.cli.Terminal.RED_UNICODE + "help" + org.apache.activemq.artemis.cli.Terminal.CLEAR_UNICODE + " or press " + org.apache.activemq.artemis.cli.Terminal.RED_UNICODE + "<TAB>" + org.apache.activemq.artemis.cli.Terminal.CLEAR_UNICODE + ":");
|
||||
System.out.println("Type " + org.apache.activemq.artemis.cli.Terminal.RED_UNICODE + "exit" + org.apache.activemq.artemis.cli.Terminal.CLEAR_UNICODE + " or press " + org.apache.activemq.artemis.cli.Terminal.RED_UNICODE + "<CTRL-D>" + org.apache.activemq.artemis.cli.Terminal.CLEAR_UNICODE + " to leave the session:");
|
||||
|
||||
// start the shell and process input until the user quits with Ctrl-D
|
||||
String line;
|
||||
|
@ -149,14 +144,14 @@ public class Shell implements Runnable {
|
|||
}
|
||||
|
||||
private static void printBanner() {
|
||||
System.out.print(YELLOW_UNICODE);
|
||||
System.out.print(org.apache.activemq.artemis.cli.Terminal.YELLOW_UNICODE);
|
||||
try {
|
||||
Artemis.printBanner(System.out);
|
||||
} catch (Exception e) {
|
||||
System.out.println("Error recovering the banner:");
|
||||
e.printStackTrace();
|
||||
}
|
||||
System.out.print(CLEAR_UNICODE);
|
||||
System.out.print(org.apache.activemq.artemis.cli.Terminal.CLEAR_UNICODE);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli;
|
||||
|
||||
public class Terminal {
|
||||
|
||||
public static String RED_UNICODE = "\u001B[31m";
|
||||
public static String YELLOW_UNICODE = "\u001B[33m";
|
||||
public static String CLEAR_UNICODE = "\u001B[0m";
|
||||
}
|
|
@ -121,6 +121,8 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
|
||||
recoverConnectionInformation();
|
||||
|
||||
// it is intentional to make a comparison on the String object here
|
||||
// this is to test if the original option was switched or not.
|
||||
// we don't care about being .equals at all.
|
||||
|
@ -213,7 +215,7 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
return createCoreConnectionFactory(brokerURL, user, password, clientID);
|
||||
}
|
||||
|
||||
private void recoverConnectionInformation() {
|
||||
protected void recoverConnectionInformation() {
|
||||
if (CONNECTION_INFORMATION.get() != null) {
|
||||
ConnectionInformation connectionInfo = CONNECTION_INFORMATION.get();
|
||||
if (this.user == null) {
|
||||
|
@ -222,7 +224,7 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
if (this.password == null) {
|
||||
this.password = connectionInfo.password;
|
||||
}
|
||||
if (this.brokerURL == null) {
|
||||
if (this.brokerURL == null || this.brokerURL == DEFAULT_BROKER_URL) {
|
||||
this.brokerURL = connectionInfo.uri;
|
||||
}
|
||||
|
||||
|
@ -233,6 +235,9 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
if (Shell.inShell() && CONNECTION_INFORMATION.get() == null) {
|
||||
CONNECTION_INFORMATION.set(new ConnectionInformation(brokerURL, user, password));
|
||||
System.out.println("CLI connected to broker " + brokerURL + ", user:" + user);
|
||||
this.brokerURL = brokerURL;
|
||||
this.user = user;
|
||||
this.password = password;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -324,4 +329,9 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
protected void performCoreManagement(String uri, String user, String password, ManagementHelper.MessageAcceptor setup, ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) throws Exception {
|
||||
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory(uri, user, password, null)) {
|
||||
ManagementHelper.doManagement(factory.getServerLocator(), user, password, setup, ok, failed);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,6 +23,8 @@ import java.util.TreeMap;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.cli.Terminal;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
|
@ -85,6 +87,9 @@ public class StatQueue extends ConnectionAbstract {
|
|||
@Option(names = "--maxColumnSize", description = "The max width of data column. Set to -1 for no limit. Default is 25.")
|
||||
private int maxColumnSize = DEFAULT_MAX_COLUMN_SIZE;
|
||||
|
||||
@Option(names = "--clustered", description = "Expands the report for all nodes on the topology")
|
||||
private boolean clustered = false;
|
||||
|
||||
private int statCount = 0;
|
||||
|
||||
//easier for testing
|
||||
|
@ -146,12 +151,46 @@ public class StatQueue extends ConnectionAbstract {
|
|||
getActionContext().out.println("filter is '" + filter + "'");
|
||||
getActionContext().out.println("maxRows='" + maxRows + "'");
|
||||
}
|
||||
printStats(getActionContext(), filter);
|
||||
createConnectionFactory();
|
||||
|
||||
try (SimpleManagement simpleManagement = new SimpleManagement(brokerURL, user, password).open()) {
|
||||
String nodeID = simpleManagement.getNodeID();
|
||||
JsonArray topology = simpleManagement.listNetworkTopology();
|
||||
|
||||
if (clustered && topology.size() > 1) {
|
||||
context.out.println(Terminal.YELLOW_UNICODE + "*******************************************************************************************************************************");
|
||||
context.out.println(">>> Queue stats on node " + nodeID + ", url=" + brokerURL + Terminal.CLEAR_UNICODE);
|
||||
printStats(brokerURL, filter);
|
||||
|
||||
for (int i = 0; i < topology.size(); i++) {
|
||||
JsonObject node = topology.getJsonObject(i);
|
||||
if (node.getString("nodeID").equals(nodeID) || node.getJsonString("live") == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String url = "tcp://" + node.getString("live");
|
||||
|
||||
context.out.println(Terminal.YELLOW_UNICODE + "*******************************************************************************************************************************");
|
||||
context.out.println(">>> Queue stats on node " + node.getString("nodeID") + ", url=" + url + Terminal.CLEAR_UNICODE);
|
||||
|
||||
printStats(url, filter);
|
||||
}
|
||||
} else {
|
||||
printStats(brokerURL, filter);
|
||||
if (topology.size() > 1) {
|
||||
context.out.println();
|
||||
context.out.println("Note: Use " + Terminal.RED_UNICODE + "--clustered" + Terminal.CLEAR_UNICODE + " to expand the report to other nodes in the topology.");
|
||||
context.out.println();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return statCount;
|
||||
}
|
||||
|
||||
private void printStats(final ActionContext context, final String filter) throws Exception {
|
||||
performCoreManagement(message -> {
|
||||
private void printStats(String uri, final String filter) throws Exception {
|
||||
performCoreManagement(uri, user, password, message -> {
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "listQueues", filter, 1, maxRows);
|
||||
}, reply -> {
|
||||
final String result = (String) ManagementHelper.getResult(reply, String.class);
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands.tools.cluster;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.HelpAction;
|
||||
import picocli.CommandLine;
|
||||
import picocli.CommandLine.Command;
|
||||
|
||||
@Command(name = "cluster", description = "use 'help cluster' for sub commands list", subcommands = {Verify.class})
|
||||
public class ClusterGroup implements Runnable {
|
||||
|
||||
CommandLine commandLine;
|
||||
|
||||
public ClusterGroup(CommandLine commandLine) {
|
||||
this.commandLine = commandLine;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
HelpAction.help(commandLine, "cluster");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,323 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands.tools.cluster;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.activemq.artemis.json.JsonObject;
|
||||
import org.apache.activemq.artemis.json.JsonString;
|
||||
|
||||
public class ClusterVerifier implements AutoCloseable {
|
||||
|
||||
final String uri, user, password;
|
||||
|
||||
final SimpleManagement simpleManagement;
|
||||
|
||||
final long allowedVariance;
|
||||
|
||||
public ClusterVerifier(String uri, String user, String password) {
|
||||
this(uri, user, password, 1000);
|
||||
}
|
||||
|
||||
public ClusterVerifier(String uri, String user, String password, long variance) {
|
||||
this.uri = uri;
|
||||
this.user = user;
|
||||
this.password = password;
|
||||
this.allowedVariance = variance;
|
||||
this.simpleManagement = new SimpleManagement(uri, user, password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
simpleManagement.close();
|
||||
}
|
||||
|
||||
public ClusterVerifier open() throws Exception {
|
||||
simpleManagement.open();
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean verify(ActionContext context) throws Exception {
|
||||
String mainID = getNodeID();
|
||||
JsonArray mainToplogy = fetchMainTopology();
|
||||
|
||||
AtomicBoolean verificationResult = new AtomicBoolean(true);
|
||||
|
||||
Map<String, TopologyItem> mainTopology = parseTopology(mainToplogy);
|
||||
boolean supportTime = true;
|
||||
try {
|
||||
fetchTopologyTime(mainTopology);
|
||||
} catch (Exception e) {
|
||||
supportTime = false;
|
||||
}
|
||||
|
||||
if (supportTime) {
|
||||
verifyTime(context, mainTopology, verificationResult, supportTime);
|
||||
} else {
|
||||
context.out.println("*******************************************************************************************************************************");
|
||||
context.out.println("Topology on " + uri + " nodeID=" + mainID + " with " + mainToplogy.size() + " nodes :");
|
||||
printTopology(context, "", mainToplogy);
|
||||
context.out.println("*******************************************************************************************************************************");
|
||||
}
|
||||
|
||||
mainTopology.forEach((a, b) -> {
|
||||
try {
|
||||
context.out.println("--> Verifying Topology for NodeID " + b.nodeID + ", live = " + b.live + ", backup = " + b.backup);
|
||||
if (b.live != null) {
|
||||
context.out.println(" verification on live " + b.live);
|
||||
if (!subVerify(context, b.live, mainTopology)) {
|
||||
verificationResult.set(false);
|
||||
} else {
|
||||
context.out.println(" ok!");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace(context.out);
|
||||
verificationResult.set(false);
|
||||
}
|
||||
});
|
||||
|
||||
return verificationResult.get();
|
||||
}
|
||||
|
||||
protected void verifyTime(ActionContext context,
|
||||
Map<String, TopologyItem> mainTopology,
|
||||
AtomicBoolean verificationResult,
|
||||
boolean supportTime) {
|
||||
|
||||
final String FORMAT = "%-40s | %-25s | %-19s | %-25s";
|
||||
context.out.println("*******************************************************************************************************************************");
|
||||
|
||||
if (supportTime) {
|
||||
Long[] times = fetchTopologyTime(mainTopology);
|
||||
|
||||
context.out.printf(FORMAT, "nodeID", "live", "live local time", "backup");
|
||||
context.out.println();
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
long initialTime = System.currentTimeMillis();
|
||||
|
||||
mainTopology.forEach((id, node) -> {
|
||||
context.out.printf(FORMAT, id, node.live, formatDate(sdf, node.liveTime), node.backup);
|
||||
context.out.println();
|
||||
});
|
||||
|
||||
// how long it took to fetch the times. I'm adding this to the allowed variance.
|
||||
long latencyTime = System.currentTimeMillis() - initialTime;
|
||||
|
||||
long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
|
||||
|
||||
for (long l : times) {
|
||||
|
||||
if (l < min) {
|
||||
min = l;
|
||||
}
|
||||
|
||||
if (l > max) {
|
||||
max = l;
|
||||
}
|
||||
}
|
||||
|
||||
long variance = times.length > 0 ? (max - min) : 0;
|
||||
|
||||
long allowedVarianceWithLatency = allowedVariance + latencyTime;
|
||||
|
||||
if (variance < allowedVarianceWithLatency) {
|
||||
context.out.println("Time variance in the cluster is " + variance + " milliseconds");
|
||||
} else {
|
||||
context.out.println("WARNING: Time variance in the cluster is greater than " + allowedVarianceWithLatency + " milliseconds: " + variance + ". Please verify your server's NTP configuration.");
|
||||
verificationResult.set(false);
|
||||
}
|
||||
} else {
|
||||
context.out.println("The current management version does not support the getCurrentTimeMillis() method. Please verify whether your server's times are in sync and whether they are using NTP.");
|
||||
}
|
||||
context.out.println("*******************************************************************************************************************************");
|
||||
}
|
||||
|
||||
String formatDate(SimpleDateFormat sdf, long time) {
|
||||
if (time == 0) {
|
||||
return "";
|
||||
} else {
|
||||
return sdf.format(new Date(time));
|
||||
}
|
||||
}
|
||||
|
||||
protected Long[] fetchTopologyTime(Map<String, TopologyItem> topologyItemMap) {
|
||||
ArrayList<Long> times = new ArrayList<>(topologyItemMap.size() * 2);
|
||||
topologyItemMap.forEach((id, node) -> {
|
||||
if (node.live != null) {
|
||||
try {
|
||||
node.liveTime = fetchTime(node.live);
|
||||
times.add(node.liveTime);
|
||||
} catch (Exception e) {
|
||||
ActionContext.system().err.println("Cannot fetch liveTime for nodeID=" + id + ", url=" + node.live + " -> " + e.getMessage());
|
||||
node.liveTime = 0;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return times.toArray(new Long[times.size()]);
|
||||
}
|
||||
|
||||
private boolean subVerify(ActionContext context,
|
||||
String uri,
|
||||
Map<String, TopologyItem> mainTopology) throws Exception {
|
||||
JsonArray verifyTopology = fetchTopology(uri);
|
||||
Map<String, TopologyItem> verifyTopologyMap = parseTopology(verifyTopology);
|
||||
String result = compareTopology(mainTopology, verifyTopologyMap);
|
||||
if (result != null) {
|
||||
context.out.println(result);
|
||||
context.out.println(" Topology detailing for " + uri);
|
||||
printTopology(context, " ", verifyTopology);
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public String compareTopology(Map<String, TopologyItem> mainTopology, Map<String, TopologyItem> compareTopology) {
|
||||
if (mainTopology.size() != compareTopology.size()) {
|
||||
return "main topology size " + mainTopology.size() + "!= compareTopology size " + compareTopology.size();
|
||||
}
|
||||
|
||||
int matchElements = 0;
|
||||
|
||||
for (Map.Entry<String, TopologyItem> entry : mainTopology.entrySet()) {
|
||||
TopologyItem item = compareTopology.get(entry.getKey());
|
||||
if (!item.equals(entry.getValue())) {
|
||||
return "Topology mistmatch on " + item;
|
||||
} else {
|
||||
matchElements++;
|
||||
}
|
||||
}
|
||||
|
||||
if (matchElements != mainTopology.size()) {
|
||||
return "Not all elements match!";
|
||||
}
|
||||
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
Map<String, TopologyItem> parseTopology(JsonArray topology) {
|
||||
Map<String, TopologyItem> map = new LinkedHashMap<>();
|
||||
navigateTopology(topology, t -> map.put(t.nodeID, t));
|
||||
return map;
|
||||
}
|
||||
|
||||
private void printTopology(ActionContext context, String prefix, JsonArray topology) {
|
||||
context.out.printf(prefix + "%-40s | %-25s | %-25s", "nodeID", "live", "backup");
|
||||
context.out.println();
|
||||
navigateTopology(topology, t -> {
|
||||
context.out.printf(prefix + "%-40s | %-25s | %-25s", t.nodeID, t.live, t.backup);
|
||||
context.out.println();
|
||||
});
|
||||
}
|
||||
|
||||
private void navigateTopology(JsonArray topology, Consumer<TopologyItem> consumer) {
|
||||
for (int i = 0; i < topology.size(); i++) {
|
||||
JsonObject node = topology.getJsonObject(i);
|
||||
JsonString live = node.getJsonString("live");
|
||||
JsonString backup = node.getJsonString("backup");
|
||||
String nodeID = node.getString("nodeID");
|
||||
TopologyItem item = new TopologyItem(nodeID, live != null ? live.getString() : null, backup != null ? backup.getString() : null);
|
||||
consumer.accept(item);
|
||||
}
|
||||
}
|
||||
|
||||
protected String getNodeID() throws Exception {
|
||||
return simpleManagement.getNodeID();
|
||||
}
|
||||
|
||||
protected long fetchMainTime() throws Exception {
|
||||
return simpleManagement.getCurrentTimeMillis();
|
||||
}
|
||||
|
||||
protected long fetchTime(String uri) throws Exception {
|
||||
SimpleManagement liveManagement = new SimpleManagement(uri, user, password);
|
||||
return liveManagement.getCurrentTimeMillis();
|
||||
}
|
||||
|
||||
protected JsonArray fetchMainTopology() throws Exception {
|
||||
return simpleManagement.listNetworkTopology();
|
||||
}
|
||||
|
||||
protected JsonArray fetchTopology(String uri) throws Exception {
|
||||
SimpleManagement liveManagement = new SimpleManagement(uri, user, password);
|
||||
return liveManagement.listNetworkTopology();
|
||||
}
|
||||
|
||||
public static class TopologyItem {
|
||||
|
||||
final String nodeID, live, backup;
|
||||
|
||||
long liveTime, backupTime;
|
||||
|
||||
TopologyItem(String nodeID, String live, String backup) {
|
||||
this.nodeID = nodeID;
|
||||
if (live != null) {
|
||||
this.live = "tcp://" + live;
|
||||
} else {
|
||||
this.live = null;
|
||||
}
|
||||
if (backup != null) {
|
||||
this.backup = "tcp://" + backup;
|
||||
} else {
|
||||
this.backup = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
|
||||
TopologyItem item = (TopologyItem) o;
|
||||
|
||||
if (nodeID != null ? !nodeID.equals(item.nodeID) : item.nodeID != null)
|
||||
return false;
|
||||
if (live != null ? !live.equals(item.live) : item.live != null)
|
||||
return false;
|
||||
return backup != null ? backup.equals(item.backup) : item.backup == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = nodeID != null ? nodeID.hashCode() : 0;
|
||||
result = 31 * result + (live != null ? live.hashCode() : 0);
|
||||
result = 31 * result + (backup != null ? backup.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopologyItem{" + "nodeID='" + nodeID + '\'' + ", live='" + live + '\'' + ", backup='" + backup + '\'' + '}';
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands.tools.cluster;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(name = "verify", description = "Verify if all the nodes match the same topology.")
|
||||
public class Verify extends ConnectionAbstract {
|
||||
|
||||
@CommandLine.Option(names = "--variance", description = "Allowed variance in milliseconds before considered a failure. (default=1000)")
|
||||
public long variance = 1000;
|
||||
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
|
||||
createConnectionFactory();
|
||||
|
||||
try (ClusterVerifier clusterVerifier = new ClusterVerifier(brokerURL, user, password, variance).open()) {
|
||||
return clusterVerifier.verify(context);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1301,37 +1301,6 @@ public class ArtemisTest extends CliTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000)
|
||||
public void testQueueStatRetry() throws Exception {
|
||||
File instanceFolder = temporaryFolder.newFolder("server");
|
||||
setupAuth(instanceFolder);
|
||||
Run.setEmbedded(true);
|
||||
Artemis.main("create", instanceFolder.getAbsolutePath(), "--verbose", "--force", "--silent", "--no-web", "--queues", "q1", "--no-autotune", "--require-login", "--default-port", "61616");
|
||||
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
|
||||
|
||||
try {
|
||||
Artemis.internalExecute("run");
|
||||
InputStream in = new ByteArrayInputStream("admin\n".getBytes());
|
||||
ActionContext context = new ActionContext(in, System.out, System.err);
|
||||
|
||||
/*
|
||||
* This operation should fail the first time and then prompt the user to re-enter the username which
|
||||
* it will read from the InputStream in the ActionContext. It can't read the password since it's using
|
||||
* System.console.readPassword() for that.
|
||||
*/
|
||||
assertTrue((int) Artemis.internalExecute(null, null, null, new String[] {"queue", "stat", "--password", "admin"}, context) > 0);
|
||||
|
||||
/*
|
||||
* This is the same as above except it will prompt the user to re-enter both the URL and the username.
|
||||
*/
|
||||
in = new ByteArrayInputStream("tcp://localhost:61616\nadmin\n".getBytes());
|
||||
context = new ActionContext(in, System.out, System.err);
|
||||
assertTrue((int) Artemis.internalExecute(null, null, null, new String[] {"queue", "stat", "--password", "admin", "--url", "tcp://badhost:11111"}, context) > 0);
|
||||
} finally {
|
||||
stopServer();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000)
|
||||
public void testWeirdCharacter() throws Exception {
|
||||
testSimpleRun("test%26%26x86_6");
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.cli.test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.cluster.ClusterVerifier;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ClusterVerifyTest {
|
||||
|
||||
private static JsonArray read(String jsonString) throws Exception {
|
||||
return JsonUtil.readJsonArray(jsonString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterVerifyError() throws Exception {
|
||||
internalVerify(true);
|
||||
internalVerify(false);
|
||||
}
|
||||
|
||||
private void internalVerify(boolean fail) throws Exception {
|
||||
|
||||
// This is returning the following string:
|
||||
// for main topology: [{"nodeID":"A1","live":"A:1", "backup":"B:1"}, {"nodeID":"A2","live":"A:2", "backup":"B:2"}, {"nodeID":"A3","live":"A:3", "backup":"B:3"}]
|
||||
// A:1 topology:[{"nodeID":"A1","live":"A:1", "backup":"B:1"}, {"nodeID":"A2","live":"A:2", "backup":"B:2"}, {"nodeID":"A3","live":"A:3", "backup":"B:3"}]
|
||||
// A:2 topology:[{"nodeID":"A1","live":"A:1"}, {"nodeID":"A2","live":"A:2"}, {"nodeID":"A3","live":"A:3"}]
|
||||
// A:3 topology:[{"nodeID":"A1","live":"A:1", "backup":"B:1"}, {"nodeID":"A2","live":"A:2", "backup":"B:2"}] [
|
||||
ClusterVerifier fakeVerifier = new ClusterVerifier("fake", "a", "b") {
|
||||
@Override
|
||||
protected void verifyTime(ActionContext context,
|
||||
Map<String, ClusterVerifier.TopologyItem> mainTopology,
|
||||
AtomicBoolean verificationResult,
|
||||
boolean supportTime) {
|
||||
// not doing it
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long[] fetchTopologyTime(Map<String, TopologyItem> topologyItemMap) {
|
||||
throw new NotImplementedException("not doing it");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long fetchMainTime() throws Exception {
|
||||
throw new NotImplementedException("not doing it");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long fetchTime(String uri) throws Exception {
|
||||
return super.fetchTime(uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getNodeID() {
|
||||
return "AA";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonArray fetchMainTopology() throws Exception {
|
||||
return read("[{\"nodeID\":\"A1\",\"live\":\"A:1\", \"backup\":\"B:1\"}, {\"nodeID\":\"A2\",\"live\":\"A:2\", \"backup\":\"B:2\"}, {\"nodeID\":\"A3\",\"live\":\"A:3\", \"backup\":\"B:3\"}]");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonArray fetchTopology(String uri) throws Exception {
|
||||
if (fail) {
|
||||
switch (uri) {
|
||||
case "tcp://A:1":
|
||||
case "tcp://B:1":
|
||||
return read("[{\"nodeID\":\"A1\",\"live\":\"A:1\", \"backup\":\"B:1\"}, {\"nodeID\":\"A2\",\"live\":\"A:2\", \"backup\":\"B:2\"}, {\"nodeID\":\"A3\",\"live\":\"A:3\", \"backup\":\"B:3\"}]");
|
||||
case "tcp://A:2":
|
||||
case "tcp://B:2":
|
||||
return read("[{\"nodeID\":\"A1\",\"live\":\"A:1\"}, {\"nodeID\":\"A2\",\"live\":\"A:2\"}, {\"nodeID\":\"A3\",\"live\":\"A:3\"}]");
|
||||
case "tcp://A:3":
|
||||
case "tcp://B:3":
|
||||
default:
|
||||
return read("[{\"nodeID\":\"A1\",\"live\":\"A:1\", \"backup\":\"B:1\"}, {\"nodeID\":\"A2\",\"live\":\"A:2\", \"backup\":\"B:2\"}]");
|
||||
}
|
||||
} else {
|
||||
return fetchMainTopology();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (fail) {
|
||||
Assert.assertFalse(fakeVerifier.verify(new ActionContext()));
|
||||
} else {
|
||||
Assert.assertTrue(fakeVerifier.verify(new ActionContext()));
|
||||
}
|
||||
}
|
||||
|
||||
// The server is not clustered, and listTopology will return []
|
||||
@Test
|
||||
public void testReadEmpty() throws Exception {
|
||||
ClusterVerifier fakeVerifier = new ClusterVerifier("fake", "a", "b") {
|
||||
@Override
|
||||
protected void verifyTime(ActionContext context,
|
||||
Map<String, TopologyItem> mainTopology,
|
||||
AtomicBoolean verificationResult,
|
||||
boolean supportTime) {
|
||||
// not doing it
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long[] fetchTopologyTime(Map<String, TopologyItem> topologyItemMap) {
|
||||
throw new NotImplementedException("not doing it");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long fetchMainTime() throws Exception {
|
||||
throw new NotImplementedException("not doing it");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long fetchTime(String uri) throws Exception {
|
||||
return super.fetchTime(uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getNodeID() {
|
||||
return "AA";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonArray fetchMainTopology() throws Exception {
|
||||
return read("[]");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonArray fetchTopology(String uri) throws Exception {
|
||||
return read("[]");
|
||||
}
|
||||
};
|
||||
|
||||
Assert.assertTrue(fakeVerifier.verify(new ActionContext()));
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -2667,4 +2667,12 @@ public interface AuditLogger {
|
|||
|
||||
@LogMessage(id = 601770, value = "User {} is clearing authorization cache on target resource: {}", level = LogMessage.Level.INFO)
|
||||
void clearAuthorizationCache(String user, Object source);
|
||||
|
||||
static void getCurrentTimeMillis() {
|
||||
BASE_LOGGER.getCurrentTimeMillis(getCaller());
|
||||
}
|
||||
|
||||
@LogMessage(id = 601771, value = "User {} is getting name on target resource: {}", level = LogMessage.Level.INFO)
|
||||
void getCurrentTimeMillis(Object source);
|
||||
|
||||
}
|
||||
|
|
|
@ -38,6 +38,9 @@ public interface ActiveMQServerControl {
|
|||
@Attribute(desc = "Server's name")
|
||||
String getName();
|
||||
|
||||
@Attribute(desc = "Server's current timeMillis")
|
||||
long getCurrentTimeMillis();
|
||||
|
||||
/**
|
||||
* Returns this server's version.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.api.core.management;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.activemq.artemis.json.JsonObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** This class provides a simple proxy for management operations */
|
||||
public class SimpleManagement implements AutoCloseable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String SIMPLE_OPTIONS = "{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}";
|
||||
|
||||
String uri, user, password;
|
||||
|
||||
ServerLocator locator;
|
||||
ClientSessionFactory sessionFactory;
|
||||
ClientSession session;
|
||||
|
||||
|
||||
public SimpleManagement(String uri, String user, String password) {
|
||||
this.uri = uri;
|
||||
this.user = user;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public SimpleManagement open() throws Exception {
|
||||
if (session == null) {
|
||||
locator = ServerLocatorImpl.newLocator(uri);
|
||||
sessionFactory = locator.createSessionFactory();
|
||||
session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (session != null) {
|
||||
session.close();
|
||||
sessionFactory.close();
|
||||
locator.close();
|
||||
session = null;
|
||||
sessionFactory = null;
|
||||
locator = null;
|
||||
}
|
||||
}
|
||||
|
||||
public long getCurrentTimeMillis() throws Exception {
|
||||
return simpleManagementLong("broker", "getCurrentTimeMillis");
|
||||
}
|
||||
|
||||
/** Simple helper for management returning a string.*/
|
||||
public String simpleManagement(String resource, String method, Object... parameters) throws Exception {
|
||||
AtomicReference<String> responseString = new AtomicReference<>();
|
||||
doManagement((m) -> setupCall(m, resource, method, parameters), m -> setStringResult(m, responseString), SimpleManagement::failed);
|
||||
return responseString.get();
|
||||
}
|
||||
|
||||
/** Simple helper for management returning a long.*/
|
||||
public long simpleManagementLong(String resource, String method, Object... parameters) throws Exception {
|
||||
AtomicLong responseLong = new AtomicLong();
|
||||
doManagement((m) -> setupCall(m, resource, method, parameters), m -> setLongResult(m, responseLong), SimpleManagement::failed);
|
||||
return responseLong.get();
|
||||
}
|
||||
|
||||
public long getQueueCount(String queueName) throws Exception {
|
||||
return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessageCount");
|
||||
}
|
||||
|
||||
public Map<String, Long> getQueueCounts(int maxRows) throws Exception {
|
||||
String responseString = simpleManagement("broker", "listQueues", SIMPLE_OPTIONS, 1, maxRows);
|
||||
|
||||
JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(responseString);
|
||||
JsonArray array = queuesAsJsonObject.getJsonArray("data");
|
||||
|
||||
Map<String, Long> queues = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < array.size(); i++) {
|
||||
JsonObject object = array.getJsonObject(i);
|
||||
String name = object.getString("name");
|
||||
String messageCount = object.getString("messageCount");
|
||||
queues.put(name, Long.parseLong(messageCount));
|
||||
}
|
||||
|
||||
return queues;
|
||||
}
|
||||
|
||||
public String getNodeID() throws Exception {
|
||||
return simpleManagement("broker", "getNodeID");
|
||||
}
|
||||
|
||||
public JsonArray listNetworkTopology() throws Exception {
|
||||
String result = simpleManagement("broker", "listNetworkTopology");
|
||||
return JsonUtil.readJsonArray(result);
|
||||
}
|
||||
|
||||
protected static void failed(ClientMessage message) throws Exception {
|
||||
final String result = (String) ManagementHelper.getResult(message, String.class);
|
||||
logger.warn("simple management operation failed:: {}", result);
|
||||
throw new Exception("Failed " + result);
|
||||
}
|
||||
|
||||
protected static void setupCall(ClientMessage m, String resource, String methodName, Object... parameters) throws Exception {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Setting up call {}::{}::{}", resource, methodName, parameters);
|
||||
}
|
||||
ManagementHelper.putOperationInvocation(m, resource, methodName, parameters);
|
||||
}
|
||||
|
||||
protected static void setStringResult(ClientMessage m, AtomicReference<String> result) throws Exception {
|
||||
String resultString = (String)ManagementHelper.getResult(m, String.class);
|
||||
logger.debug("management result:: {}", resultString);
|
||||
result.set(resultString);
|
||||
}
|
||||
|
||||
protected static void setLongResult(ClientMessage m, AtomicLong result) throws Exception {
|
||||
long resultLong = (long)ManagementHelper.getResult(m, Long.class);
|
||||
logger.debug("management result:: {}", resultLong);
|
||||
result.set(resultLong);
|
||||
}
|
||||
|
||||
protected void doManagement(ManagementHelper.MessageAcceptor setup, ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) throws Exception {
|
||||
if (session != null) {
|
||||
ManagementHelper.doManagement(session, setup, ok, failed);
|
||||
} else {
|
||||
ManagementHelper.doManagement(uri, user, password, setup, ok, failed);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -227,6 +227,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCurrentTimeMillis() {
|
||||
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||
AuditLogger.getCurrentTimeMillis();
|
||||
}
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getVersion() {
|
||||
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.activemq.artemis.json.JsonObject;
|
||||
|
||||
public class SimpleManagement {
|
||||
|
||||
private static final String SIMPLE_OPTIONS = "{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}";
|
||||
|
||||
public static Map<String, Integer> listQueues(String uri, String user, String password, int maxRows) throws Exception {
|
||||
Map<String, Integer> queues = new HashMap<>();
|
||||
ManagementHelper.doManagement(uri, user, password, t -> setupListQueue(t, maxRows), t -> listQueueResult(t, queues), SimpleManagement::failed);
|
||||
return queues;
|
||||
}
|
||||
|
||||
private static void setupListQueue(ClientMessage m, int maxRows) throws Exception {
|
||||
ManagementHelper.putOperationInvocation(m, "broker", "listQueues", SIMPLE_OPTIONS, 1, maxRows);
|
||||
}
|
||||
|
||||
private static void listQueueResult(ClientMessage message, Map<String, Integer> mapQueues) throws Exception {
|
||||
|
||||
final String result = (String) ManagementHelper.getResult(message, String.class);
|
||||
|
||||
JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(result);
|
||||
JsonArray array = queuesAsJsonObject.getJsonArray("data");
|
||||
|
||||
for (int i = 0; i < array.size(); i++) {
|
||||
JsonObject object = array.getJsonObject(i);
|
||||
String name = object.getString("name");
|
||||
String messageCount = object.getString("messageCount");
|
||||
mapQueues.put(name, Integer.parseInt(messageCount));
|
||||
}
|
||||
}
|
||||
|
||||
private static void failed(ClientMessage message) throws Exception {
|
||||
|
||||
final String result = (String) ManagementHelper.getResult(message, String.class);
|
||||
|
||||
throw new Exception("Failed " + result);
|
||||
}
|
||||
}
|
|
@ -257,6 +257,13 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
Wait.assertEquals(usingCore() ? 8 : 1, () -> serverControl.getAuthorizationCacheSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCurrentTime() throws Exception {
|
||||
long time = System.currentTimeMillis();
|
||||
ActiveMQServerControl serverControl = createManagementControl();
|
||||
Assert.assertTrue("serverControl returned an invalid time.", serverControl.getCurrentTimeMillis() >= time);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearingSecurityCaches() throws Exception {
|
||||
ActiveMQServerControl serverControl = createManagementControl();
|
||||
|
|
|
@ -103,6 +103,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCurrentTimeMillis() {
|
||||
return (long)proxy.retrieveAttributeValue("currentTimeMillis", long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBrokerConnection(String name) throws Exception {
|
||||
proxy.invokeOperation("startBrokerConnection", name);
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.management;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.activemq.artemis.json.JsonObject;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SimpleManagementTest extends ActiveMQTestBase {
|
||||
|
||||
public static final String LOCALHOST = "tcp://localhost:61616";
|
||||
private ActiveMQServer server;
|
||||
SimpleManagement simpleManagement = new SimpleManagement(LOCALHOST, "admin", "admin");
|
||||
|
||||
@Before
|
||||
public void setupServer() throws Exception {
|
||||
server = createServer(false, createDefaultConfig(0, true));
|
||||
|
||||
ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration();
|
||||
ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup");
|
||||
ccconf.setName("cluster");
|
||||
ccconf.setConnectorName("live");
|
||||
server.getConfiguration().addConnectorConfiguration("live", LOCALHOST);
|
||||
server.getConfiguration().addClusterConfiguration(ccconf);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueues() throws Exception {
|
||||
server.start();
|
||||
String queueName = RandomUtil.randomString();
|
||||
server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true));
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", LOCALHOST);
|
||||
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageProducer producer = session.createProducer(session.createQueue(queueName));
|
||||
for (int i = 0; i < 33; i++) {
|
||||
producer.send(session.createMessage());
|
||||
}
|
||||
session.commit();
|
||||
}
|
||||
|
||||
Queue serverQueue = server.locateQueue(queueName);
|
||||
Wait.assertEquals(33, serverQueue::getMessageCount);
|
||||
|
||||
Map<String, Long> queues = simpleManagement.getQueueCounts(100);
|
||||
Assert.assertEquals((Long)33L, queues.get(queueName));
|
||||
Assert.assertEquals(33L, simpleManagement.getQueueCount(queueName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListTopology() throws Exception {
|
||||
JsonArray topology = simpleManagement.listNetworkTopology();
|
||||
String nodeId = simpleManagement.getNodeID();
|
||||
Assert.assertEquals(1, topology.size());
|
||||
JsonObject node = topology.getJsonObject(0);
|
||||
Assert.assertEquals("localhost:61616", node.getString("live"));
|
||||
Assert.assertEquals(nodeId, node.getString("nodeID"));
|
||||
}
|
||||
|
||||
}
|
|
@ -1397,6 +1397,77 @@
|
|||
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker2</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>create-topology-check-one</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<role>amq</role>
|
||||
<user>admin</user>
|
||||
<password>admin</password>
|
||||
<allowAnonymous>false</allowAnonymous>
|
||||
<noWeb>false</noWeb>
|
||||
<clustered>true</clustered>
|
||||
<instance>${basedir}/target/topology-check/broker1</instance>
|
||||
<configuration>${basedir}/target/classes/servers/topology-check/broker1</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>create-topology-check-two</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<role>amq</role>
|
||||
<user>admin</user>
|
||||
<password>admin</password>
|
||||
<portOffset>1</portOffset>
|
||||
<allowAnonymous>false</allowAnonymous>
|
||||
<noWeb>false</noWeb>
|
||||
<clustered>true</clustered>
|
||||
<instance>${basedir}/target/topology-check/broker2</instance>
|
||||
<configuration>${basedir}/target/classes/servers/topology-check/broker2</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>create-topology-check-three</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<role>amq</role>
|
||||
<user>admin</user>
|
||||
<password>admin</password>
|
||||
<portOffset>2</portOffset>
|
||||
<allowAnonymous>false</allowAnonymous>
|
||||
<noWeb>false</noWeb>
|
||||
<clustered>true</clustered>
|
||||
<instance>${basedir}/target/topology-check/broker3</instance>
|
||||
<configuration>${basedir}/target/classes/servers/topology-check/broker3</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>create-topology-check-four</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<role>amq</role>
|
||||
<user>admin</user>
|
||||
<password>admin</password>
|
||||
<portOffset>3</portOffset>
|
||||
<allowAnonymous>false</allowAnonymous>
|
||||
<noWeb>false</noWeb>
|
||||
<clustered>true</clustered>
|
||||
<instance>${basedir}/target/topology-check/broker4</instance>
|
||||
<configuration>${basedir}/target/classes/servers/topology-check/broker4</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
@ -0,0 +1,290 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:xi="http://www.w3.org/2001/XInclude"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>broker1</name>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- It is recommended to keep this value as 1, maximizing the number of records stored about redeliveries.
|
||||
However if you must preserve state of individual redeliveries, you may increase this value or set it to -1 (infinite). -->
|
||||
<max-redelivery-records>1</max-redelivery-records>
|
||||
|
||||
<!-- this could be ASYNCIO, MAPPED, NIO
|
||||
ASYNCIO: Linux Libaio
|
||||
MAPPED: mmap files
|
||||
NIO: Plain Java Files
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
|
||||
<!-- if you want to retain your journal uncomment this following configuration.
|
||||
|
||||
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
|
||||
|
||||
it is recommended to use a separate storage unit from the journal for performance considerations.
|
||||
|
||||
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
|
||||
|
||||
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
|
||||
|
||||
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>10</journal-pool-files>
|
||||
|
||||
<journal-device-block-size>4096</journal-device-block-size>
|
||||
|
||||
<journal-file-size>10M</journal-file-size>
|
||||
<!--
|
||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||
<network-check-NIC>theNicName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
<connectors>
|
||||
<!-- Connector used to be announced through cluster connections and notifications -->
|
||||
<connector name="broker1">tcp://localhost:61616</connector>
|
||||
<connector name="broker2">tcp://localhost:61617</connector>
|
||||
<connector name="broker3">tcp://localhost:61618</connector>
|
||||
<connector name="broker4">tcp://localhost:61619</connector>
|
||||
|
||||
<connector name="broker5">tcp://localhost:61620</connector>
|
||||
<connector name="broker6">tcp://localhost:61621</connector>
|
||||
<connector name="broker7">tcp://localhost:61622</connector>
|
||||
<connector name="broker8">tcp://localhost:61623</connector>
|
||||
</connectors>
|
||||
|
||||
|
||||
<cluster-connections>
|
||||
<cluster-connection name="test">
|
||||
<connector-ref>broker1</connector-ref>
|
||||
<check-period>100</check-period>
|
||||
<connection-ttl>1000</connection-ttl>
|
||||
<retry-interval>10</retry-interval>
|
||||
<initial-connect-attempts>10</initial-connect-attempts>
|
||||
<reconnect-attempts>5</reconnect-attempts>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<message-load-balancing>STRICT</message-load-balancing>
|
||||
<producer-window-size>-1</producer-window-size>
|
||||
<notification-interval>100</notification-interval>
|
||||
<notification-attempts>100</notification-attempts>
|
||||
<static-connectors>
|
||||
<connector-ref>broker2</connector-ref>
|
||||
<connector-ref>broker3</connector-ref>
|
||||
<connector-ref>broker4</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
</cluster-connections>
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
||||
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
|
||||
|
||||
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
|
||||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size> -->
|
||||
|
||||
<!-- the maximum number of messages accepted before entering full address mode.
|
||||
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
|
||||
<global-max-messages>-1</global-max-messages>
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
|
||||
as duplicate detection requires applicationProperties to be parsed on the server. -->
|
||||
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
|
||||
default: 102400, -1 would mean to disable large message control -->
|
||||
|
||||
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
|
||||
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
|
||||
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
|
||||
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://localhost:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<cluster-user>cluster-admin</cluster-user>
|
||||
|
||||
<cluster-password>password-admin</cluster-password>
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="amq"/>
|
||||
<permission type="deleteNonDurableQueue" roles="amq"/>
|
||||
<permission type="createDurableQueue" roles="amq"/>
|
||||
<permission type="deleteDurableQueue" roles="amq"/>
|
||||
<permission type="createAddress" roles="amq"/>
|
||||
<permission type="deleteAddress" roles="amq"/>
|
||||
<permission type="consume" roles="amq"/>
|
||||
<permission type="browse" roles="amq"/>
|
||||
<permission type="send" roles="amq"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="amq"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
|
||||
<!-- The size of each page file -->
|
||||
<page-size-bytes>10M</page-size-bytes>
|
||||
|
||||
<!-- When we start applying the address-full-policy, e.g paging -->
|
||||
<!-- Both are disabled by default, which means we will use the global-max-size/global-max-messages -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<max-size-messages>-1</max-size-messages>
|
||||
|
||||
<!-- When we read from paging into queues (memory) -->
|
||||
|
||||
<max-read-page-messages>-1</max-read-page-messages>
|
||||
<max-read-page-bytes>20M</max-read-page-bytes>
|
||||
|
||||
<!-- Limit on paging capacity before starting to throw errors -->
|
||||
|
||||
<page-limit-bytes>-1</page-limit-bytes>
|
||||
<page-limit-messages>-1</page-limit-messages>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="TEST">
|
||||
<anycast>
|
||||
<queue name="TEST"/>
|
||||
</anycast>
|
||||
</address>
|
||||
|
||||
</addresses>
|
||||
|
||||
|
||||
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
|
||||
<property key="LOG_ALL_EVENTS" value="true"/>
|
||||
<property key="LOG_CONNECTION_EVENTS" value="true"/>
|
||||
<property key="LOG_SESSION_EVENTS" value="true"/>
|
||||
<property key="LOG_CONSUMER_EVENTS" value="true"/>
|
||||
<property key="LOG_DELIVERING_EVENTS" value="true"/>
|
||||
<property key="LOG_SENDING_EVENTS" value="true"/>
|
||||
<property key="LOG_INTERNAL_EVENTS" value="true"/>
|
||||
</broker-plugin>
|
||||
</broker-plugins>
|
||||
-->
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,292 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:xi="http://www.w3.org/2001/XInclude"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>broker2</name>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- It is recommended to keep this value as 1, maximizing the number of records stored about redeliveries.
|
||||
However if you must preserve state of individual redeliveries, you may increase this value or set it to -1 (infinite). -->
|
||||
<max-redelivery-records>1</max-redelivery-records>
|
||||
|
||||
<!-- this could be ASYNCIO, MAPPED, NIO
|
||||
ASYNCIO: Linux Libaio
|
||||
MAPPED: mmap files
|
||||
NIO: Plain Java Files
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
|
||||
<!-- if you want to retain your journal uncomment this following configuration.
|
||||
|
||||
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
|
||||
|
||||
it is recommended to use a separate storage unit from the journal for performance considerations.
|
||||
|
||||
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
|
||||
|
||||
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
|
||||
|
||||
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>10</journal-pool-files>
|
||||
|
||||
<journal-device-block-size>4096</journal-device-block-size>
|
||||
|
||||
<journal-file-size>10M</journal-file-size>
|
||||
<!--
|
||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||
<network-check-NIC>theNicName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
<connectors>
|
||||
<!-- Connector used to be announced through cluster connections and notifications -->
|
||||
<connector name="broker1">tcp://localhost:61616</connector>
|
||||
<connector name="broker2">tcp://localhost:61617</connector>
|
||||
<connector name="broker3">tcp://localhost:61618</connector>
|
||||
<connector name="broker4">tcp://localhost:61619</connector>
|
||||
|
||||
<connector name="broker5">tcp://localhost:61620</connector>
|
||||
<connector name="broker6">tcp://localhost:61621</connector>
|
||||
<connector name="broker7">tcp://localhost:61622</connector>
|
||||
<connector name="broker8">tcp://localhost:61623</connector>
|
||||
</connectors>
|
||||
|
||||
|
||||
<cluster-connections>
|
||||
<cluster-connection name="test">
|
||||
<connector-ref>broker2</connector-ref>
|
||||
<check-period>100</check-period>
|
||||
<connection-ttl>1000</connection-ttl>
|
||||
<retry-interval>10</retry-interval>
|
||||
<initial-connect-attempts>10</initial-connect-attempts>
|
||||
<reconnect-attempts>5</reconnect-attempts>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<message-load-balancing>STRICT</message-load-balancing>
|
||||
<producer-window-size>-1</producer-window-size>
|
||||
<notification-interval>100</notification-interval>
|
||||
<notification-attempts>100</notification-attempts>
|
||||
<static-connectors>
|
||||
<connector-ref>broker1</connector-ref>
|
||||
<connector-ref>broker3</connector-ref>
|
||||
<connector-ref>broker4</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
</cluster-connections>
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
||||
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
|
||||
|
||||
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
|
||||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size> -->
|
||||
|
||||
<!-- the maximum number of messages accepted before entering full address mode.
|
||||
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
|
||||
<global-max-messages>-1</global-max-messages>
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
|
||||
as duplicate detection requires applicationProperties to be parsed on the server. -->
|
||||
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
|
||||
default: 102400, -1 would mean to disable large message control -->
|
||||
|
||||
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
|
||||
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
|
||||
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
|
||||
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://localhost:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<cluster-user>cluster-admin</cluster-user>
|
||||
|
||||
<cluster-password>password-admin</cluster-password>
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="amq"/>
|
||||
<permission type="deleteNonDurableQueue" roles="amq"/>
|
||||
<permission type="createDurableQueue" roles="amq"/>
|
||||
<permission type="deleteDurableQueue" roles="amq"/>
|
||||
<permission type="createAddress" roles="amq"/>
|
||||
<permission type="deleteAddress" roles="amq"/>
|
||||
<permission type="consume" roles="amq"/>
|
||||
<permission type="browse" roles="amq"/>
|
||||
<permission type="send" roles="amq"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="amq"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
|
||||
<!-- The size of each page file -->
|
||||
<page-size-bytes>10M</page-size-bytes>
|
||||
|
||||
<!-- When we start applying the address-full-policy, e.g paging -->
|
||||
<!-- Both are disabled by default, which means we will use the global-max-size/global-max-messages -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<max-size-messages>-1</max-size-messages>
|
||||
|
||||
<!-- When we read from paging into queues (memory) -->
|
||||
|
||||
<max-read-page-messages>-1</max-read-page-messages>
|
||||
<max-read-page-bytes>20M</max-read-page-bytes>
|
||||
|
||||
<!-- Limit on paging capacity before starting to throw errors -->
|
||||
|
||||
<page-limit-bytes>-1</page-limit-bytes>
|
||||
<page-limit-messages>-1</page-limit-messages>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="TEST">
|
||||
<anycast>
|
||||
<queue name="TEST"/>
|
||||
</anycast>
|
||||
</address>
|
||||
|
||||
</addresses>
|
||||
|
||||
|
||||
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
|
||||
<property key="LOG_ALL_EVENTS" value="true"/>
|
||||
<property key="LOG_CONNECTION_EVENTS" value="true"/>
|
||||
<property key="LOG_SESSION_EVENTS" value="true"/>
|
||||
<property key="LOG_CONSUMER_EVENTS" value="true"/>
|
||||
<property key="LOG_DELIVERING_EVENTS" value="true"/>
|
||||
<property key="LOG_SENDING_EVENTS" value="true"/>
|
||||
<property key="LOG_INTERNAL_EVENTS" value="true"/>
|
||||
</broker-plugin>
|
||||
</broker-plugins>
|
||||
-->
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,292 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:xi="http://www.w3.org/2001/XInclude"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>broker2</name>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- It is recommended to keep this value as 1, maximizing the number of records stored about redeliveries.
|
||||
However if you must preserve state of individual redeliveries, you may increase this value or set it to -1 (infinite). -->
|
||||
<max-redelivery-records>1</max-redelivery-records>
|
||||
|
||||
<!-- this could be ASYNCIO, MAPPED, NIO
|
||||
ASYNCIO: Linux Libaio
|
||||
MAPPED: mmap files
|
||||
NIO: Plain Java Files
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
|
||||
<!-- if you want to retain your journal uncomment this following configuration.
|
||||
|
||||
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
|
||||
|
||||
it is recommended to use a separate storage unit from the journal for performance considerations.
|
||||
|
||||
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
|
||||
|
||||
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
|
||||
|
||||
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>10</journal-pool-files>
|
||||
|
||||
<journal-device-block-size>4096</journal-device-block-size>
|
||||
|
||||
<journal-file-size>10M</journal-file-size>
|
||||
<!--
|
||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||
<network-check-NIC>theNicName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
<connectors>
|
||||
<!-- Connector used to be announced through cluster connections and notifications -->
|
||||
<connector name="broker1">tcp://localhost:61616</connector>
|
||||
<connector name="broker2">tcp://localhost:61617</connector>
|
||||
<connector name="broker3">tcp://localhost:61618</connector>
|
||||
<connector name="broker4">tcp://localhost:61619</connector>
|
||||
|
||||
<connector name="broker5">tcp://localhost:61620</connector>
|
||||
<connector name="broker6">tcp://localhost:61621</connector>
|
||||
<connector name="broker7">tcp://localhost:61622</connector>
|
||||
<connector name="broker8">tcp://localhost:61623</connector>
|
||||
</connectors>
|
||||
|
||||
|
||||
<cluster-connections>
|
||||
<cluster-connection name="test">
|
||||
<connector-ref>broker3</connector-ref>
|
||||
<check-period>100</check-period>
|
||||
<connection-ttl>1000</connection-ttl>
|
||||
<retry-interval>10</retry-interval>
|
||||
<initial-connect-attempts>10</initial-connect-attempts>
|
||||
<reconnect-attempts>5</reconnect-attempts>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<message-load-balancing>STRICT</message-load-balancing>
|
||||
<producer-window-size>-1</producer-window-size>
|
||||
<notification-interval>100</notification-interval>
|
||||
<notification-attempts>100</notification-attempts>
|
||||
<static-connectors>
|
||||
<connector-ref>broker1</connector-ref>
|
||||
<connector-ref>broker2</connector-ref>
|
||||
<connector-ref>broker4</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
</cluster-connections>
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
||||
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
|
||||
|
||||
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
|
||||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size> -->
|
||||
|
||||
<!-- the maximum number of messages accepted before entering full address mode.
|
||||
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
|
||||
<global-max-messages>-1</global-max-messages>
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
|
||||
as duplicate detection requires applicationProperties to be parsed on the server. -->
|
||||
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
|
||||
default: 102400, -1 would mean to disable large message control -->
|
||||
|
||||
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
|
||||
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
|
||||
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
|
||||
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://localhost:61618?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<cluster-user>cluster-admin</cluster-user>
|
||||
|
||||
<cluster-password>password-admin</cluster-password>
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="amq"/>
|
||||
<permission type="deleteNonDurableQueue" roles="amq"/>
|
||||
<permission type="createDurableQueue" roles="amq"/>
|
||||
<permission type="deleteDurableQueue" roles="amq"/>
|
||||
<permission type="createAddress" roles="amq"/>
|
||||
<permission type="deleteAddress" roles="amq"/>
|
||||
<permission type="consume" roles="amq"/>
|
||||
<permission type="browse" roles="amq"/>
|
||||
<permission type="send" roles="amq"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="amq"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
|
||||
<!-- The size of each page file -->
|
||||
<page-size-bytes>10M</page-size-bytes>
|
||||
|
||||
<!-- When we start applying the address-full-policy, e.g paging -->
|
||||
<!-- Both are disabled by default, which means we will use the global-max-size/global-max-messages -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<max-size-messages>-1</max-size-messages>
|
||||
|
||||
<!-- When we read from paging into queues (memory) -->
|
||||
|
||||
<max-read-page-messages>-1</max-read-page-messages>
|
||||
<max-read-page-bytes>20M</max-read-page-bytes>
|
||||
|
||||
<!-- Limit on paging capacity before starting to throw errors -->
|
||||
|
||||
<page-limit-bytes>-1</page-limit-bytes>
|
||||
<page-limit-messages>-1</page-limit-messages>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="TEST">
|
||||
<anycast>
|
||||
<queue name="TEST"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
||||
|
||||
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
|
||||
<property key="LOG_ALL_EVENTS" value="true"/>
|
||||
<property key="LOG_CONNECTION_EVENTS" value="true"/>
|
||||
<property key="LOG_SESSION_EVENTS" value="true"/>
|
||||
<property key="LOG_CONSUMER_EVENTS" value="true"/>
|
||||
<property key="LOG_DELIVERING_EVENTS" value="true"/>
|
||||
<property key="LOG_SENDING_EVENTS" value="true"/>
|
||||
<property key="LOG_INTERNAL_EVENTS" value="true"/>
|
||||
</broker-plugin>
|
||||
</broker-plugins>
|
||||
-->
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,292 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:xi="http://www.w3.org/2001/XInclude"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>broker2</name>
|
||||
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- It is recommended to keep this value as 1, maximizing the number of records stored about redeliveries.
|
||||
However if you must preserve state of individual redeliveries, you may increase this value or set it to -1 (infinite). -->
|
||||
<max-redelivery-records>1</max-redelivery-records>
|
||||
|
||||
<!-- this could be ASYNCIO, MAPPED, NIO
|
||||
ASYNCIO: Linux Libaio
|
||||
MAPPED: mmap files
|
||||
NIO: Plain Java Files
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
|
||||
<!-- if you want to retain your journal uncomment this following configuration.
|
||||
|
||||
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
|
||||
|
||||
it is recommended to use a separate storage unit from the journal for performance considerations.
|
||||
|
||||
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
|
||||
|
||||
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
|
||||
|
||||
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>10</journal-pool-files>
|
||||
|
||||
<journal-device-block-size>4096</journal-device-block-size>
|
||||
|
||||
<journal-file-size>10M</journal-file-size>
|
||||
<!--
|
||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||
<network-check-NIC>theNicName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
<connectors>
|
||||
<!-- Connector used to be announced through cluster connections and notifications -->
|
||||
<connector name="broker1">tcp://localhost:61616</connector>
|
||||
<connector name="broker2">tcp://localhost:61617</connector>
|
||||
<connector name="broker3">tcp://localhost:61618</connector>
|
||||
<connector name="broker4">tcp://localhost:61619</connector>
|
||||
|
||||
<connector name="broker5">tcp://localhost:61620</connector>
|
||||
<connector name="broker6">tcp://localhost:61621</connector>
|
||||
<connector name="broker7">tcp://localhost:61622</connector>
|
||||
<connector name="broker8">tcp://localhost:61623</connector>
|
||||
</connectors>
|
||||
|
||||
|
||||
<cluster-connections>
|
||||
<cluster-connection name="test">
|
||||
<connector-ref>broker4</connector-ref>
|
||||
<check-period>100</check-period>
|
||||
<connection-ttl>1000</connection-ttl>
|
||||
<retry-interval>10</retry-interval>
|
||||
<initial-connect-attempts>10</initial-connect-attempts>
|
||||
<reconnect-attempts>5</reconnect-attempts>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<message-load-balancing>STRICT</message-load-balancing>
|
||||
<producer-window-size>-1</producer-window-size>
|
||||
<notification-interval>100</notification-interval>
|
||||
<notification-attempts>100</notification-attempts>
|
||||
<static-connectors>
|
||||
<connector-ref>broker1</connector-ref>
|
||||
<connector-ref>broker2</connector-ref>
|
||||
<connector-ref>broker3</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
</cluster-connections>
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
||||
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
|
||||
|
||||
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
|
||||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size> -->
|
||||
|
||||
<!-- the maximum number of messages accepted before entering full address mode.
|
||||
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
|
||||
<global-max-messages>-1</global-max-messages>
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
|
||||
as duplicate detection requires applicationProperties to be parsed on the server. -->
|
||||
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
|
||||
default: 102400, -1 would mean to disable large message control -->
|
||||
|
||||
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
|
||||
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
|
||||
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
|
||||
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://localhost:61619?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<cluster-user>cluster-admin</cluster-user>
|
||||
|
||||
<cluster-password>password-admin</cluster-password>
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="amq"/>
|
||||
<permission type="deleteNonDurableQueue" roles="amq"/>
|
||||
<permission type="createDurableQueue" roles="amq"/>
|
||||
<permission type="deleteDurableQueue" roles="amq"/>
|
||||
<permission type="createAddress" roles="amq"/>
|
||||
<permission type="deleteAddress" roles="amq"/>
|
||||
<permission type="consume" roles="amq"/>
|
||||
<permission type="browse" roles="amq"/>
|
||||
<permission type="send" roles="amq"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="amq"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
|
||||
<!-- The size of each page file -->
|
||||
<page-size-bytes>10M</page-size-bytes>
|
||||
|
||||
<!-- When we start applying the address-full-policy, e.g paging -->
|
||||
<!-- Both are disabled by default, which means we will use the global-max-size/global-max-messages -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<max-size-messages>-1</max-size-messages>
|
||||
|
||||
<!-- When we read from paging into queues (memory) -->
|
||||
|
||||
<max-read-page-messages>-1</max-read-page-messages>
|
||||
<max-read-page-bytes>20M</max-read-page-bytes>
|
||||
|
||||
<!-- Limit on paging capacity before starting to throw errors -->
|
||||
|
||||
<page-limit-bytes>-1</page-limit-bytes>
|
||||
<page-limit-messages>-1</page-limit-messages>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="TEST">
|
||||
<anycast>
|
||||
<queue name="TEST"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
||||
|
||||
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
|
||||
<property key="LOG_ALL_EVENTS" value="true"/>
|
||||
<property key="LOG_CONNECTION_EVENTS" value="true"/>
|
||||
<property key="LOG_SESSION_EVENTS" value="true"/>
|
||||
<property key="LOG_CONSUMER_EVENTS" value="true"/>
|
||||
<property key="LOG_DELIVERING_EVENTS" value="true"/>
|
||||
<property key="LOG_SENDING_EVENTS" value="true"/>
|
||||
<property key="LOG_INTERNAL_EVENTS" value="true"/>
|
||||
</broker-plugin>
|
||||
</broker-plugins>
|
||||
-->
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -32,10 +32,10 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
import org.apache.activemq.artemis.utils.SimpleManagement;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -70,11 +70,13 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
|
|||
@Test
|
||||
public void testConsumeAll() throws Throwable {
|
||||
int COMMIT_INTERVAL = 100;
|
||||
int NUMBER_OF_MESSAGES = 300;
|
||||
long NUMBER_OF_MESSAGES = 300;
|
||||
int CLIENTS = 5;
|
||||
String mainURI = "tcp://localhost:61616";
|
||||
String secondURI = "tcp://localhost:61617";
|
||||
|
||||
SimpleManagement mainManagement = new SimpleManagement(mainURI, null, null);
|
||||
|
||||
String topicName = "myTopic";
|
||||
|
||||
ConnectionFactory cf = CFUtil.createConnectionFactory("amqp", mainURI);
|
||||
|
@ -101,7 +103,7 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
|
|||
session.commit();
|
||||
}
|
||||
|
||||
Map<String, Integer> result = SimpleManagement.listQueues(mainURI, null, null, 100);
|
||||
Map<String, Long> result = mainManagement.getQueueCounts(100);
|
||||
result.entrySet().forEach(entry -> logger.info("Queue {} = {}", entry.getKey(), entry.getValue()));
|
||||
|
||||
checkMessages(NUMBER_OF_MESSAGES, CLIENTS, mainURI, secondURI);
|
||||
|
@ -164,7 +166,7 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
|
|||
checkMessages(0, CLIENTS, mainURI, secondURI);
|
||||
}
|
||||
|
||||
private void checkMessages(int NUMBER_OF_MESSAGES, int CLIENTS, String mainURI, String secondURI) throws Exception {
|
||||
private void checkMessages(long NUMBER_OF_MESSAGES, int CLIENTS, String mainURI, String secondURI) throws Exception {
|
||||
for (int i = 0; i < CLIENTS; i++) {
|
||||
final int clientID = i;
|
||||
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, "client" + clientID + ".subscription" + clientID));
|
||||
|
@ -172,18 +174,10 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
int getMessageCount(String uri, String queueName) throws Exception {
|
||||
long getMessageCount(String uri, String queueName) throws Exception {
|
||||
SimpleManagement management = new SimpleManagement(uri, null, null);
|
||||
try {
|
||||
Map<String, Integer> result = SimpleManagement.listQueues(uri, null, null, 100);
|
||||
|
||||
if (result == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
Integer resultReturn = result.get(queueName);
|
||||
|
||||
logger.debug("Result = {}, queueName={}, returnValue = {}", result, queueName, resultReturn);
|
||||
return resultReturn == null ? 0 : resultReturn;
|
||||
return management.getQueueCount(queueName);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
// if an exception happened during a retry
|
||||
|
|
|
@ -88,6 +88,10 @@ public class SmokeTestBase extends ActiveMQTestBase {
|
|||
processes.add(process);
|
||||
}
|
||||
|
||||
public void removeProcess(Process process) {
|
||||
processes.remove(process);
|
||||
}
|
||||
|
||||
public Process startServer(String serverName, int portID, int timeout) throws Exception {
|
||||
Process process = ServerUtil.startServer(getServerLocation(serverName), serverName, portID, timeout);
|
||||
addProcess(process);
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.smoke.topologycheck;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.cluster.ClusterVerifier;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.activemq.artemis.json.JsonObject;
|
||||
import org.apache.activemq.artemis.json.JsonString;
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TopologyCheckTest extends SmokeTestBase {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String SERVER_NAME_1 = "topology-check/broker1";
|
||||
private static final String SERVER_NAME_2 = "topology-check/broker2";
|
||||
private static final String SERVER_NAME_3 = "topology-check/broker3";
|
||||
private static final String SERVER_NAME_4 = "topology-check/broker4";
|
||||
|
||||
private static final String URI_1 = "tcp://localhost:61616";
|
||||
private static final String URI_2 = "tcp://localhost:61617";
|
||||
private static final String URI_3 = "tcp://localhost:61618";
|
||||
private static final String URI_4 = "tcp://localhost:61619";
|
||||
|
||||
Process[] process = new Process[4];
|
||||
String[] URLS = new String[]{URI_1, URI_2, URI_3, URI_4};
|
||||
String[] SERVER_NAMES = new String[]{SERVER_NAME_1, SERVER_NAME_2, SERVER_NAME_3, SERVER_NAME_4};
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
cleanupData(SERVER_NAME_1);
|
||||
cleanupData(SERVER_NAME_2);
|
||||
cleanupData(SERVER_NAME_3);
|
||||
cleanupData(SERVER_NAME_4);
|
||||
for (int i = 0; i < process.length; i++) {
|
||||
process[i] = startServer(SERVER_NAMES[i], i, 0);
|
||||
}
|
||||
|
||||
for (int i = 0; i < process.length; i++) {
|
||||
ServerUtil.waitForServerToStart(i, "admin", "admin", 30_000);
|
||||
}
|
||||
|
||||
disableCheckThread();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckTopology() throws Throwable {
|
||||
String[] uris = new String[]{URI_1, URI_2, URI_3, URI_4};
|
||||
String[] nodes = new String[4];
|
||||
for (int i = 0; i < 4; i++) {
|
||||
SimpleManagement simpleManagement = new SimpleManagement(uris[i], "admin", "admin");
|
||||
nodes[i] = simpleManagement.getNodeID();
|
||||
logger.info("node[{}]={}", i, nodes[i]);
|
||||
}
|
||||
|
||||
validateTopology(uris, nodes, 0, 1, 2, 3);
|
||||
|
||||
shutdown(process[1]);
|
||||
validateTopology(uris, nodes, 0, 2, 3);
|
||||
|
||||
shutdown(process[3]);
|
||||
validateTopology(uris, nodes, 0, 2);
|
||||
|
||||
process[1] = startServer(SERVER_NAMES[1], 1, 10_000);
|
||||
validateTopology(uris, nodes, 0, 1, 2);
|
||||
|
||||
process[3] = startServer(SERVER_NAMES[3], 3, 10_000);
|
||||
validateTopology(uris, nodes, 0, 1, 2, 3);
|
||||
|
||||
shutdown(process[0]);
|
||||
process[0] = startServer(SERVER_NAMES[0], 0, 10_000);
|
||||
validateTopology(uris, nodes, 0, 1, 2, 3);
|
||||
|
||||
shutdown(process[0]);
|
||||
shutdown(process[1]);
|
||||
shutdown(process[2]);
|
||||
process[2] = startServer(SERVER_NAMES[2], 2, 10_000);
|
||||
validateTopology(uris, nodes, 2, 3);
|
||||
|
||||
process[0] = startServer(SERVER_NAMES[0], 0, 10_000);
|
||||
process[1] = startServer(SERVER_NAMES[1], 1, 10_000);
|
||||
validateTopology(uris, nodes, 0, 1, 2, 3);
|
||||
|
||||
shutdown(process[3]);
|
||||
process[3] = startServer(SERVER_NAMES[3], 0, 10_000);
|
||||
validateTopology(uris, nodes, 0, 1, 2, 3);
|
||||
}
|
||||
|
||||
private void shutdown(Process process) {
|
||||
process.destroy();
|
||||
Wait.assertFalse(process::isAlive);
|
||||
removeProcess(process);
|
||||
}
|
||||
|
||||
private void validateTopology(String[] uris, String[] nodeIDs, int... validNodes) throws Exception {
|
||||
for (int i : validNodes) {
|
||||
SimpleManagement simpleManagement = new SimpleManagement(uris[i], "admin", "adming");
|
||||
Wait.assertEquals(validNodes.length, () -> simpleManagement.listNetworkTopology().size(), 500, 5000);
|
||||
|
||||
JsonArray topologyArray = simpleManagement.listNetworkTopology();
|
||||
Assert.assertNotNull(topologyArray);
|
||||
|
||||
for (int j : validNodes) {
|
||||
JsonObject itemTopology = findTopologyNode(nodeIDs[j], topologyArray);
|
||||
Assert.assertNotNull(itemTopology);
|
||||
JsonString jsonString = (JsonString) itemTopology.get("live");
|
||||
Assert.assertEquals(uris[j], "tcp://" + jsonString.getString());
|
||||
}
|
||||
}
|
||||
|
||||
ClusterVerifier clusterVerifier = new ClusterVerifier(uris[validNodes[0]], "admin", "admin");
|
||||
Assert.assertTrue(clusterVerifier.verify(new ActionContext()));
|
||||
}
|
||||
|
||||
JsonObject findTopologyNode(String nodeID, JsonArray topologyArray) {
|
||||
for (int i = 0; i < topologyArray.size(); i++) {
|
||||
JsonObject object = topologyArray.getJsonObject(i);
|
||||
if (nodeID.equals(object.getString("nodeID"))) {
|
||||
return object;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -30,6 +30,8 @@ under the License.
|
|||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
@ -56,7 +58,7 @@ under the License.
|
|||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61101</acceptor>
|
||||
<acceptor name="artemis">tcp://localhost:61101</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
|
|
|
@ -30,6 +30,8 @@ under the License.
|
|||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
@ -56,7 +58,7 @@ under the License.
|
|||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61102</acceptor>
|
||||
<acceptor name="artemis">tcp://localhost:61102</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
|
|
|
@ -30,6 +30,8 @@ under the License.
|
|||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
@ -56,7 +58,7 @@ under the License.
|
|||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61100</acceptor>
|
||||
<acceptor name="artemis">tcp://localhost:61100</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
|
|
|
@ -30,6 +30,8 @@ under the License.
|
|||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
@ -57,7 +59,7 @@ under the License.
|
|||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61001</acceptor>
|
||||
<acceptor name="artemis">tcp://localhost:61001</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
|
|
|
@ -30,6 +30,8 @@ under the License.
|
|||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
@ -37,7 +39,7 @@ under the License.
|
|||
<ha-policy>
|
||||
<replication>
|
||||
<master>
|
||||
<group-name>test-pair-one</group-name>
|
||||
<group-name>test-pair-two</group-name>
|
||||
<check-for-live-server>false</check-for-live-server>
|
||||
<vote-on-replication-failure>false</vote-on-replication-failure>
|
||||
<quorum-size>2</quorum-size>
|
||||
|
@ -57,7 +59,7 @@ under the License.
|
|||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61002</acceptor>
|
||||
<acceptor name="artemis">tcp://localhost:61002</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
|
|
|
@ -32,6 +32,8 @@ under the License.
|
|||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
||||
<ha-policy>
|
||||
|
@ -57,7 +59,7 @@ under the License.
|
|||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61000</acceptor>
|
||||
<acceptor name="artemis">tcp://localhost:61000</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
|
|
Loading…
Reference in New Issue