This commit is contained in:
Clebert Suconic 2020-05-15 09:43:03 -04:00
commit 56797baf69
13 changed files with 1356 additions and 3 deletions

View File

@ -46,7 +46,12 @@ public class Artemis {
String instance = System.getProperty("artemis.instance");
File fileInstance = instance != null ? new File(instance) : null;
execute(fileHome, fileInstance, args);
Object result = execute(fileHome, fileInstance, args);
if (result instanceof Exception) {
// Set a nonzero status code for the exceptions caught and printed by org.apache.activemq.artemis.cli.Artemis.execute
System.exit(1);
}
}
/**

View File

@ -31,6 +31,9 @@ import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
import org.apache.activemq.artemis.cli.commands.Kill;
import org.apache.activemq.artemis.cli.commands.Mask;
import org.apache.activemq.artemis.cli.commands.check.HelpCheck;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop;
@ -154,6 +157,9 @@ public class Artemis {
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
builder.withGroup("check").withDescription("Check tools group (node|queue) (example ./artemis check node)").
withDefaultCommand(HelpCheck.class).withCommands(NodeCheck.class, QueueCheck.class);
builder.withGroup("queue").withDescription("Queue tools group (create|delete|update|stat|purge) (example ./artemis queue create)").
withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class, UpdateQueue.class, StatQueue.class, PurgeQueue.class);

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQManagementProxy;
import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.StopWatch;
public abstract class CheckAbstract extends AbstractAction {
@Option(name = "--name", description = "Name of the target to check")
protected String name;
@Option(name = "--timeout", description = "Time to wait for the check execution, in milliseconds")
private int timeout = 30000;
@Option(name = "--fail-at-end", description = "If a particular module check fails, continue the rest of the checks")
private boolean failAtEnd = false;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> checkTask = executor.submit(() -> {
int errorTasks = 0;
int failedTasks = 0;
int successTasks = 0;
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
ServerLocator serverLocator = factory.getServerLocator();
ActiveMQManagementProxy managementProxy = new ActiveMQManagementProxy(serverLocator, user, password)) {
managementProxy.start();
StopWatch watch = new StopWatch();
CheckTask[] checkTasks = getCheckTasks();
CheckContext checkContext = new CheckContext(context, factory, managementProxy);
context.out.println("Running " + this.getClass().getSimpleName());
watch.start();
try {
for (CheckTask task: checkTasks) {
try {
context.out.print("Checking that " + task.getAssertion() + " ... ");
task.getCallback().run(checkContext);
successTasks++;
context.out.println("success");
} catch (Exception e) {
String reason;
if (e instanceof CheckException) {
failedTasks++;
reason = "failure: " + e.getMessage();
} else {
errorTasks++;
reason = "error: " + e.getMessage();
}
context.out.println(reason);
if (verbose) {
context.out.println(e.toString());
e.printStackTrace(context.out);
}
if (!failAtEnd) {
fail(reason);
}
}
}
} finally {
watch.stop();
int skippedTasks = checkTasks.length - failedTasks - errorTasks - successTasks;
context.out.println(String.format("Checks run: %d, Failures: %d, Errors: %d, Skipped: %d, Time elapsed: %.03f sec - %s",
checkTasks.length, failedTasks, errorTasks, skippedTasks,
((float)watch.getTime()) / 1000, this.getClass().getSimpleName()));
}
if (successTasks < checkTasks.length) {
fail("checks not successful");
}
}
return successTasks;
});
try {
return checkTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof CLIException) {
throw (CLIException)cause;
} else {
fail(cause.toString());
}
} catch (TimeoutException e) {
fail("timeout");
} finally {
executor.shutdown();
}
return 0;
}
private void fail(String reason) throws Exception {
throw new CLIException(this.getClass().getSimpleName() + " failed. Reason: " + reason);
}
protected abstract CheckTask[] getCheckTasks();
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.management.ActiveMQManagementProxy;
import org.apache.activemq.artemis.api.core.management.NodeInfo;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
public class CheckContext extends ActionContext {
private ActionContext actionContext;
private ActiveMQConnectionFactory factory;
private ActiveMQManagementProxy managementProxy;
private String nodeId;
private Map<String, NodeInfo> topology;
public ActionContext getActionContext() {
return actionContext;
}
public void setActionContext(ActionContext actionContext) {
this.actionContext = actionContext;
}
public ActiveMQConnectionFactory getFactory() {
return factory;
}
public void setFactory(ActiveMQConnectionFactory factory) {
this.factory = factory;
}
public ActiveMQManagementProxy getManagementProxy() {
return managementProxy;
}
public void setManagementProxy(ActiveMQManagementProxy managementProxy) {
this.managementProxy = managementProxy;
}
public String getNodeId() throws Exception {
if (nodeId == null) {
nodeId = managementProxy.invokeOperation(String.class, "broker", "getNodeID");
}
return nodeId;
}
public Map<String, NodeInfo> getTopology() throws Exception {
if (topology == null) {
topology = Arrays.stream(NodeInfo.from(managementProxy.invokeOperation(
String.class, "broker", "listNetworkTopology"))).
collect(Collectors.toMap(node -> node.getId(), node -> node));
}
return topology;
}
public CheckContext(ActionContext actionContext, ActiveMQConnectionFactory factory, ActiveMQManagementProxy managementProxy) {
this.actionContext = actionContext;
this.factory = factory;
this.managementProxy = managementProxy;
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
public class CheckException extends Exception {
public CheckException(String message) {
super(message);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
public class CheckTask {
public interface Callback {
void run(CheckContext context) throws Exception;
}
private final String assertion;
private final Callback callback;
public String getAssertion() {
return assertion;
}
public Callback getCallback() {
return callback;
}
public CheckTask(String assertion, Callback callback) {
this.assertion = assertion;
this.callback = callback;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import io.airlift.airline.Help;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
import org.apache.activemq.artemis.cli.commands.OptionsUtil;
public class HelpCheck extends Help implements Action {
@Override
public boolean isVerbose() {
return false;
}
@Override
public void setHomeValues(File brokerHome, File brokerInstance) {
}
@Override
public String getBrokerInstance() {
return null;
}
@Override
public String getBrokerHome() {
return null;
}
@Override
public void checkOptions(String[] options) throws InvalidOptionsError {
OptionsUtil.checkCommandOptions(this.getClass(), options);
}
@Override
public Object execute(ActionContext context) throws Exception {
List<String> commands = new ArrayList<>(1);
commands.add("check");
help(global, commands);
return null;
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import java.util.ArrayList;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.management.NodeInfo;
@Command(name = "node", description = "Check a node")
public class NodeCheck extends CheckAbstract {
@Option(name = "--up", description = "Check that the node is started, it is executed by default if there are no other checks")
private boolean up;
@Option(name = "--diskUsage", description = "Disk usage percentage to check or -1 to use the max-disk-usage")
private Integer diskUsage;
@Option(name = "--memoryUsage", description = "Memory usage percentage to check")
private Integer memoryUsage;
@Option(name = "--live", description = "Check that the node has a live")
private boolean live;
@Option(name = "--backup", description = "Check that the node has a backup")
private boolean backup;
@Option(name = "--peers", description = "Number of peers to check")
private Integer peers;
public boolean isUp() {
return up;
}
public void setUp(boolean up) {
this.up = up;
}
public Integer getDiskUsage() {
return diskUsage;
}
public void setDiskUsage(Integer diskUsage) {
this.diskUsage = diskUsage;
}
public Integer getMemoryUsage() {
return memoryUsage;
}
public void setMemoryUsage(Integer memoryUsage) {
this.memoryUsage = memoryUsage;
}
public boolean isLive() {
return live;
}
public void setLive(boolean live) {
this.live = live;
}
public boolean isBackup() {
return backup;
}
public void setBackup(boolean backup) {
this.backup = backup;
}
public Integer getPeers() {
return peers;
}
public void setPeers(Integer peers) {
this.peers = peers;
}
@Override
protected CheckTask[] getCheckTasks() {
ArrayList<CheckTask> checkTasks = new ArrayList<>();
if (live) {
checkTasks.add(new CheckTask("the node has a live", this::checkNodeLive));
}
if (backup) {
checkTasks.add(new CheckTask("the node has a backup", this::checkNodeBackup));
}
if (peers != null) {
if (peers > 0) {
checkTasks.add(new CheckTask(String.format("there are %d peers", peers), this::checkNodePeers));
} else {
throw new IllegalArgumentException("Invalid peers number to check: " + peers);
}
}
if (diskUsage != null) {
if (diskUsage == -1) {
checkTasks.add(new CheckTask("the disk usage is less then the max-disk-usage", this::checkNodeDiskUsage));
} else if (diskUsage > 0 && diskUsage < 100) {
checkTasks.add(new CheckTask("the disk usage is less then " + diskUsage, this::checkNodeDiskUsage));
} else {
throw new IllegalArgumentException("Invalid disk usage percentage: " + diskUsage);
}
}
if (memoryUsage != null) {
if (memoryUsage > 0 && memoryUsage < 100) {
checkTasks.add(new CheckTask("the memory usage is less then " + memoryUsage, this::checkNodeMemoryUsage));
} else {
throw new IllegalArgumentException("Invalid memory usage percentage: " + memoryUsage);
}
}
if (up || checkTasks.size() == 0) {
checkTasks.add(0, new CheckTask("the node is started", this::checkNodeUp));
}
return checkTasks.toArray(new CheckTask[checkTasks.size()]);
}
private void checkNodeUp(final CheckContext context) throws Exception {
if (!context.getManagementProxy().invokeOperation(Boolean.class, "broker", "isStarted")) {
throw new CheckException("The node isn't started.");
}
}
private void checkNodeLive(final CheckContext context) throws Exception {
String nodeId = getName();
if (nodeId == null) {
nodeId = context.getNodeId();
}
NodeInfo node = context.getTopology().get(nodeId);
if (node == null || node.getLive() == null) {
throw new CheckException("No live found for the node " + nodeId);
}
}
private void checkNodeBackup(final CheckContext context) throws Exception {
String nodeId = getName();
if (nodeId == null) {
nodeId = context.getNodeId();
}
NodeInfo node = context.getTopology().get(nodeId);
if (node == null || node.getBackup() == null) {
throw new CheckException("No backup found for the node " + nodeId);
}
}
private void checkNodePeers(final CheckContext context) throws Exception {
int topologyPeers = context.getTopology().values().stream().
mapToInt(node -> (node.getLive() != null ? 1 : 0) +
(node.getBackup() != null ? 1 : 0)).sum();
if (topologyPeers < peers) {
throw new CheckException("Insufficient peers: " + peers);
}
}
private void checkNodeDiskUsage(final CheckContext context) throws Exception {
int thresholdValue;
if (diskUsage == -1) {
thresholdValue = context.getManagementProxy().invokeOperation(
int.class, "broker", "getMaxDiskUsage");
} else {
thresholdValue = diskUsage;
}
checkNodeUsage(context, "getDiskStoreUsagePercentage", thresholdValue);
}
private void checkNodeMemoryUsage(final CheckContext context) throws Exception {
checkNodeUsage(context, "getAddressMemoryUsagePercentage", memoryUsage);
}
private void checkNodeUsage(final CheckContext context, final String name, final int thresholdValue) throws Exception {
int usageValue = context.getManagementProxy().invokeOperation(int.class, "broker", name);
if (usageValue > thresholdValue) {
throw new CheckException("The " + (name.startsWith("get") ? name.substring(3) : name) +
" " + usageValue + " is less than " + thresholdValue);
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.check;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.Enumeration;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@Command(name = "queue", description = "Check a queue")
public class QueueCheck extends CheckAbstract {
@Option(name = "--up", description = "Check that the queue exists and is not paused, it is executed by default if there are no other checks")
private boolean up;
@Option(name = "--browse", description = "Number of the messages to browse or -1 to check that the queue is browsable")
private Integer browse;
@Option(name = "--consume", description = "Number of the messages to consume or -1 to check that the queue is consumable")
private Integer consume;
@Option(name = "--produce", description = "Number of the messages to produce")
private Integer produce;
public boolean isUp() {
return up;
}
public void setUp(boolean up) {
this.up = up;
}
public Integer getBrowse() {
return browse;
}
public void setBrowse(Integer browse) {
this.browse = browse;
}
public Integer getConsume() {
return consume;
}
public void setConsume(Integer consume) {
this.consume = consume;
}
public Integer getProduce() {
return produce;
}
public void setProduce(Integer produce) {
this.produce = produce;
}
@Override
protected CheckTask[] getCheckTasks() {
ArrayList<CheckTask> checkTasks = new ArrayList<>();
if (getName() == null) {
name = input("--name", "Name is a mandatory property for Queue check", null);
}
if (getName() == null) {
throw new IllegalArgumentException("The name of the queue is required");
}
if (produce != null) {
if (produce > 0) {
checkTasks.add(new CheckTask(String.format("a producer can send %d messages to the queue %s",
produce, getName()), this::checkQueueProduce));
} else {
throw new IllegalArgumentException("Invalid number of messages to produce: " + produce);
}
}
if (browse != null) {
if (browse == -1) {
checkTasks.add(new CheckTask(String.format("a consumer can browse the queue",
getName()), this::checkQueueBrowse));
} else if (browse > 0) {
checkTasks.add(new CheckTask(String.format("a consumer can browse %d messages from the queue %s",
browse, getName()), this::checkQueueBrowse));
} else {
throw new IllegalArgumentException("Invalid number of messages to browse: " + browse);
}
}
if (consume != null) {
if (consume == -1) {
checkTasks.add(new CheckTask(String.format("a consumer can consume the queue %s",
getName()), this::checkQueueConsume));
} else if (consume > 0) {
checkTasks.add(new CheckTask(String.format("a consumer can consume %d messages from the queue %s",
consume, getName()), this::checkQueueConsume));
} else {
throw new IllegalArgumentException("Invalid number of messages to consume: " + consume);
}
}
if (up || checkTasks.size() == 0) {
checkTasks.add(0, new CheckTask(String.format("the queue %s exists and is not paused",
getName()), this::checkQueueUp));
}
return checkTasks.toArray(new CheckTask[checkTasks.size()]);
}
private void checkQueueUp(final CheckContext context) throws Exception {
if (context.getManagementProxy().invokeOperation(Boolean.class,ResourceNames.QUEUE + getName(), "isPaused")) {
throw new CheckException("The queue is paused.");
}
}
private void checkQueueProduce(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer queueProducer = session.createProducer(session.createQueue(getName()))) {
connection.start();
int count = 0;
while (count < produce) {
queueProducer.send(session.createTextMessage("CHECK_MESSAGE"));
count++;
}
}
}
private void checkQueueBrowse(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser queueBrowser = session.createBrowser(session.createQueue(getName()))) {
connection.start();
Enumeration<Message> queueBrowserEnum = queueBrowser.getEnumeration();
if (browse == -1) {
queueBrowserEnum.hasMoreElements();
} else {
int count = 0;
while (count < browse) {
if (!queueBrowserEnum.hasMoreElements() || queueBrowserEnum.nextElement() == null) {
throw new CheckException("Insufficient messages to browse: " + count);
}
count++;
}
}
}
}
private void checkQueueConsume(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer queueConsumer = session.createConsumer(session.createQueue(getName()))) {
connection.start();
if (consume == -1) {
queueConsumer.receiveNoWait();
} else {
int count = 0;
while (count < consume) {
if (queueConsumer.receive() == null) {
throw new CheckException("Insufficient messages to consume: " + count);
}
count++;
}
}
}
}
}

View File

@ -0,0 +1,420 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.cli.test;
import java.io.File;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
public class CheckTest extends CliTestBase {
final String queueName = "TEST";
@Test
public void testNodeCheckUp() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
startServer();
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
Assert.assertEquals(1, nodeCheck.execute(context));
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setUp(true);
Assert.assertEquals(1, nodeCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testNodeCheckDiskUsage() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
startServer();
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setDiskUsage(-1);
Assert.assertEquals(1, nodeCheck.execute(context));
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setDiskUsage(90);
Assert.assertEquals(1, nodeCheck.execute(context));
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setDiskUsage(0);
nodeCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
} finally {
stopServer();
}
}
@Test
public void testNodeCheckMemoryUsage() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
startServer();
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setMemoryUsage(90);
Assert.assertEquals(1, nodeCheck.execute(context));
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setMemoryUsage(-1);
nodeCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
} finally {
stopServer();
}
}
@Test
public void testNodeCheckTopology() throws Exception {
NodeCheck nodeCheck;
TestActionContext context;
File masterInstance = new File(temporaryFolder.getRoot(), "masterInstance");
File slaveInstance = new File(temporaryFolder.getRoot(), "slaveInstance");
Run.setEmbedded(true);
setupAuth(masterInstance);
Artemis.main("create", masterInstance.getAbsolutePath(), "--cluster-password", "artemis", "--cluster-user", "artemis", "--clustered",
"--replicated", "--host", "127.0.0.1", "--default-port", "61616", "--silent", "--no-autotune", "--no-web", "--require-login");
Artemis.main("create", slaveInstance.getAbsolutePath(), "--cluster-password", "artemis", "--cluster-user", "artemis", "--clustered",
"--replicated", "--host", "127.0.0.1", "--default-port", "61626", "--silent", "--no-autotune", "--no-web", "--require-login", "--slave");
System.setProperty("artemis.instance", masterInstance.getAbsolutePath());
Object master = Artemis.execute(false, null, masterInstance, "run");
ActiveMQServerImpl masterServer = (ActiveMQServerImpl)((Pair)master).getB();
try {
Wait.assertTrue("Master isn't active", () -> masterServer.isActive(), 10000);
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setLive(true);
Assert.assertEquals(1, nodeCheck.execute(context));
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setBackup(true);
nodeCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
LockAbstract.unlock();
Object slave = Artemis.execute(false, null, slaveInstance, "run");
ActiveMQServerImpl slaveServer = (ActiveMQServerImpl)((Pair)slave).getB();
Wait.assertTrue("Backup isn't announced", () -> slaveServer.getBackupManager() != null &&
slaveServer.getBackupManager().isStarted() && slaveServer.getBackupManager().isBackupAnnounced(), 30000);
try {
context = new TestActionContext();
nodeCheck = new NodeCheck();
nodeCheck.setUser("admin");
nodeCheck.setPassword("admin");
nodeCheck.setLive(true);
nodeCheck.setBackup(true);
nodeCheck.setPeers(2);
Assert.assertEquals(3, nodeCheck.execute(context));
} finally {
Artemis.internalExecute(null, slaveInstance, new String[] {"stop"}, ActionContext.system());
}
} finally {
stopServer();
}
}
@Test
public void testQueueCheckUp() throws Exception {
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
try {
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
server.createQueue(new QueueConfiguration(queueName));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
Assert.assertEquals(1, queueCheck.execute(context));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setUp(true);
queueCheck.setName(queueName);
Assert.assertEquals(1, queueCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testQueueCheckBrowse() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
try {
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
server.createQueue(new QueueConfiguration(queueName));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setBrowse(null);
Assert.assertEquals(1, queueCheck.execute(context));
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
for (int i = 0; i < messages; i++) {
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes(
queueName.getBytes()), true, "admin", "admin");
}
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setBrowse(messages);
Assert.assertEquals(1, queueCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testQueueCheckConsume() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
try {
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setConsume(null);
Assert.assertEquals(1, queueCheck.execute(context));
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
for (int i = 0; i < messages; i++) {
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes(
queueName.getBytes()), true, "admin", "admin");
}
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setConsume(messages);
Assert.assertEquals(1, queueCheck.execute(context));
} finally {
stopServer();
}
}
@Test
public void testQueueCheckConsumeTimeout() throws Exception {
QueueCheck queueCheck;
TestActionContext context;
startServer();
try {
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setConsume(1);
queueCheck.setTimeout(3000);
queueCheck.execute(context);
Assert.fail("CLIException expected.");
} catch (Exception e) {
Assert.assertTrue("CLIException expected.", e instanceof CLIException);
}
} finally {
stopServer();
}
}
@Test
public void testQueueCheckProduce() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
QueueControl queueControl;
Object serverInstance = startServer();
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setProduce(messages);
Assert.assertEquals(1, queueCheck.execute(context));
queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(messages, queueControl.getMessageCount());
} finally {
stopServer();
}
}
@Test
public void testQueueCheckProduceAndConsume() throws Exception {
final int messages = 3;
QueueCheck queueCheck;
TestActionContext context;
Object serverInstance = startServer();
ActiveMQServerImpl server = (ActiveMQServerImpl)((Pair)serverInstance).getB();
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
try {
context = new TestActionContext();
queueCheck = new QueueCheck();
queueCheck.setUser("admin");
queueCheck.setPassword("admin");
queueCheck.setName(queueName);
queueCheck.setProduce(messages);
queueCheck.setBrowse(messages);
queueCheck.setConsume(messages);
Assert.assertEquals(3, queueCheck.execute(context));
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(0, queueControl.getMessageCount());
} finally {
stopServer();
}
}
}

View File

@ -85,13 +85,13 @@ public class CliTestBase {
LockAbstract.unlock();
}
protected void startServer() throws Exception {
protected Object startServer() throws Exception {
File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
setupAuth(rootDirectory);
Run.setEmbedded(true);
Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login", "--disable-persistence");
System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
Artemis.internalExecute("run");
return Artemis.internalExecute("run");
}
void setupAuth() {

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ActiveMQManagementProxy implements AutoCloseable {
private final String username;
private final String password;
private final ServerLocator locator;
private ClientSessionFactory sessionFactory;
private ClientSession session;
private ClientRequestor requestor;
public ActiveMQManagementProxy(final ServerLocator locator, final String username, final String password) {
this.locator = locator;
this.username = username;
this.password = password;
}
public void start() throws Exception {
sessionFactory = locator.createSessionFactory();
session = sessionFactory.createSession(username, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
requestor = new ClientRequestor(session, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
session.start();
}
public <T> T invokeOperation(final Class<T> type, final String resourceName, final String operationName, final Object... operationArgs) throws Exception {
ClientMessage request = session.createMessage(false);
ManagementHelper.putOperationInvocation(request, resourceName, operationName, operationArgs);
ClientMessage reply = requestor.request(request);
if (ManagementHelper.hasOperationSucceeded(reply)) {
return (T)ManagementHelper.getResult(reply, type);
} else {
throw new Exception("Failed to invoke " + resourceName + "." + operationName + ". Reason: " + ManagementHelper.getResult(reply, String.class));
}
}
public void stop() throws ActiveMQException {
session.stop();
}
@Override
public void close() throws Exception {
requestor.close();
session.close();
sessionFactory.close();
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
import javax.json.JsonArray;
import javax.json.JsonObject;
import org.apache.activemq.artemis.api.core.JsonUtil;
/**
* Helper class to create Java Objects from the
* JSON serialization returned by {@link ActiveMQServerControl#listNetworkTopology()}.
*/
public class NodeInfo {
private final String id;
private final String live;
private final String backup;
public String getId() {
return id;
}
public String getLive() {
return live;
}
public String getBackup() {
return backup;
}
/**
* Returns an array of NodeInfo corresponding to the JSON serialization returned
* by {@link ActiveMQServerControl#listNetworkTopology()}.
*/
public static NodeInfo[] from(final String jsonString) throws Exception {
JsonArray array = JsonUtil.readJsonArray(jsonString);
NodeInfo[] nodes = new NodeInfo[array.size()];
for (int i = 0; i < array.size(); i++) {
JsonObject nodeObject = array.getJsonObject(i);
NodeInfo role = new NodeInfo(nodeObject.getString("nodeID"), nodeObject.getString("live", null), nodeObject.getString("backup", null));
nodes[i] = role;
}
return nodes;
}
public NodeInfo(String id, String live, String backup) {
this.id = id;
this.live = live;
this.backup = backup;
}
}