From aa4a06329f91b37e64df6ffb40ca9bf891bf38f0 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 25 Jun 2015 10:22:58 -0400 Subject: [PATCH] [ARTEMIS-144] adding Browse and a few other improvements --- .../apache/activemq/artemis/cli/Artemis.java | 88 +++++++---- .../activemq/artemis/cli/commands/Action.java | 9 ++ .../artemis/cli/commands/ActionAbstract.java | 78 ++++++++++ .../activemq/artemis/cli/commands/Browse.java | 84 ++++++++++ .../artemis/cli/commands/Configurable.java | 40 +---- .../artemis/cli/commands/Consumer.java | 14 +- .../activemq/artemis/cli/commands/Create.java | 7 +- .../artemis/cli/commands/DestAbstract.java | 3 - .../artemis/cli/commands/HelpAction.java | 25 +++ .../artemis/cli/commands/Producer.java | 9 +- .../artemis/cli/commands/tools/HelpData.java | 25 +++ .../cli/commands/tools/XmlDataImporter.java | 4 +- .../cli/commands/util/ConsumerThread.java | 146 +++++++++++++++++- .../activemq/artemis/test/ArtemisTest.java | 49 +++++- .../artemis/jms/client/ActiveMQMessage.java | 18 +-- .../artemis/core/config/Configuration.java | 4 +- .../core/config/impl/ConfigurationImpl.java | 6 +- .../integration/journal/RelativePathTest.java | 4 +- 18 files changed, 509 insertions(+), 104 deletions(-) create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Browse.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index 636d7298d8..1f4710e4c1 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -16,12 +16,14 @@ */ package org.apache.activemq.artemis.cli; +import java.io.File; import java.io.InputStream; import java.io.OutputStream; import io.airlift.airline.Cli; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.Browse; import org.apache.activemq.artemis.cli.commands.Consumer; import org.apache.activemq.artemis.cli.commands.Create; import org.apache.activemq.artemis.cli.commands.HelpAction; @@ -39,35 +41,11 @@ import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; public class Artemis { @SuppressWarnings("unchecked") - public static void main(String...args) throws Exception + public static void main(String... args) throws Exception { - String instance = System.getProperty("artemis.instance"); - Cli.CliBuilder builder = Cli.builder("artemis") - .withDescription("ActiveMQ Artemis Command Line") - .withCommand(HelpAction.class) - .withCommand(Producer.class) - .withCommand(Consumer.class) - .withDefaultCommand(HelpAction.class); - - - builder.withGroup("data") - .withDescription("data tools group (print|exp|imp|exp|encode|decode) (example ./artemis data print)"). - withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, - XmlDataImporter.class,DecodeJournal.class, EncodeJournal.class); - - if (instance != null) - { - builder = builder.withCommands(Run.class, Stop.class, Kill.class); - } - else - { - builder = builder.withCommand(Create.class); - } - - Cli parser = builder.build(); try { - parser.parse(args).execute(ActionContext.system()); + execute(args); } catch (ConfigurationException configException) { @@ -79,11 +57,67 @@ public class Artemis { System.err.println(re.getMessage()); System.out.println(); + + Cli parser = builder(null).build(); + parser.parse("help").execute(ActionContext.system()); } - } + public static Object execute(String... args) throws Exception + { + return execute(null, null, args); + } + + public static Object execute(File artemisHome, File artemisInstance, String... args) throws Exception + { + Action action = builder(artemisInstance).build().parse(args); + action.setHomeValues(artemisHome, artemisInstance); + + if (action.isVerbose()) + { + System.out.print("Executing " + action.getClass().getName() + " "); + for (String arg : args) + { + System.out.print(arg + " "); + } + System.out.println(); + System.out.println("Home::" + action.getBrokerHome() + ", Instance::" + action.getBrokerInstance()); + } + + return action.execute(ActionContext.system()); + } + + private static Cli.CliBuilder builder(File artemisInstance) + { + String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance"); + Cli.CliBuilder builder = Cli.builder("artemis") + .withDescription("ActiveMQ Artemis Command Line") + .withCommand(HelpAction.class) + .withCommand(Producer.class) + .withCommand(Consumer.class) + .withCommand(Browse.class) + .withDefaultCommand(HelpAction.class); + + + builder.withGroup("data") + .withDescription("data tools group (print|exp|imp|exp|encode|decode) (example ./artemis data print)"). + withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, + XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class); + + if (instance != null) + { + builder = builder.withCommands(Run.class, Stop.class, Kill.class); + } + else + { + builder = builder.withCommand(Create.class); + } + + return builder; + } + + public static void printBanner() throws Exception { copy(Artemis.class.getResourceAsStream("banner.txt"), System.out); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Action.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Action.java index c82cde17c8..e0510ff9a6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Action.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Action.java @@ -16,9 +16,18 @@ */ package org.apache.activemq.artemis.cli.commands; +import java.io.File; + public interface Action { + boolean isVerbose(); + + void setHomeValues(File brokerHome, File brokerInstance); Object execute(ActionContext context) throws Exception; + String getBrokerInstance(); + + String getBrokerHome(); + } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java index 7f041461eb..fb3ea649ee 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java @@ -16,11 +16,89 @@ */ package org.apache.activemq.artemis.cli.commands; +import java.io.File; + +import io.airlift.airline.Option; + public abstract class ActionAbstract implements Action { + @Option(name = "--verbose", description = "Adds more information on the execution") + boolean verbose; + + private String brokerInstance; + + private String brokerHome; + protected ActionContext context; + @Override + public boolean isVerbose() + { + return verbose; + + } + + @Override + public void setHomeValues(File brokerHome, File brokerInstance) + { + if (brokerHome != null) + { + this.brokerHome = brokerHome.getAbsolutePath(); + } + if (brokerInstance != null) + { + this.brokerInstance = brokerInstance.getAbsolutePath(); + } + } + + public String getBrokerInstance() + { + if (brokerInstance == null) + { + /* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows + the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this + reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */ + brokerInstance = System.getProperty("artemis.instance"); + if (brokerInstance != null) + { + brokerInstance = brokerInstance.replace("\\", "/"); + System.setProperty("artemis.instance", brokerInstance); + } + if (brokerInstance == null) + { + // if still null we will try to improvise with "." + brokerInstance = "."; + } + } + return brokerInstance; + } + + + public String getBrokerHome() + { + if (brokerHome == null) + { + /* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows + the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this + reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */ + brokerHome = System.getProperty("artemis.home"); + if (brokerHome != null) + { + brokerHome = brokerHome.replace("\\", "/"); + System.setProperty("artemis.home", brokerHome); + } + + if (brokerHome == null) + { + // if still null we will try to improvise with "." + brokerHome = "."; + } + } + return brokerHome; + } + + public Object execute(ActionContext context) throws Exception { this.context = context; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Browse.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Browse.java new file mode 100644 index 0000000000..614ab6cc62 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Browse.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.cli.commands.util.ConsumerThread; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; + +@Command(name = "browse", description = "It will send consume messages from an instance") +public class Browse extends DestAbstract +{ + @Option(name = "--filter", description = "filter to be used with the consumer") + String filter; + + @Override + public Object execute(ActionContext context) throws Exception + { + super.execute(context); + + System.out.println("Consumer:: filter = " + filter); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password); + + Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE); + try (Connection connection = factory.createConnection()) + { + ConsumerThread[] threadsArray = new ConsumerThread[threads]; + for (int i = 0; i < threads; i++) + { + Session session; + if (txBatchSize > 0) + { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + threadsArray[i] = new ConsumerThread(session, dest, i); + + threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true); + } + + for (ConsumerThread thread : threadsArray) + { + thread.start(); + } + + connection.start(); + + int received = 0; + + for (ConsumerThread thread : threadsArray) + { + thread.join(); + received += thread.getReceived(); + } + + return received; + } + } + +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java index 651620f38d..8f3dbf573f 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java @@ -50,10 +50,6 @@ public abstract class Configurable extends ActionAbstract private BrokerDTO brokerDTO = null; - private String brokerInstance; - - private String brokerHome; - private FileConfiguration fileConfiguration; protected void treatError(Exception e, String group, String command) @@ -65,23 +61,6 @@ public abstract class Configurable extends ActionAbstract helpGroup(group, command); } - protected String getBrokerInstance() - { - if (brokerInstance == null) - { - /* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows - the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this - reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */ - brokerInstance = System.getProperty("artemis.instance"); - if (brokerInstance != null) - { - brokerInstance = brokerInstance.replace("\\", "/"); - System.setProperty("artemis.instance", brokerInstance); - } - } - return brokerInstance; - } - protected void helpGroup(String groupName, String commandName) { for (CommandGroupMetadata group: global.getCommandGroups()) @@ -100,23 +79,6 @@ public abstract class Configurable extends ActionAbstract } } - protected String getBrokerHome() - { - if (brokerHome == null) - { - /* We use File URI for locating files. The ARTEMIS_HOME variable is used to determine file paths. For Windows - the ARTEMIS_HOME variable will include back slashes (An invalid file URI character path separator). For this - reason we overwrite the ARTEMIS_HOME variable with backslashes replaced with forward slashes. */ - brokerHome = System.getProperty("artemis.home"); - if (brokerHome != null) - { - brokerHome = brokerHome.replace("\\", "/"); - System.setProperty("artemis.home", brokerHome); - } - } - return brokerHome; - } - protected FileConfiguration getFileConfiguration() throws Exception { @@ -144,6 +106,8 @@ public abstract class Configurable extends ActionAbstract } } + fileConfiguration.setBrokerInstance(new File(getBrokerInstance())); + return fileConfiguration; } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java index 96b8455dcb..77d96433a7 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java @@ -41,11 +41,16 @@ public class Consumer extends DestAbstract @Option(name = "--receiveTimeout", description = "Time used on receive(timeout)") int receiveTimeout; + @Option(name = "--filter", description = "filter to be used with the consumer") + String filter; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); + System.out.println("Consumer:: filter = " + filter); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password); Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE); @@ -66,7 +71,7 @@ public class Consumer extends DestAbstract threadsArray[i] = new ConsumerThread(session, dest, i); threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull) - .setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout); + .setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false); } for (ConsumerThread thread : threadsArray) @@ -76,13 +81,16 @@ public class Consumer extends DestAbstract connection.start(); + int received = 0; + for (ConsumerThread thread : threadsArray) { thread.join(); + received += thread.getReceived(); } - } - return null; + return received; + } } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 182b662f93..a3f1bada90 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -248,12 +248,7 @@ public class Create extends InputAbstract { if (home == null) { - String homeStr = System.getProperty("artemis.home"); - if (homeStr == null) - { - homeStr = "."; - } - home = new File(homeStr); + home = new File(getBrokerHome()); } return home; } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java index b54e7b8905..e8f7b587b5 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java @@ -36,9 +36,6 @@ public class DestAbstract extends ActionAbstract @Option(name = "--password", description = "Password used to connect") String password; - @Option(name = "--verbose", description = "It will print messages individually") - boolean verbose; - @Option(name = "--sleep", description = "Time wait between each message") int sleep = 0; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java index c96b55e432..eecf9c6bc2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java @@ -16,10 +16,35 @@ */ package org.apache.activemq.artemis.cli.commands; +import java.io.File; + import io.airlift.airline.Help; public class HelpAction extends Help implements Action { + @Override + public boolean isVerbose() + { + return false; + } + + @Override + public void setHomeValues(File brokerHome, File brokerInstance) + { + + } + + @Override + public String getBrokerInstance() + { + return null; + } + + @Override + public String getBrokerHome() + { + return null; + } @Override public Object execute(ActionContext context) throws Exception diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java index e55e0b8fa1..96c1331ddb 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java @@ -72,7 +72,8 @@ public class Producer extends DestAbstract threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). setMessageSize(messageSize).setTextMessageSize(textMessageSize). - setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize); + setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). + setMessageCount(messageCount); } for (ProducerThread thread : threadsArray) @@ -80,13 +81,15 @@ public class Producer extends DestAbstract thread.start(); } + int messagesProduced = 0; for (ProducerThread thread : threadsArray) { thread.join(); + messagesProduced += thread.getSentCount(); } - } - return null; + return messagesProduced; + } } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java index e994e54251..808d6cca61 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.cli.commands.tools; +import java.io.File; import java.util.ArrayList; import java.util.List; @@ -27,6 +28,30 @@ import org.apache.activemq.artemis.cli.commands.ActionContext; public class HelpData extends Help implements Action { + @Override + public boolean isVerbose() + { + return false; + } + + @Override + public void setHomeValues(File brokerHome, File brokerInstance) + { + + } + + @Override + public String getBrokerInstance() + { + return null; + } + + @Override + public String getBrokerHome() + { + return null; + } + @Override public Object execute(ActionContext context) throws Exception { diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java index 4bb620a3eb..33aa5f3f2f 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java @@ -48,7 +48,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; -import org.apache.activemq.artemis.cli.commands.Action; +import org.apache.activemq.artemis.cli.commands.ActionAbstract; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; @@ -63,7 +63,7 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; * for speed and simplicity. */ @Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.") -public final class XmlDataImporter implements Action +public final class XmlDataImporter extends ActionAbstract { // Constants ----------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java index 3efa1af92f..8a241b7780 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java @@ -22,9 +22,12 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import java.util.Enumeration; import java.util.concurrent.CountDownLatch; public class ConsumerThread extends Thread @@ -39,6 +42,9 @@ public class ConsumerThread extends Thread int sleep; int batchSize; boolean verbose; + boolean browse; + + String filter; int received = 0; int transactions = 0; @@ -55,6 +61,102 @@ public class ConsumerThread extends Thread @Override public void run() + { + if (browse) + { + browse(); + } + else + { + consume(); + } + } + + public void browse() + { + running = true; + QueueBrowser consumer = null; + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + " wait until " + messageCount + " messages are consumed"); + try + { + if (filter != null) + { + consumer = session.createBrowser((Queue)destination, filter); + } + else + { + consumer = session.createBrowser((Queue)destination); + } + Enumeration enumBrowse = consumer.getEnumeration(); + + while (enumBrowse.hasMoreElements()) + { + Message msg = enumBrowse.nextElement(); + if (msg != null) + { + System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); + + if (verbose) + { + System.out.println("..." + msg); + } + if (bytesAsText && (msg instanceof BytesMessage)) + { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Message:" + msg); + } + received++; + + if (received >= messageCount) + { + break; + } + } + else + { + break; + } + + if (sleep > 0) + { + Thread.sleep(sleep); + } + + } + + consumer.close(); + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + if (finished != null) + { + finished.countDown(); + } + if (consumer != null) + { + System.out.println(threadName + " Consumed: " + this.getReceived() + " messages"); + try + { + consumer.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } + } + + System.out.println(threadName + " Consumer thread finished"); + } + + public void consume() { running = true; MessageConsumer consumer = null; @@ -64,11 +166,25 @@ public class ConsumerThread extends Thread { if (durable && destination instanceof Topic) { - consumer = session.createDurableSubscriber((Topic) destination, getName()); + if (filter != null) + { + consumer = session.createDurableSubscriber((Topic) destination, getName(), filter, false); + } + else + { + consumer = session.createDurableSubscriber((Topic) destination, getName()); + } } else { - consumer = session.createConsumer(destination); + if (filter != null) + { + consumer = session.createConsumer(destination, filter); + } + else + { + consumer = session.createConsumer(destination); + } } while (running && received < messageCount) { @@ -76,6 +192,10 @@ public class ConsumerThread extends Thread if (msg != null) { System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); + if (verbose) + { + System.out.println("..." + msg); + } if (bytesAsText && (msg instanceof BytesMessage)) { long length = ((BytesMessage) msg).getBodyLength(); @@ -265,4 +385,26 @@ public class ConsumerThread extends Thread this.bytesAsText = bytesAsText; return this; } + + public String getFilter() + { + return filter; + } + + public ConsumerThread setFilter(String filter) + { + this.filter = filter; + return this; + } + + public boolean isBrowse() + { + return browse; + } + + public ConsumerThread setBrowse(boolean browse) + { + this.browse = browse; + return this; + } } diff --git a/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java index 7d00328e74..595a2465fc 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.artemis.test; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import java.io.File; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.Run; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.junit.After; import org.junit.Assert; import org.junit.Rule; @@ -70,10 +76,45 @@ public class ArtemisTest System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath()); // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol Artemis.main("run"); - Artemis.main("produce", "--txSize", "500"); - Artemis.main("consume", "--txSize", "500", "--verbose"); - Artemis.main("stop"); - Artemis.main("data", "print"); + Assert.assertEquals(Integer.valueOf(70), Artemis.execute("produce", "--txSize", "50", "--messageCount", "70", "--verbose")); + Assert.assertEquals(Integer.valueOf(70), Artemis.execute("consume", "--txSize", "50", "--verbose", "--breakOnNull", "--receiveTimeout", "100")); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(ActiveMQDestination.createDestination("queue://TEST", ActiveMQDestination.QUEUE_TYPE)); + + TextMessage message = session.createTextMessage("Banana"); + message.setStringProperty("fruit", "banana"); + producer.send(message); + + for (int i = 0; i < 100; i++) + { + message = session.createTextMessage("orange"); + message.setStringProperty("fruit", "orange"); + producer.send(message); + } + session.commit(); + + connection.close(); + cf.close(); + + Assert.assertEquals(Integer.valueOf(1), Artemis.execute("browse", "--txSize", "50", "--verbose", "--filter", "fruit='banana'")); + + Assert.assertEquals(Integer.valueOf(100), Artemis.execute("browse", "--txSize", "50", "--verbose", "--filter", "fruit='orange'")); + + Assert.assertEquals(Integer.valueOf(101), Artemis.execute("browse", "--txSize", "50", "--verbose")); + + // should only receive 10 messages on browse as I'm setting messageCount=10 + Assert.assertEquals(Integer.valueOf(10), Artemis.execute("browse", "--txSize", "50", "--verbose", "--messageCount", "10")); + + // Nothing was consumed until here as it was only browsing, check it's receiving again + Assert.assertEquals(Integer.valueOf(1), Artemis.execute("consume", "--txSize", "50", "--verbose", "--breakOnNull", "--receiveTimeout", "100", "--filter", "fruit='banana'")); + + // Checking it was acked before + Assert.assertEquals(Integer.valueOf(100), Artemis.execute("consume", "--txSize", "50", "--verbose", "--breakOnNull", "--receiveTimeout", "100")); + + Artemis.execute("stop"); Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS)); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index ba93933cfe..cc3f2552ec 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -16,15 +16,6 @@ */ package org.apache.activemq.artemis.jms.client; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -34,6 +25,14 @@ import javax.jms.JMSRuntimeException; import javax.jms.Message; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -969,6 +968,7 @@ public class ActiveMQMessage implements javax.jms.Message sb.append(getJMSMessageID()); sb.append("]:"); sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT"); + sb.append("/" + message.toString()); return sb.toString(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 0c1bc3caca..1a99aee961 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -889,11 +889,11 @@ public interface Configuration /** * Set the Artemis instance relative folder for data and stuff. */ - void setArtemisInstance(File directory); + void setBrokerInstance(File directory); /** * Set the Artemis instance relative folder for data and stuff. */ - File getArtemisInstance(); + File getBrokerInstance(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index aa1443491f..0f0616b62c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -1103,7 +1103,7 @@ public class ConfigurationImpl implements Configuration, Serializable return this.connectorServiceConfigurations; } - public File getArtemisInstance() + public File getBrokerInstance() { if (artemisInstance != null) { @@ -1122,7 +1122,7 @@ public class ConfigurationImpl implements Configuration, Serializable return artemisInstance; } - public void setArtemisInstance(File directory) + public void setBrokerInstance(File directory) { this.artemisInstance = directory; } @@ -1627,7 +1627,7 @@ public class ConfigurationImpl implements Configuration, Serializable try { // Resolve wont work without "/" as the last character - URI artemisHome = new URI(getArtemisInstance().toURI() + "/"); + URI artemisHome = new URI(getBrokerInstance().toURI() + "/"); URI relative = artemisHome.resolve(subFolder); return new File(relative.getPath()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/RelativePathTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/RelativePathTest.java index 4bffdfd1ff..43ab75989e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/RelativePathTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/RelativePathTest.java @@ -50,7 +50,7 @@ public class RelativePathTest extends ActiveMQTestBase File instanceHome = new File(getTemporaryDir(), "artemisHome"); - configuration.setArtemisInstance(instanceHome); + configuration.setBrokerInstance(instanceHome); // the journal should be outside of the artemisInstance on this case File journalOutside = new File(getTemporaryDir(), "./journalOut").getAbsoluteFile(); @@ -89,7 +89,7 @@ public class RelativePathTest extends ActiveMQTestBase System.out.println("InstanceHome->" + instanceHome); instanceHome.mkdirs(); - configuration.setArtemisInstance(instanceHome); + configuration.setBrokerInstance(instanceHome); configuration.setJournalDirectory("./data"); configuration.setPagingDirectory("./paging");