This closes #50 Adding Browse command and other tweaks to the CLI
This commit is contained in:
commit
ab00b7c573
|
@ -16,12 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.cli;
|
package org.apache.activemq.artemis.cli;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
|
||||||
import io.airlift.airline.Cli;
|
import io.airlift.airline.Cli;
|
||||||
import org.apache.activemq.artemis.cli.commands.Action;
|
import org.apache.activemq.artemis.cli.commands.Action;
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.Browse;
|
||||||
import org.apache.activemq.artemis.cli.commands.Consumer;
|
import org.apache.activemq.artemis.cli.commands.Consumer;
|
||||||
import org.apache.activemq.artemis.cli.commands.Create;
|
import org.apache.activemq.artemis.cli.commands.Create;
|
||||||
import org.apache.activemq.artemis.cli.commands.HelpAction;
|
import org.apache.activemq.artemis.cli.commands.HelpAction;
|
||||||
|
@ -41,12 +43,60 @@ public class Artemis
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static void main(String... args) throws Exception
|
public static void main(String... args) throws Exception
|
||||||
{
|
{
|
||||||
String instance = System.getProperty("artemis.instance");
|
try
|
||||||
|
{
|
||||||
|
execute(args);
|
||||||
|
}
|
||||||
|
catch (ConfigurationException configException)
|
||||||
|
{
|
||||||
|
System.err.println(configException.getMessage());
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("Configuration should be specified as 'scheme:location'. Default configuration is 'xml:${ARTEMIS_INSTANCE}/etc/bootstrap.xml'");
|
||||||
|
}
|
||||||
|
catch (RuntimeException re)
|
||||||
|
{
|
||||||
|
System.err.println(re.getMessage());
|
||||||
|
System.out.println();
|
||||||
|
|
||||||
|
Cli<Action> parser = builder(null).build();
|
||||||
|
|
||||||
|
parser.parse("help").execute(ActionContext.system());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Object execute(String... args) throws Exception
|
||||||
|
{
|
||||||
|
return execute(null, null, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Object execute(File artemisHome, File artemisInstance, String... args) throws Exception
|
||||||
|
{
|
||||||
|
Action action = builder(artemisInstance).build().parse(args);
|
||||||
|
action.setHomeValues(artemisHome, artemisInstance);
|
||||||
|
|
||||||
|
if (action.isVerbose())
|
||||||
|
{
|
||||||
|
System.out.print("Executing " + action.getClass().getName() + " ");
|
||||||
|
for (String arg : args)
|
||||||
|
{
|
||||||
|
System.out.print(arg + " ");
|
||||||
|
}
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("Home::" + action.getBrokerHome() + ", Instance::" + action.getBrokerInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
return action.execute(ActionContext.system());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Cli.CliBuilder<Action> builder(File artemisInstance)
|
||||||
|
{
|
||||||
|
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
|
||||||
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis")
|
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis")
|
||||||
.withDescription("ActiveMQ Artemis Command Line")
|
.withDescription("ActiveMQ Artemis Command Line")
|
||||||
.withCommand(HelpAction.class)
|
.withCommand(HelpAction.class)
|
||||||
.withCommand(Producer.class)
|
.withCommand(Producer.class)
|
||||||
.withCommand(Consumer.class)
|
.withCommand(Consumer.class)
|
||||||
|
.withCommand(Browse.class)
|
||||||
.withDefaultCommand(HelpAction.class);
|
.withDefaultCommand(HelpAction.class);
|
||||||
|
|
||||||
|
|
||||||
|
@ -64,25 +114,9 @@ public class Artemis
|
||||||
builder = builder.withCommand(Create.class);
|
builder = builder.withCommand(Create.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
Cli<Action> parser = builder.build();
|
return builder;
|
||||||
try
|
|
||||||
{
|
|
||||||
parser.parse(args).execute(ActionContext.system());
|
|
||||||
}
|
|
||||||
catch (ConfigurationException configException)
|
|
||||||
{
|
|
||||||
System.err.println(configException.getMessage());
|
|
||||||
System.out.println();
|
|
||||||
System.out.println("Configuration should be specified as 'scheme:location'. Default configuration is 'xml:${ARTEMIS_INSTANCE}/etc/bootstrap.xml'");
|
|
||||||
}
|
|
||||||
catch (RuntimeException re)
|
|
||||||
{
|
|
||||||
System.err.println(re.getMessage());
|
|
||||||
System.out.println();
|
|
||||||
parser.parse("help").execute(ActionContext.system());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void printBanner() throws Exception
|
public static void printBanner() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -16,9 +16,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.cli.commands;
|
package org.apache.activemq.artemis.cli.commands;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
public interface Action
|
public interface Action
|
||||||
{
|
{
|
||||||
|
boolean isVerbose();
|
||||||
|
|
||||||
|
void setHomeValues(File brokerHome, File brokerInstance);
|
||||||
|
|
||||||
Object execute(ActionContext context) throws Exception;
|
Object execute(ActionContext context) throws Exception;
|
||||||
|
|
||||||
|
String getBrokerInstance();
|
||||||
|
|
||||||
|
String getBrokerHome();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,89 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.cli.commands;
|
package org.apache.activemq.artemis.cli.commands;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import io.airlift.airline.Option;
|
||||||
|
|
||||||
public abstract class ActionAbstract implements Action
|
public abstract class ActionAbstract implements Action
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@Option(name = "--verbose", description = "Adds more information on the execution")
|
||||||
|
boolean verbose;
|
||||||
|
|
||||||
|
private String brokerInstance;
|
||||||
|
|
||||||
|
private String brokerHome;
|
||||||
|
|
||||||
protected ActionContext context;
|
protected ActionContext context;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isVerbose()
|
||||||
|
{
|
||||||
|
return verbose;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setHomeValues(File brokerHome, File brokerInstance)
|
||||||
|
{
|
||||||
|
if (brokerHome != null)
|
||||||
|
{
|
||||||
|
this.brokerHome = brokerHome.getAbsolutePath();
|
||||||
|
}
|
||||||
|
if (brokerInstance != null)
|
||||||
|
{
|
||||||
|
this.brokerInstance = brokerInstance.getAbsolutePath();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBrokerInstance()
|
||||||
|
{
|
||||||
|
if (brokerInstance == null)
|
||||||
|
{
|
||||||
|
/* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows
|
||||||
|
the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this
|
||||||
|
reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */
|
||||||
|
brokerInstance = System.getProperty("artemis.instance");
|
||||||
|
if (brokerInstance != null)
|
||||||
|
{
|
||||||
|
brokerInstance = brokerInstance.replace("\\", "/");
|
||||||
|
System.setProperty("artemis.instance", brokerInstance);
|
||||||
|
}
|
||||||
|
if (brokerInstance == null)
|
||||||
|
{
|
||||||
|
// if still null we will try to improvise with "."
|
||||||
|
brokerInstance = ".";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return brokerInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public String getBrokerHome()
|
||||||
|
{
|
||||||
|
if (brokerHome == null)
|
||||||
|
{
|
||||||
|
/* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows
|
||||||
|
the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this
|
||||||
|
reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */
|
||||||
|
brokerHome = System.getProperty("artemis.home");
|
||||||
|
if (brokerHome != null)
|
||||||
|
{
|
||||||
|
brokerHome = brokerHome.replace("\\", "/");
|
||||||
|
System.setProperty("artemis.home", brokerHome);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (brokerHome == null)
|
||||||
|
{
|
||||||
|
// if still null we will try to improvise with "."
|
||||||
|
brokerHome = ".";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return brokerHome;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Object execute(ActionContext context) throws Exception
|
public Object execute(ActionContext context) throws Exception
|
||||||
{
|
{
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.cli.commands;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import io.airlift.airline.Command;
|
||||||
|
import io.airlift.airline.Option;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.util.ConsumerThread;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
|
|
||||||
|
@Command(name = "browse", description = "It will send consume messages from an instance")
|
||||||
|
public class Browse extends DestAbstract
|
||||||
|
{
|
||||||
|
@Option(name = "--filter", description = "filter to be used with the consumer")
|
||||||
|
String filter;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object execute(ActionContext context) throws Exception
|
||||||
|
{
|
||||||
|
super.execute(context);
|
||||||
|
|
||||||
|
System.out.println("Consumer:: filter = " + filter);
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password);
|
||||||
|
|
||||||
|
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
|
||||||
|
try (Connection connection = factory.createConnection())
|
||||||
|
{
|
||||||
|
ConsumerThread[] threadsArray = new ConsumerThread[threads];
|
||||||
|
for (int i = 0; i < threads; i++)
|
||||||
|
{
|
||||||
|
Session session;
|
||||||
|
if (txBatchSize > 0)
|
||||||
|
{
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||||
|
|
||||||
|
threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ConsumerThread thread : threadsArray)
|
||||||
|
{
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
int received = 0;
|
||||||
|
|
||||||
|
for (ConsumerThread thread : threadsArray)
|
||||||
|
{
|
||||||
|
thread.join();
|
||||||
|
received += thread.getReceived();
|
||||||
|
}
|
||||||
|
|
||||||
|
return received;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -50,10 +50,6 @@ public abstract class Configurable extends ActionAbstract
|
||||||
|
|
||||||
private BrokerDTO brokerDTO = null;
|
private BrokerDTO brokerDTO = null;
|
||||||
|
|
||||||
private String brokerInstance;
|
|
||||||
|
|
||||||
private String brokerHome;
|
|
||||||
|
|
||||||
private FileConfiguration fileConfiguration;
|
private FileConfiguration fileConfiguration;
|
||||||
|
|
||||||
protected void treatError(Exception e, String group, String command)
|
protected void treatError(Exception e, String group, String command)
|
||||||
|
@ -65,23 +61,6 @@ public abstract class Configurable extends ActionAbstract
|
||||||
helpGroup(group, command);
|
helpGroup(group, command);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getBrokerInstance()
|
|
||||||
{
|
|
||||||
if (brokerInstance == null)
|
|
||||||
{
|
|
||||||
/* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows
|
|
||||||
the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this
|
|
||||||
reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */
|
|
||||||
brokerInstance = System.getProperty("artemis.instance");
|
|
||||||
if (brokerInstance != null)
|
|
||||||
{
|
|
||||||
brokerInstance = brokerInstance.replace("\\", "/");
|
|
||||||
System.setProperty("artemis.instance", brokerInstance);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return brokerInstance;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void helpGroup(String groupName, String commandName)
|
protected void helpGroup(String groupName, String commandName)
|
||||||
{
|
{
|
||||||
for (CommandGroupMetadata group: global.getCommandGroups())
|
for (CommandGroupMetadata group: global.getCommandGroups())
|
||||||
|
@ -100,23 +79,6 @@ public abstract class Configurable extends ActionAbstract
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getBrokerHome()
|
|
||||||
{
|
|
||||||
if (brokerHome == null)
|
|
||||||
{
|
|
||||||
/* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows
|
|
||||||
the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this
|
|
||||||
reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */
|
|
||||||
brokerHome = System.getProperty("artemis.home");
|
|
||||||
if (brokerHome != null)
|
|
||||||
{
|
|
||||||
brokerHome = brokerHome.replace("\\", "/");
|
|
||||||
System.setProperty("artemis.home", brokerHome);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return brokerHome;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected FileConfiguration getFileConfiguration() throws Exception
|
protected FileConfiguration getFileConfiguration() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -144,6 +106,8 @@ public abstract class Configurable extends ActionAbstract
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fileConfiguration.setBrokerInstance(new File(getBrokerInstance()));
|
||||||
|
|
||||||
return fileConfiguration;
|
return fileConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,11 +41,16 @@ public class Consumer extends DestAbstract
|
||||||
@Option(name = "--receiveTimeout", description = "Time used on receive(timeout)")
|
@Option(name = "--receiveTimeout", description = "Time used on receive(timeout)")
|
||||||
int receiveTimeout;
|
int receiveTimeout;
|
||||||
|
|
||||||
|
@Option(name = "--filter", description = "filter to be used with the consumer")
|
||||||
|
String filter;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception
|
public Object execute(ActionContext context) throws Exception
|
||||||
{
|
{
|
||||||
super.execute(context);
|
super.execute(context);
|
||||||
|
|
||||||
|
System.out.println("Consumer:: filter = " + filter);
|
||||||
|
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password);
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password);
|
||||||
|
|
||||||
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
|
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
|
||||||
|
@ -66,7 +71,7 @@ public class Consumer extends DestAbstract
|
||||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||||
|
|
||||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
|
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
|
||||||
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout);
|
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ConsumerThread thread : threadsArray)
|
for (ConsumerThread thread : threadsArray)
|
||||||
|
@ -76,13 +81,16 @@ public class Consumer extends DestAbstract
|
||||||
|
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
|
int received = 0;
|
||||||
|
|
||||||
for (ConsumerThread thread : threadsArray)
|
for (ConsumerThread thread : threadsArray)
|
||||||
{
|
{
|
||||||
thread.join();
|
thread.join();
|
||||||
|
received += thread.getReceived();
|
||||||
|
}
|
||||||
|
|
||||||
|
return received;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,12 +248,7 @@ public class Create extends InputAbstract
|
||||||
{
|
{
|
||||||
if (home == null)
|
if (home == null)
|
||||||
{
|
{
|
||||||
String homeStr = System.getProperty("artemis.home");
|
home = new File(getBrokerHome());
|
||||||
if (homeStr == null)
|
|
||||||
{
|
|
||||||
homeStr = ".";
|
|
||||||
}
|
|
||||||
home = new File(homeStr);
|
|
||||||
}
|
}
|
||||||
return home;
|
return home;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,9 +36,6 @@ public class DestAbstract extends ActionAbstract
|
||||||
@Option(name = "--password", description = "Password used to connect")
|
@Option(name = "--password", description = "Password used to connect")
|
||||||
String password;
|
String password;
|
||||||
|
|
||||||
@Option(name = "--verbose", description = "It will print messages individually")
|
|
||||||
boolean verbose;
|
|
||||||
|
|
||||||
@Option(name = "--sleep", description = "Time wait between each message")
|
@Option(name = "--sleep", description = "Time wait between each message")
|
||||||
int sleep = 0;
|
int sleep = 0;
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,35 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.cli.commands;
|
package org.apache.activemq.artemis.cli.commands;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
import io.airlift.airline.Help;
|
import io.airlift.airline.Help;
|
||||||
|
|
||||||
public class HelpAction extends Help implements Action
|
public class HelpAction extends Help implements Action
|
||||||
{
|
{
|
||||||
|
@Override
|
||||||
|
public boolean isVerbose()
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setHomeValues(File brokerHome, File brokerInstance)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getBrokerInstance()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getBrokerHome()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception
|
public Object execute(ActionContext context) throws Exception
|
||||||
|
|
|
@ -72,7 +72,8 @@ public class Producer extends DestAbstract
|
||||||
|
|
||||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
||||||
setMessageSize(messageSize).setTextMessageSize(textMessageSize).
|
setMessageSize(messageSize).setTextMessageSize(textMessageSize).
|
||||||
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize);
|
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
|
||||||
|
setMessageCount(messageCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ProducerThread thread : threadsArray)
|
for (ProducerThread thread : threadsArray)
|
||||||
|
@ -80,13 +81,15 @@ public class Producer extends DestAbstract
|
||||||
thread.start();
|
thread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int messagesProduced = 0;
|
||||||
for (ProducerThread thread : threadsArray)
|
for (ProducerThread thread : threadsArray)
|
||||||
{
|
{
|
||||||
thread.join();
|
thread.join();
|
||||||
|
messagesProduced += thread.getSentCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
return messagesProduced;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.cli.commands.tools;
|
package org.apache.activemq.artemis.cli.commands.tools;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -27,6 +28,30 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
public class HelpData extends Help implements Action
|
public class HelpData extends Help implements Action
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isVerbose()
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setHomeValues(File brokerHome, File brokerInstance)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getBrokerInstance()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getBrokerHome()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception
|
public Object execute(ActionContext context) throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||||
import org.apache.activemq.artemis.cli.commands.Action;
|
import org.apache.activemq.artemis.cli.commands.ActionAbstract;
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
|
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
||||||
|
@ -63,7 +63,7 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
* for speed and simplicity.
|
* for speed and simplicity.
|
||||||
*/
|
*/
|
||||||
@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
|
@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
|
||||||
public final class XmlDataImporter implements Action
|
public final class XmlDataImporter extends ActionAbstract
|
||||||
{
|
{
|
||||||
// Constants -----------------------------------------------------
|
// Constants -----------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -22,9 +22,12 @@ import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.QueueBrowser;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
import java.util.Enumeration;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
public class ConsumerThread extends Thread
|
public class ConsumerThread extends Thread
|
||||||
|
@ -39,6 +42,9 @@ public class ConsumerThread extends Thread
|
||||||
int sleep;
|
int sleep;
|
||||||
int batchSize;
|
int batchSize;
|
||||||
boolean verbose;
|
boolean verbose;
|
||||||
|
boolean browse;
|
||||||
|
|
||||||
|
String filter;
|
||||||
|
|
||||||
int received = 0;
|
int received = 0;
|
||||||
int transactions = 0;
|
int transactions = 0;
|
||||||
|
@ -55,6 +61,102 @@ public class ConsumerThread extends Thread
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
|
{
|
||||||
|
if (browse)
|
||||||
|
{
|
||||||
|
browse();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
consume();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void browse()
|
||||||
|
{
|
||||||
|
running = true;
|
||||||
|
QueueBrowser consumer = null;
|
||||||
|
String threadName = Thread.currentThread().getName();
|
||||||
|
System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (filter != null)
|
||||||
|
{
|
||||||
|
consumer = session.createBrowser((Queue)destination, filter);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
consumer = session.createBrowser((Queue)destination);
|
||||||
|
}
|
||||||
|
Enumeration<Message> enumBrowse = consumer.getEnumeration();
|
||||||
|
|
||||||
|
while (enumBrowse.hasMoreElements())
|
||||||
|
{
|
||||||
|
Message msg = enumBrowse.nextElement();
|
||||||
|
if (msg != null)
|
||||||
|
{
|
||||||
|
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||||
|
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
System.out.println("..." + msg);
|
||||||
|
}
|
||||||
|
if (bytesAsText && (msg instanceof BytesMessage))
|
||||||
|
{
|
||||||
|
long length = ((BytesMessage) msg).getBodyLength();
|
||||||
|
byte[] bytes = new byte[(int) length];
|
||||||
|
((BytesMessage) msg).readBytes(bytes);
|
||||||
|
System.out.println("Message:" + msg);
|
||||||
|
}
|
||||||
|
received++;
|
||||||
|
|
||||||
|
if (received >= messageCount)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sleep > 0)
|
||||||
|
{
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (finished != null)
|
||||||
|
{
|
||||||
|
finished.countDown();
|
||||||
|
}
|
||||||
|
if (consumer != null)
|
||||||
|
{
|
||||||
|
System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
catch (JMSException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println(threadName + " Consumer thread finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void consume()
|
||||||
{
|
{
|
||||||
running = true;
|
running = true;
|
||||||
MessageConsumer consumer = null;
|
MessageConsumer consumer = null;
|
||||||
|
@ -63,19 +165,37 @@ public class ConsumerThread extends Thread
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (durable && destination instanceof Topic)
|
if (durable && destination instanceof Topic)
|
||||||
|
{
|
||||||
|
if (filter != null)
|
||||||
|
{
|
||||||
|
consumer = session.createDurableSubscriber((Topic) destination, getName(), filter, false);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
consumer = session.createDurableSubscriber((Topic) destination, getName());
|
consumer = session.createDurableSubscriber((Topic) destination, getName());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (filter != null)
|
||||||
|
{
|
||||||
|
consumer = session.createConsumer(destination, filter);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
consumer = session.createConsumer(destination);
|
consumer = session.createConsumer(destination);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
while (running && received < messageCount)
|
while (running && received < messageCount)
|
||||||
{
|
{
|
||||||
Message msg = consumer.receive(receiveTimeOut);
|
Message msg = consumer.receive(receiveTimeOut);
|
||||||
if (msg != null)
|
if (msg != null)
|
||||||
{
|
{
|
||||||
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
System.out.println("..." + msg);
|
||||||
|
}
|
||||||
if (bytesAsText && (msg instanceof BytesMessage))
|
if (bytesAsText && (msg instanceof BytesMessage))
|
||||||
{
|
{
|
||||||
long length = ((BytesMessage) msg).getBodyLength();
|
long length = ((BytesMessage) msg).getBodyLength();
|
||||||
|
@ -265,4 +385,26 @@ public class ConsumerThread extends Thread
|
||||||
this.bytesAsText = bytesAsText;
|
this.bytesAsText = bytesAsText;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getFilter()
|
||||||
|
{
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConsumerThread setFilter(String filter)
|
||||||
|
{
|
||||||
|
this.filter = filter;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBrowse()
|
||||||
|
{
|
||||||
|
return browse;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConsumerThread setBrowse(boolean browse)
|
||||||
|
{
|
||||||
|
this.browse = browse;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.test;
|
package org.apache.activemq.artemis.test;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.cli.Artemis;
|
import org.apache.activemq.artemis.cli.Artemis;
|
||||||
import org.apache.activemq.artemis.cli.commands.Run;
|
import org.apache.activemq.artemis.cli.commands.Run;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
|
@ -70,10 +76,45 @@ public class ArtemisTest
|
||||||
System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath());
|
System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath());
|
||||||
// Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
|
// Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
|
||||||
Artemis.main("run");
|
Artemis.main("run");
|
||||||
Artemis.main("produce", "--txSize", "500");
|
Assert.assertEquals(Integer.valueOf(70), Artemis.execute("produce", "--txSize", "50", "--messageCount", "70", "--verbose"));
|
||||||
Artemis.main("consume", "--txSize", "500", "--verbose");
|
Assert.assertEquals(Integer.valueOf(70), Artemis.execute("consume", "--txSize", "50", "--verbose", "--breakOnNull", "--receiveTimeout", "100"));
|
||||||
Artemis.main("stop");
|
|
||||||
Artemis.main("data", "print");
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageProducer producer = session.createProducer(ActiveMQDestination.createDestination("queue://TEST", ActiveMQDestination.QUEUE_TYPE));
|
||||||
|
|
||||||
|
TextMessage message = session.createTextMessage("Banana");
|
||||||
|
message.setStringProperty("fruit", "banana");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++)
|
||||||
|
{
|
||||||
|
message = session.createTextMessage("orange");
|
||||||
|
message.setStringProperty("fruit", "orange");
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
cf.close();
|
||||||
|
|
||||||
|
Assert.assertEquals(Integer.valueOf(1), Artemis.execute("browse", "--txSize", "50", "--verbose", "--filter", "fruit='banana'"));
|
||||||
|
|
||||||
|
Assert.assertEquals(Integer.valueOf(100), Artemis.execute("browse", "--txSize", "50", "--verbose", "--filter", "fruit='orange'"));
|
||||||
|
|
||||||
|
Assert.assertEquals(Integer.valueOf(101), Artemis.execute("browse", "--txSize", "50", "--verbose"));
|
||||||
|
|
||||||
|
// should only receive 10 messages on browse as I'm setting messageCount=10
|
||||||
|
Assert.assertEquals(Integer.valueOf(10), Artemis.execute("browse", "--txSize", "50", "--verbose", "--messageCount", "10"));
|
||||||
|
|
||||||
|
// Nothing was consumed until here as it was only browsing, check it's receiving again
|
||||||
|
Assert.assertEquals(Integer.valueOf(1), Artemis.execute("consume", "--txSize", "50", "--verbose", "--breakOnNull", "--receiveTimeout", "100", "--filter", "fruit='banana'"));
|
||||||
|
|
||||||
|
// Checking it was acked before
|
||||||
|
Assert.assertEquals(Integer.valueOf(100), Artemis.execute("consume", "--txSize", "50", "--verbose", "--breakOnNull", "--receiveTimeout", "100"));
|
||||||
|
|
||||||
|
Artemis.execute("stop");
|
||||||
Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,15 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.jms.client;
|
package org.apache.activemq.artemis.jms.client;
|
||||||
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.ObjectInputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Enumeration;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
|
@ -34,6 +25,14 @@ import javax.jms.JMSRuntimeException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageFormatException;
|
import javax.jms.MessageFormatException;
|
||||||
import javax.jms.MessageNotWriteableException;
|
import javax.jms.MessageNotWriteableException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -969,6 +968,7 @@ public class ActiveMQMessage implements javax.jms.Message
|
||||||
sb.append(getJMSMessageID());
|
sb.append(getJMSMessageID());
|
||||||
sb.append("]:");
|
sb.append("]:");
|
||||||
sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT");
|
sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT");
|
||||||
|
sb.append("/" + message.toString());
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -889,11 +889,11 @@ public interface Configuration
|
||||||
/**
|
/**
|
||||||
* Set the Artemis instance relative folder for data and stuff.
|
* Set the Artemis instance relative folder for data and stuff.
|
||||||
*/
|
*/
|
||||||
void setArtemisInstance(File directory);
|
void setBrokerInstance(File directory);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the Artemis instance relative folder for data and stuff.
|
* Set the Artemis instance relative folder for data and stuff.
|
||||||
*/
|
*/
|
||||||
File getArtemisInstance();
|
File getBrokerInstance();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1103,7 +1103,7 @@ public class ConfigurationImpl implements Configuration, Serializable
|
||||||
return this.connectorServiceConfigurations;
|
return this.connectorServiceConfigurations;
|
||||||
}
|
}
|
||||||
|
|
||||||
public File getArtemisInstance()
|
public File getBrokerInstance()
|
||||||
{
|
{
|
||||||
if (artemisInstance != null)
|
if (artemisInstance != null)
|
||||||
{
|
{
|
||||||
|
@ -1122,7 +1122,7 @@ public class ConfigurationImpl implements Configuration, Serializable
|
||||||
return artemisInstance;
|
return artemisInstance;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setArtemisInstance(File directory)
|
public void setBrokerInstance(File directory)
|
||||||
{
|
{
|
||||||
this.artemisInstance = directory;
|
this.artemisInstance = directory;
|
||||||
}
|
}
|
||||||
|
@ -1627,7 +1627,7 @@ public class ConfigurationImpl implements Configuration, Serializable
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Resolve wont work without "/" as the last character
|
// Resolve wont work without "/" as the last character
|
||||||
URI artemisHome = new URI(getArtemisInstance().toURI() + "/");
|
URI artemisHome = new URI(getBrokerInstance().toURI() + "/");
|
||||||
URI relative = artemisHome.resolve(subFolder);
|
URI relative = artemisHome.resolve(subFolder);
|
||||||
return new File(relative.getPath());
|
return new File(relative.getPath());
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class RelativePathTest extends ActiveMQTestBase
|
||||||
|
|
||||||
File instanceHome = new File(getTemporaryDir(), "artemisHome");
|
File instanceHome = new File(getTemporaryDir(), "artemisHome");
|
||||||
|
|
||||||
configuration.setArtemisInstance(instanceHome);
|
configuration.setBrokerInstance(instanceHome);
|
||||||
|
|
||||||
// the journal should be outside of the artemisInstance on this case
|
// the journal should be outside of the artemisInstance on this case
|
||||||
File journalOutside = new File(getTemporaryDir(), "./journalOut").getAbsoluteFile();
|
File journalOutside = new File(getTemporaryDir(), "./journalOut").getAbsoluteFile();
|
||||||
|
@ -89,7 +89,7 @@ public class RelativePathTest extends ActiveMQTestBase
|
||||||
|
|
||||||
System.out.println("InstanceHome->" + instanceHome);
|
System.out.println("InstanceHome->" + instanceHome);
|
||||||
instanceHome.mkdirs();
|
instanceHome.mkdirs();
|
||||||
configuration.setArtemisInstance(instanceHome);
|
configuration.setBrokerInstance(instanceHome);
|
||||||
|
|
||||||
configuration.setJournalDirectory("./data");
|
configuration.setJournalDirectory("./data");
|
||||||
configuration.setPagingDirectory("./paging");
|
configuration.setPagingDirectory("./paging");
|
||||||
|
|
Loading…
Reference in New Issue