ARTEMIS-2739 Artemis health check tool

Add the command `check` to the Command Line utility. This command exposes some
checks for nodes and queues using the management API for most of them.
The checks have been implemented to be modular. Each user can compose his own
health check, ie to produce and consume from a queue the command is
`artemis check queue --name TEST --produce 1 --consume 1`.
This commit is contained in:
brusdev 2020-05-15 13:19:55 +02:00 committed by Clebert Suconic
parent 5db1aad860
commit 8d5a212bd2
13 changed files with 1352 additions and 3 deletions

View File

@ -46,7 +46,12 @@ public class Artemis {
String instance = System.getProperty("artemis.instance"); String instance = System.getProperty("artemis.instance");
File fileInstance = instance != null ? new File(instance) : null; File fileInstance = instance != null ? new File(instance) : null;
execute(fileHome, fileInstance, args);
Object result = execute(fileHome, fileInstance, args);
if (result instanceof Exception) {
// Set a nonzero status code for the exceptions caught and printed by org.apache.activemq.artemis.cli.Artemis.execute
System.exit(1);
}
} }
/** /**

View File

@ -31,6 +31,9 @@ import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.cli.commands.InvalidOptionsError; import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
import org.apache.activemq.artemis.cli.commands.Kill; import org.apache.activemq.artemis.cli.commands.Kill;
import org.apache.activemq.artemis.cli.commands.Mask; import org.apache.activemq.artemis.cli.commands.Mask;
import org.apache.activemq.artemis.cli.commands.check.HelpCheck;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.queue.StatQueue; import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.cli.commands.Run; import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop; import org.apache.activemq.artemis.cli.commands.Stop;
@ -154,6 +157,9 @@ public class Artemis {
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance"); String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class); Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
builder.withGroup("check").withDescription("Check tools group (node|queue) (example ./artemis check node)").
withDefaultCommand(HelpCheck.class).withCommands(NodeCheck.class, QueueCheck.class);
builder.withGroup("queue").withDescription("Queue tools group (create|delete|update|stat|purge) (example ./artemis queue create)"). builder.withGroup("queue").withDescription("Queue tools group (create|delete|update|stat|purge) (example ./artemis queue create)").
withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class, UpdateQueue.class, StatQueue.class, PurgeQueue.class); withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class, UpdateQueue.class, StatQueue.class, PurgeQueue.class);

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQManagementProxy;
import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.StopWatch;
public abstract class CheckAbstract extends AbstractAction {
@Option(name = "--name", description = "Name of the target to check")
private String name;
@Option(name = "--timeout", description = "Time to wait for the check execution, in milliseconds")
private int timeout = 30000;
@Option(name = "--fail-at-end", description = "If a particular module check fails, continue the rest of the checks")
private boolean failAtEnd = false;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> checkTask = executor.submit(() -> {
int errorTasks = 0;
int failedTasks = 0;
int successTasks = 0;
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
ServerLocator serverLocator = factory.getServerLocator();
ActiveMQManagementProxy managementProxy = new ActiveMQManagementProxy(serverLocator, user, password)) {
managementProxy.start();
StopWatch watch = new StopWatch();
CheckTask[] checkTasks = getCheckTasks();
CheckContext checkContext = new CheckContext(context, factory, managementProxy);
context.out.println("Running " + this.getClass().getSimpleName());
watch.start();
try {
for (CheckTask task: checkTasks) {
try {
context.out.print("Checking that " + task.getAssertion() + " ... ");
task.getCallback().run(checkContext);
successTasks++;
context.out.println("success");
} catch (Exception e) {
String reason;
if (e instanceof CheckException) {
failedTasks++;
reason = "failure: " + e.getMessage();
} else {
errorTasks++;
reason = "error: " + e.getMessage();
}
context.out.println(reason);
if (verbose) {
context.out.println(e.toString());
e.printStackTrace(context.out);
}
if (!failAtEnd) {
fail(reason);
}
}
}
} finally {
watch.stop();
int skippedTasks = checkTasks.length - failedTasks - errorTasks - successTasks;
context.out.println(String.format("Checks run: %d, Failures: %d, Errors: %d, Skipped: %d, Time elapsed: %.03f sec - %s",
checkTasks.length, failedTasks, errorTasks, skippedTasks,
((float)watch.getTime()) / 1000, this.getClass().getSimpleName()));
}
if (successTasks < checkTasks.length) {
fail("checks not successful");
}
}
return successTasks;
});
try {
return checkTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof CLIException) {
throw (CLIException)cause;
} else {
fail(cause.toString());
}
} catch (TimeoutException e) {
fail("timeout");
} finally {
executor.shutdown();
}
return 0;
}
private void fail(String reason) throws Exception {
throw new CLIException(this.getClass().getSimpleName() + " failed. Reason: " + reason);
}
protected abstract CheckTask[] getCheckTasks();
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.management.ActiveMQManagementProxy;
import org.apache.activemq.artemis.api.core.management.NodeInfo;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public class CheckContext extends ActionContext {
private ActionContext actionContext;
private ActiveMQConnectionFactory factory;
private ActiveMQManagementProxy managementProxy;
private String nodeId;
private Map<String, NodeInfo> topology;
public ActionContext getActionContext() {
return actionContext;
}
public void setActionContext(ActionContext actionContext) {
this.actionContext = actionContext;
}
public ActiveMQConnectionFactory getFactory() {
return factory;
}
public void setFactory(ActiveMQConnectionFactory factory) {
this.factory = factory;
}
public ActiveMQManagementProxy getManagementProxy() {
return managementProxy;
}
public void setManagementProxy(ActiveMQManagementProxy managementProxy) {
this.managementProxy = managementProxy;
}
public String getNodeId() throws Exception {
if (nodeId == null) {
nodeId = managementProxy.invokeOperation(String.class, "broker", "getNodeID");
}
return nodeId;
}
public Map<String, NodeInfo> getTopology() throws Exception {
if (topology == null) {
topology = Arrays.stream(NodeInfo.from(managementProxy.invokeOperation(
String.class, "broker", "listNetworkTopology"))).
collect(Collectors.toMap(node -> node.getId(), node -> node));
}
return topology;
}
public CheckContext(ActionContext actionContext, ActiveMQConnectionFactory factory, ActiveMQManagementProxy managementProxy) {
this.actionContext = actionContext;
this.factory = factory;
this.managementProxy = managementProxy;
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
public class CheckException extends Exception {
public CheckException(String message) {
super(message);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
public class CheckTask {
public interface Callback {
void run(CheckContext context) throws Exception;
}
private final String assertion;
private final Callback callback;
public String getAssertion() {
return assertion;
}
public Callback getCallback() {
return callback;
}
public CheckTask(String assertion, Callback callback) {
this.assertion = assertion;
this.callback = callback;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import io.airlift.airline.Help;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
import org.apache.activemq.artemis.cli.commands.OptionsUtil;
public class HelpCheck extends Help implements Action {
@Override
public boolean isVerbose() {
return false;
}
@Override
public void setHomeValues(File brokerHome, File brokerInstance) {
}
@Override
public String getBrokerInstance() {
return null;
}
@Override
public String getBrokerHome() {
return null;
}
@Override
public void checkOptions(String[] options) throws InvalidOptionsError {
OptionsUtil.checkCommandOptions(this.getClass(), options);
}
@Override
public Object execute(ActionContext context) throws Exception {
List<String> commands = new ArrayList<>(1);
commands.add("check");
help(global, commands);
return null;
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.util.ArrayList;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.management.NodeInfo;
@Command(name = "node", description = "Check a node")
public class NodeCheck extends CheckAbstract {
@Option(name = "--up", description = "Check that the node is started, it is executed by default if there are no other checks")
private boolean up;
@Option(name = "--diskUsage", description = "Disk usage percentage to check or -1 to use the max-disk-usage")
private Integer diskUsage;
@Option(name = "--memoryUsage", description = "Memory usage percentage to check")
private Integer memoryUsage;
@Option(name = "--live", description = "Check that the node has a live")
private boolean live;
@Option(name = "--backup", description = "Check that the node has a backup")
private boolean backup;
@Option(name = "--peers", description = "Number of peers to check")
private Integer peers;
public boolean isUp() {
return up;
}
public void setUp(boolean up) {
this.up = up;
}
public Integer getDiskUsage() {
return diskUsage;
}
public void setDiskUsage(Integer diskUsage) {
this.diskUsage = diskUsage;
}
public Integer getMemoryUsage() {
return memoryUsage;
}
public void setMemoryUsage(Integer memoryUsage) {
this.memoryUsage = memoryUsage;
}
public boolean isLive() {
return live;
}
public void setLive(boolean live) {
this.live = live;
}
public boolean isBackup() {
return backup;
}
public void setBackup(boolean backup) {
this.backup = backup;
}
public Integer getPeers() {
return peers;
}
public void setPeers(Integer peers) {
this.peers = peers;
}
@Override
protected CheckTask[] getCheckTasks() {
ArrayList<CheckTask> checkTasks = new ArrayList<>();
if (live) {
checkTasks.add(new CheckTask("the node has a live", this::checkNodeLive));
}
if (backup) {
checkTasks.add(new CheckTask("the node has a backup", this::checkNodeBackup));
}
if (peers != null) {
if (peers > 0) {
checkTasks.add(new CheckTask(String.format("there are %d peers", peers), this::checkNodePeers));
} else {
throw new IllegalArgumentException("Invalid peers number to check: " + peers);
}
}
if (diskUsage != null) {
if (diskUsage == -1) {
checkTasks.add(new CheckTask("the disk usage is less then the max-disk-usage", this::checkNodeDiskUsage));
} else if (diskUsage > 0 && diskUsage < 100) {
checkTasks.add(new CheckTask("the disk usage is less then " + diskUsage, this::checkNodeDiskUsage));
} else {
throw new IllegalArgumentException("Invalid disk usage percentage: " + diskUsage);
}
}
if (memoryUsage != null) {
if (memoryUsage > 0 && memoryUsage < 100) {
checkTasks.add(new CheckTask("the memory usage is less then " + memoryUsage, this::checkNodeMemoryUsage));
} else {
throw new IllegalArgumentException("Invalid memory usage percentage: " + memoryUsage);
}
}
if (up || checkTasks.size() == 0) {
checkTasks.add(0, new CheckTask("the node is started", this::checkNodeUp));
}
return checkTasks.toArray(new CheckTask[checkTasks.size()]);
}
private void checkNodeUp(final CheckContext context) throws Exception {
if (!context.getManagementProxy().invokeOperation(Boolean.class, "broker", "isStarted")) {
throw new CheckException("The node isn't started.");
}
}
private void checkNodeLive(final CheckContext context) throws Exception {
String nodeId = getName();
if (nodeId == null) {
nodeId = context.getNodeId();
}
NodeInfo node = context.getTopology().get(nodeId);
if (node == null || node.getLive() == null) {
throw new CheckException("No live found for the node " + nodeId);
}
}
private void checkNodeBackup(final CheckContext context) throws Exception {
String nodeId = getName();
if (nodeId == null) {
nodeId = context.getNodeId();
}
NodeInfo node = context.getTopology().get(nodeId);
if (node == null || node.getBackup() == null) {
throw new CheckException("No backup found for the node " + nodeId);
}
}
private void checkNodePeers(final CheckContext context) throws Exception {
int topologyPeers = context.getTopology().values().stream().
mapToInt(node -> (node.getLive() != null ? 1 : 0) +
(node.getBackup() != null ? 1 : 0)).sum();
if (topologyPeers < peers) {
throw new CheckException("Insufficient peers: " + peers);
}
}
private void checkNodeDiskUsage(final CheckContext context) throws Exception {
int thresholdValue;
if (diskUsage == -1) {
thresholdValue = context.getManagementProxy().invokeOperation(
int.class, "broker", "getMaxDiskUsage");
} else {
thresholdValue = diskUsage;
}
checkNodeUsage(context, "getDiskStoreUsagePercentage", thresholdValue);
}
private void checkNodeMemoryUsage(final CheckContext context) throws Exception {
checkNodeUsage(context, "getAddressMemoryUsagePercentage", memoryUsage);
}
private void checkNodeUsage(final CheckContext context, final String name, final int thresholdValue) throws Exception {
int usageValue = context.getManagementProxy().invokeOperation(int.class, "broker", name);
if (usageValue > thresholdValue) {
throw new CheckException("The " + (name.startsWith("get") ? name.substring(3) : name) +
" " + usageValue + " is less than " + thresholdValue);
}
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.Enumeration;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@Command(name = "queue", description = "Check a queue")
public class QueueCheck extends CheckAbstract {
@Option(name = "--up", description = "Check that the queue exists and is not paused, it is executed by default if there are no other checks")
private boolean up;
@Option(name = "--browse", description = "Number of the messages to browse or -1 to check that the queue is browsable")
private Integer browse;
@Option(name = "--consume", description = "Number of the messages to consume or -1 to check that the queue is consumable")
private Integer consume;
@Option(name = "--produce", description = "Number of the messages to produce")
private Integer produce;
public boolean isUp() {
return up;
}
public void setUp(boolean up) {
this.up = up;
}
public Integer getBrowse() {
return browse;
}
public void setBrowse(Integer browse) {
this.browse = browse;
}
public Integer getConsume() {
return consume;
}
public void setConsume(Integer consume) {
this.consume = consume;
}
public Integer getProduce() {
return produce;
}
public void setProduce(Integer produce) {
this.produce = produce;
}
@Override
protected CheckTask[] getCheckTasks() {
ArrayList<CheckTask> checkTasks = new ArrayList<>();
if (getName() == null) {
throw new IllegalArgumentException("The name of the queue is required");
}
if (produce != null) {
if (produce > 0) {
checkTasks.add(new CheckTask(String.format("a producer can send %d messages to the queue %s",
produce, getName()), this::checkQueueProduce));
} else {
throw new IllegalArgumentException("Invalid number of messages to produce: " + produce);
}
}
if (browse != null) {
if (browse == -1) {
checkTasks.add(new CheckTask(String.format("a consumer can browse the queue",
getName()), this::checkQueueBrowse));
} else if (browse > 0) {
checkTasks.add(new CheckTask(String.format("a consumer can browse %d messages from the queue %s",
browse, getName()), this::checkQueueBrowse));
} else {
throw new IllegalArgumentException("Invalid number of messages to browse: " + browse);
}
}
if (consume != null) {
if (consume == -1) {
checkTasks.add(new CheckTask(String.format("a consumer can consume the queue %s",
getName()), this::checkQueueConsume));
} else if (consume > 0) {
checkTasks.add(new CheckTask(String.format("a consumer can consume %d messages from the queue %s",
consume, getName()), this::checkQueueConsume));
} else {
throw new IllegalArgumentException("Invalid number of messages to consume: " + consume);
}
}
if (up || checkTasks.size() == 0) {
checkTasks.add(0, new CheckTask(String.format("the queue %s exists and is not paused",
getName()), this::checkQueueUp));
}
return checkTasks.toArray(new CheckTask[checkTasks.size()]);
}
private void checkQueueUp(final CheckContext context) throws Exception {
if (context.getManagementProxy().invokeOperation(Boolean.class,ResourceNames.QUEUE + getName(), "isPaused")) {
throw new CheckException("The queue is paused.");
}
}
private void checkQueueProduce(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer queueProducer = session.createProducer(session.createQueue(getName()))) {
connection.start();
int count = 0;
while (count < produce) {
queueProducer.send(session.createTextMessage("CHECK_MESSAGE"));
count++;
}
}
}
private void checkQueueBrowse(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser queueBrowser = session.createBrowser(session.createQueue(getName()))) {
connection.start();
Enumeration<Message> queueBrowserEnum = queueBrowser.getEnumeration();
if (browse == -1) {
queueBrowserEnum.hasMoreElements();
} else {
int count = 0;
while (count < browse) {
if (!queueBrowserEnum.hasMoreElements() || queueBrowserEnum.nextElement() == null) {
throw new CheckException("Insufficient messages to browse: " + count);
}
count++;
}
}
}
}
private void checkQueueConsume(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer queueConsumer = session.createConsumer(session.createQueue(getName()))) {
connection.start();
if (consume == -1) {
queueConsumer.receiveNoWait();
} else {
int count = 0;
while (count < consume) {
if (queueConsumer.receive() == null) {
throw new CheckException("Insufficient messages to consume: " + count);
}
count++;
}
}
}
}
}

View File

@ -0,0 +1,420 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.cli.test;
import java.io.File;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
public class CheckTest extends CliTestBase {
final String queueName = "TEST";
@Test
public void testNodeCheckUp() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
startServer();
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
Assert.assertEquals(1, nodeCheck.execute(context));
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setUp(true);
Assert.assertEquals(1, nodeCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testNodeCheckDiskUsage() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
startServer();
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setDiskUsage(-1);
Assert.assertEquals(1, nodeCheck.execute(context));
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setDiskUsage(90);
Assert.assertEquals(1, nodeCheck.execute(context));
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setDiskUsage(0);
nodeCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
} finally {
stopServer();
}
}
@Test
public void testNodeCheckMemoryUsage() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
startServer();
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setMemoryUsage(90);
Assert.assertEquals(1, nodeCheck.execute(context));
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setMemoryUsage(-1);
nodeCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
} finally {
stopServer();
}
}
@Test
public void testNodeCheckTopology() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
File masterInstance = new File(temporaryFolder.getRoot(), "masterInstance");
File slaveInstance = new File(temporaryFolder.getRoot(), "slaveInstance");
Run.setEmbedded(true);
setupAuth(masterInstance);
Artemis.main("create", masterInstance.getAbsolutePath(), "--cluster-password", "artemis", "--cluster-user", "artemis", "--clustered",
"--replicated", "--host", "127.0.0.1", "--default-port", "61616", "--silent", "--no-autotune", "--no-web", "--require-login");
Artemis.main("create", slaveInstance.getAbsolutePath(), "--cluster-password", "artemis", "--cluster-user", "artemis", "--clustered",
"--replicated", "--host", "127.0.0.1", "--default-port", "61626", "--silent", "--no-autotune", "--no-web", "--require-login", "--slave");
System.setProperty("artemis.instance", masterInstance.getAbsolutePath());
Object master = Artemis.execute(false, null, masterInstance, "run");
ActiveMQServerImpl masterServer = (ActiveMQServerImpl)((Pair)master).getB();
try {
Wait.assertTrue("Master isn't active", () -> masterServer.isActive(), 10000);
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setLive(true);
Assert.assertEquals(1, nodeCheck.execute(context));
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setBackup(true);
nodeCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
LockAbstract.unlock();
Object slave = Artemis.execute(false, null, slaveInstance, "run");
ActiveMQServerImpl slaveServer = (ActiveMQServerImpl)((Pair)slave).getB();
Wait.assertTrue("Backup isn't announced", () -> slaveServer.getBackupManager() != null &&
slaveServer.getBackupManager().isStarted() && slaveServer.getBackupManager().isBackupAnnounced(), 30000);
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setLive(true);
nodeCheck.setBackup(true);
nodeCheck.setPeers(2);
Assert.assertEquals(3, nodeCheck.execute(context));
} finally {
Artemis.internalExecute(null, slaveInstance, new String[] {"stop"}, ActionContext.system());
}
} finally {
stopServer();
}
}
@Test
public void testQueueCheckUp() throws Exception {
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
try {
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
server.createQueue(new QueueConfiguration(queueName));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
Assert.assertEquals(1, queueCheck.execute(context));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setUp(true);
queueCheck.setName(queueName);
Assert.assertEquals(1, queueCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testQueueCheckBrowse() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
try {
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
server.createQueue(new QueueConfiguration(queueName));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setBrowse(null);
Assert.assertEquals(1, queueCheck.execute(context));
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
for (int i = 0; i < messages; i++) {
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes(
queueName.getBytes()), true, "admin", "admin");
}
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setBrowse(messages);
Assert.assertEquals(1, queueCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testQueueCheckConsume() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
try {
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setConsume(null);
Assert.assertEquals(1, queueCheck.execute(context));
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
for (int i = 0; i < messages; i++) {
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes(
queueName.getBytes()), true, "admin", "admin");
}
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setConsume(messages);
Assert.assertEquals(1, queueCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testQueueCheckConsumeTimeout() throws Exception {
QueueCheck queueCheck;
TestActionContext context;
startServer();
try {
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setConsume(1);
queueCheck.setTimeout(3000);
queueCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
} finally {
stopServer();
}
}
@Test
public void testQueueCheckProduce() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
QueueControl queueControl;
Object serverInstance = startServer();
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setProduce(messages);
Assert.assertEquals(1, queueCheck.execute(context));
queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(messages, queueControl.getMessageCount());
} finally {
stopServer();
}
}
@Test
public void testQueueCheckProduceAndConsume() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setProduce(messages);
queueCheck.setBrowse(messages);
queueCheck.setConsume(messages);
Assert.assertEquals(3, queueCheck.execute(context));
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(0, queueControl.getMessageCount());
} finally {
stopServer();
}
}
}

View File

@ -85,13 +85,13 @@ public class CliTestBase {
LockAbstract.unlock(); LockAbstract.unlock();
} }
protected void startServer() throws Exception { protected Object startServer() throws Exception {
File rootDirectory = new File(temporaryFolder.getRoot(), "broker"); File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
setupAuth(rootDirectory); setupAuth(rootDirectory);
Run.setEmbedded(true); Run.setEmbedded(true);
Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login", "--disable-persistence"); Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login", "--disable-persistence");
System.setProperty("artemis.instance", rootDirectory.getAbsolutePath()); System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
Artemis.internalExecute("run"); return Artemis.internalExecute("run");
} }
void setupAuth() { void setupAuth() {

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ActiveMQManagementProxy implements AutoCloseable {
private final String username;
private final String password;
private final ServerLocator locator;
private ClientSessionFactory sessionFactory;
private ClientSession session;
private ClientRequestor requestor;
public ActiveMQManagementProxy(final ServerLocator locator, final String username, final String password) {
this.locator = locator;
this.username = username;
this.password = password;
}
public void start() throws Exception {
sessionFactory = locator.createSessionFactory();
session = sessionFactory.createSession(username, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
requestor = new ClientRequestor(session, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
session.start();
}
public <T> T invokeOperation(final Class<T> type, final String resourceName, final String operationName, final Object... operationArgs) throws Exception {
ClientMessage request = session.createMessage(false);
ManagementHelper.putOperationInvocation(request, resourceName, operationName, operationArgs);
ClientMessage reply = requestor.request(request);
if (ManagementHelper.hasOperationSucceeded(reply)) {
return (T)ManagementHelper.getResult(reply, type);
} else {
throw new Exception("Failed to invoke " + resourceName + "." + operationName + ". Reason: " + ManagementHelper.getResult(reply, String.class));
}
}
public void stop() throws ActiveMQException {
session.stop();
}
@Override
public void close() throws Exception {
requestor.close();
session.close();
sessionFactory.close();
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
import javax.json.JsonArray;
import javax.json.JsonObject;
import org.apache.activemq.artemis.api.core.JsonUtil;
/**
* Helper class to create Java Objects from the
* JSON serialization returned by {@link ActiveMQServerControl#listNetworkTopology()}.
*/
public class NodeInfo {
private final String id;
private final String live;
private final String backup;
public String getId() {
return id;
}
public String getLive() {
return live;
}
public String getBackup() {
return backup;
}
/**
* Returns an array of NodeInfo corresponding to the JSON serialization returned
* by {@link ActiveMQServerControl#listNetworkTopology()}.
*/
public static NodeInfo[] from(final String jsonString) throws Exception {
JsonArray array = JsonUtil.readJsonArray(jsonString);
NodeInfo[] nodes = new NodeInfo[array.size()];
for (int i = 0; i < array.size(); i++) {
JsonObject nodeObject = array.getJsonObject(i);
NodeInfo role = new NodeInfo(nodeObject.getString("nodeID"), nodeObject.getString("live", null), nodeObject.getString("backup", null));
nodes[i] = role;
}
return nodes;
}
public NodeInfo(String id, String live, String backup) {
this.id = id;
this.live = live;
this.backup = backup;
}
}