ARTEMIS-144 Producer / Consumer command
https://issues.apache.org/jira/browse/ARTEMIS-144 As I was working through the examples I realized the tool used to consume and sending messages through the console. this will import such tool to the CLI as users are used to consume and send messages.
This commit is contained in:
parent
077a416ee0
commit
cf777ec6b6
|
@ -22,9 +22,11 @@ import java.io.OutputStream;
|
|||
import io.airlift.airline.Cli;
|
||||
import org.apache.activemq.artemis.cli.commands.Action;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.commands.Consumer;
|
||||
import org.apache.activemq.artemis.cli.commands.Create;
|
||||
import org.apache.activemq.artemis.cli.commands.HelpAction;
|
||||
import org.apache.activemq.artemis.cli.commands.Kill;
|
||||
import org.apache.activemq.artemis.cli.commands.Producer;
|
||||
import org.apache.activemq.artemis.cli.commands.Run;
|
||||
import org.apache.activemq.artemis.cli.commands.Stop;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
|
||||
|
@ -37,12 +39,14 @@ import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
|
|||
public class Artemis
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
public static void main(String[] args) throws Exception
|
||||
public static void main(String...args) throws Exception
|
||||
{
|
||||
String instance = System.getProperty("artemis.instance");
|
||||
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis")
|
||||
.withDescription("ActiveMQ Artemis Command Line")
|
||||
.withCommand(HelpAction.class)
|
||||
.withCommand(Producer.class)
|
||||
.withCommand(Consumer.class)
|
||||
.withDefaultCommand(HelpAction.class);
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.cli.commands.util.ConsumerThread;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
@Command(name = "consume", description = "It will send consume messages from an instance")
|
||||
public class Consumer extends DestAbstract
|
||||
{
|
||||
|
||||
|
||||
@Option(name = "--durable", description = "It will use durable subscription in case of client")
|
||||
boolean durable = false;
|
||||
|
||||
@Option(name = "--breakOnNull", description = "It will break on null messages")
|
||||
boolean breakOnNull = false;
|
||||
|
||||
@Option(name = "--receiveTimeout", description = "Time used on receive(timeout)")
|
||||
int receiveTimeout;
|
||||
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception
|
||||
{
|
||||
super.execute(context);
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password);
|
||||
|
||||
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
|
||||
try (Connection connection = factory.createConnection())
|
||||
{
|
||||
ConsumerThread[] threadsArray = new ConsumerThread[threads];
|
||||
for (int i = 0; i < threads; i++)
|
||||
{
|
||||
Session session;
|
||||
if (txBatchSize > 0)
|
||||
{
|
||||
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
}
|
||||
else
|
||||
{
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
|
||||
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout);
|
||||
}
|
||||
|
||||
for (ConsumerThread thread : threadsArray)
|
||||
{
|
||||
thread.start();
|
||||
}
|
||||
|
||||
connection.start();
|
||||
|
||||
for (ConsumerThread thread : threadsArray)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -248,7 +248,12 @@ public class Create extends InputAbstract
|
|||
{
|
||||
if (home == null)
|
||||
{
|
||||
home = new File(System.getProperty("artemis.home"));
|
||||
String homeStr = System.getProperty("artemis.home");
|
||||
if (homeStr == null)
|
||||
{
|
||||
homeStr = ".";
|
||||
}
|
||||
home = new File(homeStr);
|
||||
}
|
||||
return home;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands;
|
||||
|
||||
import io.airlift.airline.Option;
|
||||
|
||||
public class DestAbstract extends ActionAbstract
|
||||
{
|
||||
@Option(name = "--url", description = "URL towards the broker. (default: tcp://localhost:61616)")
|
||||
String brokerURL = "tcp://localhost:61616";
|
||||
|
||||
@Option(name = "--destination", description = "Destination to be used. it could be prefixed with queue:// or topic:: (Default: queue://TEST")
|
||||
String destination = "queue://TEST";
|
||||
|
||||
@Option(name = "--messageCount", description = "Number of messages to act on (Default: 1000)")
|
||||
int messageCount = 1000;
|
||||
|
||||
@Option(name = "--user", description = "User used to connect")
|
||||
String user;
|
||||
|
||||
@Option(name = "--password", description = "Password used to connect")
|
||||
String password;
|
||||
|
||||
@Option(name = "--verbose", description = "It will print messages individually")
|
||||
boolean verbose;
|
||||
|
||||
@Option(name = "--sleep", description = "Time wait between each message")
|
||||
int sleep = 0;
|
||||
|
||||
@Option(name = "--txSize", description = "TX Batch Size")
|
||||
int txBatchSize;
|
||||
|
||||
@Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
|
||||
int threads = 1;
|
||||
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.cli.commands.util.ProducerThread;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
@Command(name = "produce", description = "It will send messages to an instance")
|
||||
public class Producer extends DestAbstract
|
||||
{
|
||||
|
||||
@Option(name = "--nonPersistent", description = "It will send messages non persistently")
|
||||
boolean nonpersistent = false;
|
||||
|
||||
@Option(name = "--messageSize", description = "Size of each byteMessage (The producer will use byte message on this case)")
|
||||
int messageSize = 0;
|
||||
|
||||
@Option(name = "--textSize", description = "Size of each textNessage (The producer will use text message on this case)")
|
||||
int textMessageSize;
|
||||
|
||||
@Option(name = "--msgttl", description = "TTL for each message")
|
||||
long msgTTL = 0L;
|
||||
|
||||
@Option(name = "--group", description = "Message Group to be used")
|
||||
String msgGroupID = null;
|
||||
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception
|
||||
{
|
||||
super.execute(context);
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password);
|
||||
|
||||
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE);
|
||||
try (Connection connection = factory.createConnection())
|
||||
{
|
||||
ProducerThread[] threadsArray = new ProducerThread[threads];
|
||||
for (int i = 0; i < threads; i++)
|
||||
{
|
||||
Session session;
|
||||
if (txBatchSize > 0)
|
||||
{
|
||||
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
}
|
||||
else
|
||||
{
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
threadsArray[i] = new ProducerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
||||
setMessageSize(messageSize).setTextMessageSize(textMessageSize).
|
||||
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize);
|
||||
}
|
||||
|
||||
for (ProducerThread thread : threadsArray)
|
||||
{
|
||||
thread.start();
|
||||
}
|
||||
|
||||
for (ProducerThread thread : threadsArray)
|
||||
{
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.apache.activemq.artemis.factory.SecurityManagerFactory;
|
|||
import org.apache.activemq.artemis.integration.Broker;
|
||||
import org.apache.activemq.artemis.integration.bootstrap.ActiveMQBootstrapLogger;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
@Command(name = "run", description = "runs the broker instance")
|
||||
public class Run extends Configurable
|
||||
|
@ -41,6 +42,20 @@ public class Run extends Configurable
|
|||
@Option(name = "--allow-kill", description = "This will allow the server to kill itself. Useful for tests (failover tests for instance)")
|
||||
boolean allowKill;
|
||||
|
||||
static boolean embedded = false;
|
||||
|
||||
public static final ReusableLatch latchRunning = new ReusableLatch(0);
|
||||
|
||||
/**
|
||||
* This will disable the System.exit at the end of the server.stop, as that means there are other things
|
||||
* happening on the same VM.
|
||||
* @param embedded
|
||||
*/
|
||||
public static void setEmbedded(boolean embedded)
|
||||
{
|
||||
Run.embedded = true;
|
||||
}
|
||||
|
||||
private Broker server;
|
||||
|
||||
private ArrayList<ActiveMQComponent> components = new ArrayList<>();
|
||||
|
@ -96,6 +111,8 @@ public class Run extends Configurable
|
|||
*/
|
||||
private void addShutdownHook(File configurationDir)
|
||||
{
|
||||
|
||||
latchRunning.countUp();
|
||||
final File file = new File(configurationDir,"STOP_ME");
|
||||
if (file.exists())
|
||||
{
|
||||
|
@ -147,7 +164,11 @@ public class Run extends Configurable
|
|||
}
|
||||
finally
|
||||
{
|
||||
Runtime.getRuntime().exit(0);
|
||||
latchRunning.countDown();
|
||||
if (!embedded)
|
||||
{
|
||||
Runtime.getRuntime().exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,268 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.cli.commands.util;
|
||||
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class ConsumerThread extends Thread
|
||||
{
|
||||
|
||||
int messageCount = 1000;
|
||||
int receiveTimeOut = 3000;
|
||||
Destination destination;
|
||||
Session session;
|
||||
boolean durable;
|
||||
boolean breakOnNull = true;
|
||||
int sleep;
|
||||
int batchSize;
|
||||
boolean verbose;
|
||||
|
||||
int received = 0;
|
||||
int transactions = 0;
|
||||
boolean running = false;
|
||||
CountDownLatch finished;
|
||||
boolean bytesAsText;
|
||||
|
||||
public ConsumerThread(Session session, Destination destination, int threadNr)
|
||||
{
|
||||
super("Consumer " + destination.toString() + ", thread=" + threadNr);
|
||||
this.destination = destination;
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
running = true;
|
||||
MessageConsumer consumer = null;
|
||||
String threadName = Thread.currentThread().getName();
|
||||
System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
|
||||
try
|
||||
{
|
||||
if (durable && destination instanceof Topic)
|
||||
{
|
||||
consumer = session.createDurableSubscriber((Topic) destination, getName());
|
||||
}
|
||||
else
|
||||
{
|
||||
consumer = session.createConsumer(destination);
|
||||
}
|
||||
while (running && received < messageCount)
|
||||
{
|
||||
Message msg = consumer.receive(receiveTimeOut);
|
||||
if (msg != null)
|
||||
{
|
||||
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
if (bytesAsText && (msg instanceof BytesMessage))
|
||||
{
|
||||
long length = ((BytesMessage) msg).getBodyLength();
|
||||
byte[] bytes = new byte[(int) length];
|
||||
((BytesMessage) msg).readBytes(bytes);
|
||||
System.out.println("Message:" + msg);
|
||||
}
|
||||
received++;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (breakOnNull)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (session.getTransacted())
|
||||
{
|
||||
if (batchSize > 0 && received > 0 && received % batchSize == 0)
|
||||
{
|
||||
System.out.println(threadName + " Committing transaction: " + transactions++);
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
|
||||
{
|
||||
if (batchSize > 0 && received > 0 && received % batchSize == 0)
|
||||
{
|
||||
System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received);
|
||||
msg.acknowledge();
|
||||
}
|
||||
}
|
||||
if (sleep > 0)
|
||||
{
|
||||
Thread.sleep(sleep);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
session.commit();
|
||||
}
|
||||
catch (Throwable ignored)
|
||||
{
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (finished != null)
|
||||
{
|
||||
finished.countDown();
|
||||
}
|
||||
if (consumer != null)
|
||||
{
|
||||
System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
|
||||
try
|
||||
{
|
||||
consumer.close();
|
||||
}
|
||||
catch (JMSException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println(threadName + " Consumer thread finished");
|
||||
}
|
||||
|
||||
public int getReceived()
|
||||
{
|
||||
return received;
|
||||
}
|
||||
|
||||
public boolean isDurable()
|
||||
{
|
||||
return durable;
|
||||
}
|
||||
|
||||
public ConsumerThread setDurable(boolean durable)
|
||||
{
|
||||
this.durable = durable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ConsumerThread setMessageCount(int messageCount)
|
||||
{
|
||||
this.messageCount = messageCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ConsumerThread setBreakOnNull(boolean breakOnNull)
|
||||
{
|
||||
this.breakOnNull = breakOnNull;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getBatchSize()
|
||||
{
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
public ConsumerThread setBatchSize(int batchSize)
|
||||
{
|
||||
this.batchSize = batchSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMessageCount()
|
||||
{
|
||||
return messageCount;
|
||||
}
|
||||
|
||||
public boolean isBreakOnNull()
|
||||
{
|
||||
return breakOnNull;
|
||||
}
|
||||
|
||||
public int getReceiveTimeOut()
|
||||
{
|
||||
return receiveTimeOut;
|
||||
}
|
||||
|
||||
public ConsumerThread setReceiveTimeOut(int receiveTimeOut)
|
||||
{
|
||||
this.receiveTimeOut = receiveTimeOut;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isRunning()
|
||||
{
|
||||
return running;
|
||||
}
|
||||
|
||||
public ConsumerThread setRunning(boolean running)
|
||||
{
|
||||
this.running = running;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getSleep()
|
||||
{
|
||||
return sleep;
|
||||
}
|
||||
|
||||
public ConsumerThread setSleep(int sleep)
|
||||
{
|
||||
this.sleep = sleep;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CountDownLatch getFinished()
|
||||
{
|
||||
return finished;
|
||||
}
|
||||
|
||||
public ConsumerThread setFinished(CountDownLatch finished)
|
||||
{
|
||||
this.finished = finished;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isBytesAsText()
|
||||
{
|
||||
return bytesAsText;
|
||||
}
|
||||
|
||||
public boolean isVerbose()
|
||||
{
|
||||
return verbose;
|
||||
}
|
||||
|
||||
public ConsumerThread setVerbose(boolean verbose)
|
||||
{
|
||||
this.verbose = verbose;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ConsumerThread setBytesAsText(boolean bytesAsText)
|
||||
{
|
||||
this.bytesAsText = bytesAsText;
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,431 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.cli.commands.util;
|
||||
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
public class ProducerThread extends Thread
|
||||
{
|
||||
protected final Session session;
|
||||
|
||||
boolean verbose;
|
||||
int messageCount = 1000;
|
||||
boolean runIndefinitely = false;
|
||||
Destination destination;
|
||||
int sleep = 0;
|
||||
boolean persistent = true;
|
||||
int messageSize = 0;
|
||||
int textMessageSize;
|
||||
long msgTTL = 0L;
|
||||
String msgGroupID = null;
|
||||
int transactionBatchSize;
|
||||
|
||||
int transactions = 0;
|
||||
final AtomicInteger sentCount = new AtomicInteger(0);
|
||||
String message;
|
||||
String messageText = null;
|
||||
String payloadUrl = null;
|
||||
byte[] payload = null;
|
||||
boolean running = false;
|
||||
final ReusableLatch finished = new ReusableLatch(1);
|
||||
final ReusableLatch paused = new ReusableLatch(0);
|
||||
|
||||
|
||||
public ProducerThread(Session session, Destination destination, int threadNr)
|
||||
{
|
||||
super("Producer " + destination.toString() + ", thread=" + threadNr);
|
||||
this.destination = destination;
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public void run()
|
||||
{
|
||||
MessageProducer producer = null;
|
||||
String threadName = Thread.currentThread().getName();
|
||||
try
|
||||
{
|
||||
producer = session.createProducer(destination);
|
||||
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||
producer.setTimeToLive(msgTTL);
|
||||
initPayLoad();
|
||||
running = true;
|
||||
|
||||
System.out.println(threadName + " Started to calculate elapsed time ...\n");
|
||||
long tStart = System.currentTimeMillis();
|
||||
|
||||
if (runIndefinitely)
|
||||
{
|
||||
while (running)
|
||||
{
|
||||
paused.await();
|
||||
sendMessage(producer, threadName);
|
||||
sentCount.incrementAndGet();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet())
|
||||
{
|
||||
paused.await();
|
||||
sendMessage(producer, threadName);
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
session.commit();
|
||||
}
|
||||
catch (Throwable ignored)
|
||||
{
|
||||
}
|
||||
|
||||
System.out.println(threadName + " Produced: " + this.getSentCount() + " messages");
|
||||
long tEnd = System.currentTimeMillis();
|
||||
long elapsed = (tEnd - tStart) / 1000;
|
||||
System.out.println(threadName + " Elapsed time in second : " + elapsed + " s");
|
||||
System.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
|
||||
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (finished != null)
|
||||
{
|
||||
finished.countDown();
|
||||
}
|
||||
if (producer != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
producer.close();
|
||||
}
|
||||
catch (JMSException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sendMessage(MessageProducer producer, String threadName) throws Exception
|
||||
{
|
||||
Message message = createMessage(sentCount.get(), threadName);
|
||||
producer.send(message);
|
||||
if (verbose)
|
||||
{
|
||||
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
|
||||
}
|
||||
|
||||
if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0)
|
||||
{
|
||||
System.out.println(threadName + " Committing transaction: " + transactions++);
|
||||
session.commit();
|
||||
}
|
||||
|
||||
if (sleep > 0)
|
||||
{
|
||||
Thread.sleep(sleep);
|
||||
}
|
||||
}
|
||||
|
||||
private void initPayLoad()
|
||||
{
|
||||
if (messageSize > 0)
|
||||
{
|
||||
payload = new byte[messageSize];
|
||||
for (int i = 0; i < payload.length; i++)
|
||||
{
|
||||
payload[i] = '.';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Message createMessage(int i, String threadName) throws Exception
|
||||
{
|
||||
Message answer;
|
||||
if (payload != null)
|
||||
{
|
||||
answer = session.createBytesMessage();
|
||||
((BytesMessage) answer).writeBytes(payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (textMessageSize > 0)
|
||||
{
|
||||
if (messageText == null)
|
||||
{
|
||||
messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
|
||||
}
|
||||
}
|
||||
else if (payloadUrl != null)
|
||||
{
|
||||
messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
|
||||
}
|
||||
else if (message != null)
|
||||
{
|
||||
messageText = message;
|
||||
}
|
||||
else
|
||||
{
|
||||
messageText = createDefaultMessage(i);
|
||||
}
|
||||
answer = session.createTextMessage(messageText);
|
||||
}
|
||||
if ((msgGroupID != null) && (!msgGroupID.isEmpty()))
|
||||
{
|
||||
answer.setStringProperty("JMSXGroupID", msgGroupID);
|
||||
}
|
||||
|
||||
answer.setIntProperty("count", i);
|
||||
answer.setStringProperty("ThreadSent", threadName);
|
||||
return answer;
|
||||
}
|
||||
|
||||
private String readInputStream(InputStream is, int size, int messageNumber) throws IOException
|
||||
{
|
||||
InputStreamReader reader = new InputStreamReader(is);
|
||||
try
|
||||
{
|
||||
char[] buffer;
|
||||
if (size > 0)
|
||||
{
|
||||
buffer = new char[size];
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer = new char[1024];
|
||||
}
|
||||
int count;
|
||||
StringBuilder builder = new StringBuilder();
|
||||
while ((count = reader.read(buffer)) != -1)
|
||||
{
|
||||
builder.append(buffer, 0, count);
|
||||
if (size > 0) break;
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
catch (IOException ioe)
|
||||
{
|
||||
return createDefaultMessage(messageNumber);
|
||||
}
|
||||
finally
|
||||
{
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String createDefaultMessage(int messageNumber)
|
||||
{
|
||||
return "test message: " + messageNumber;
|
||||
}
|
||||
|
||||
public ProducerThread setMessageCount(int messageCount)
|
||||
{
|
||||
this.messageCount = messageCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getSleep()
|
||||
{
|
||||
return sleep;
|
||||
}
|
||||
|
||||
public ProducerThread setSleep(int sleep)
|
||||
{
|
||||
this.sleep = sleep;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMessageCount()
|
||||
{
|
||||
return messageCount;
|
||||
}
|
||||
|
||||
public int getSentCount()
|
||||
{
|
||||
return sentCount.get();
|
||||
}
|
||||
|
||||
public boolean isPersistent()
|
||||
{
|
||||
return persistent;
|
||||
}
|
||||
|
||||
public ProducerThread setPersistent(boolean persistent)
|
||||
{
|
||||
this.persistent = persistent;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isRunning()
|
||||
{
|
||||
return running;
|
||||
}
|
||||
|
||||
public ProducerThread setRunning(boolean running)
|
||||
{
|
||||
this.running = running;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getMsgTTL()
|
||||
{
|
||||
return msgTTL;
|
||||
}
|
||||
|
||||
public ProducerThread setMsgTTL(long msgTTL)
|
||||
{
|
||||
this.msgTTL = msgTTL;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getTransactionBatchSize()
|
||||
{
|
||||
return transactionBatchSize;
|
||||
}
|
||||
|
||||
public ProducerThread setTransactionBatchSize(int transactionBatchSize)
|
||||
{
|
||||
this.transactionBatchSize = transactionBatchSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getMsgGroupID()
|
||||
{
|
||||
return msgGroupID;
|
||||
}
|
||||
|
||||
public ProducerThread setMsgGroupID(String msgGroupID)
|
||||
{
|
||||
this.msgGroupID = msgGroupID;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getTextMessageSize()
|
||||
{
|
||||
return textMessageSize;
|
||||
}
|
||||
|
||||
public ProducerThread setTextMessageSize(int textMessageSize)
|
||||
{
|
||||
this.textMessageSize = textMessageSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMessageSize()
|
||||
{
|
||||
return messageSize;
|
||||
}
|
||||
|
||||
public ProducerThread setMessageSize(int messageSize)
|
||||
{
|
||||
this.messageSize = messageSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ReusableLatch getFinished()
|
||||
{
|
||||
return finished;
|
||||
}
|
||||
|
||||
public ProducerThread setFinished(int value)
|
||||
{
|
||||
finished.setCount(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getPayloadUrl()
|
||||
{
|
||||
return payloadUrl;
|
||||
}
|
||||
|
||||
public ProducerThread setPayloadUrl(String payloadUrl)
|
||||
{
|
||||
this.payloadUrl = payloadUrl;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getMessage()
|
||||
{
|
||||
return message;
|
||||
}
|
||||
|
||||
public ProducerThread setMessage(String message)
|
||||
{
|
||||
this.message = message;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isRunIndefinitely()
|
||||
{
|
||||
return runIndefinitely;
|
||||
}
|
||||
|
||||
public ProducerThread setRunIndefinitely(boolean runIndefinitely)
|
||||
{
|
||||
this.runIndefinitely = runIndefinitely;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ProducerThread pauseProducer()
|
||||
{
|
||||
this.paused.countUp();
|
||||
return this;
|
||||
}
|
||||
|
||||
public ProducerThread resumeProducer()
|
||||
{
|
||||
this.paused.countDown();
|
||||
return this;
|
||||
}
|
||||
|
||||
public ProducerThread resetCounters()
|
||||
{
|
||||
this.sentCount.set(0);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public boolean isVerbose()
|
||||
{
|
||||
return verbose;
|
||||
}
|
||||
|
||||
public ProducerThread setVerbose(boolean verbose)
|
||||
{
|
||||
this.verbose = verbose;
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
|
||||
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
|
||||
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
|
||||
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
|
||||
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
|
||||
At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
|
||||
Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
|
||||
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
|
||||
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
|
||||
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
|
||||
At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
|
||||
Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
|
||||
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
|
||||
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
|
||||
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
|
|
@ -16,15 +16,40 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.cli.Artemis;
|
||||
import org.apache.activemq.artemis.cli.commands.Run;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
/**
|
||||
* Test to validate that the CLI doesn't throw improper exceptions when invoked.
|
||||
*/
|
||||
public class ArtemisTest
|
||||
{
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder;
|
||||
|
||||
public ArtemisTest()
|
||||
{
|
||||
File parent = new File("./target/tmp");
|
||||
parent.mkdirs();
|
||||
temporaryFolder = new TemporaryFolder(parent);
|
||||
}
|
||||
|
||||
|
||||
@After
|
||||
public void cleanup()
|
||||
{
|
||||
System.clearProperty("artemis.instance");
|
||||
Run.setEmbedded(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void invalidCliDoesntThrowException()
|
||||
{
|
||||
|
@ -37,6 +62,22 @@ public class ArtemisTest
|
|||
testCli("create","/rawr");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleRun() throws Exception
|
||||
{
|
||||
Run.setEmbedded(true);
|
||||
Artemis.main("create", temporaryFolder.getRoot().getAbsolutePath(), "--force", "--silent-input", "--no-web");
|
||||
System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath());
|
||||
// Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
|
||||
Artemis.main("run");
|
||||
Artemis.main("produce", "--txSize", "500");
|
||||
Artemis.main("consume", "--txSize", "500", "--verbose");
|
||||
Artemis.main("stop");
|
||||
Artemis.main("data", "print");
|
||||
Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
|
||||
|
||||
}
|
||||
|
||||
private void testCli(String... args)
|
||||
{
|
||||
try
|
||||
|
|
|
@ -83,6 +83,7 @@
|
|||
<include>org.apache.tomcat:tomcat-servlet-api</include>
|
||||
<include>commons-beanutils:commons-beanutils</include>
|
||||
<include>commons-logging:commons-logging</include>
|
||||
<include>commons-collections:commons-collections</include>
|
||||
<include>org.fusesource.hawtbuf:hawtbuf</include>
|
||||
<include>org.jgroups:jgroups</include>
|
||||
</includes>
|
||||
|
|
|
@ -16,15 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.jms.client;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.JMSRuntimeException;
|
||||
import javax.naming.NamingException;
|
||||
import javax.naming.Reference;
|
||||
import javax.naming.Referenceable;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -53,6 +52,17 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
|
||||
public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";
|
||||
|
||||
public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
|
||||
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
|
||||
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
|
||||
public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
|
||||
public static final byte QUEUE_TYPE = 0x01;
|
||||
public static final byte TOPIC_TYPE = 0x02;
|
||||
public static final byte TEMP_MASK = 0x04;
|
||||
public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
|
||||
public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
|
||||
|
||||
|
||||
private static final char SEPARATOR = '.';
|
||||
|
||||
private static String escape(final String input)
|
||||
|
@ -64,6 +74,44 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
return input.replace("\\", "\\\\").replace(".", "\\.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for working with destinations.
|
||||
*/
|
||||
public static ActiveMQDestination createDestination(String name, byte defaultType)
|
||||
{
|
||||
if (name.startsWith(QUEUE_QUALIFIED_PREFIX))
|
||||
{
|
||||
return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
|
||||
}
|
||||
else if (name.startsWith(TOPIC_QUALIFIED_PREFIX))
|
||||
{
|
||||
return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
|
||||
}
|
||||
else if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX))
|
||||
{
|
||||
return new ActiveMQQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()), true);
|
||||
}
|
||||
else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX))
|
||||
{
|
||||
return new ActiveMQTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()), true);
|
||||
}
|
||||
|
||||
switch (defaultType)
|
||||
{
|
||||
case QUEUE_TYPE:
|
||||
return new ActiveMQQueue(name);
|
||||
case TOPIC_TYPE:
|
||||
return new ActiveMQTopic(name);
|
||||
case TEMP_QUEUE_TYPE:
|
||||
return new ActiveMQQueue(name, true);
|
||||
case TEMP_TOPIC_TYPE:
|
||||
return new ActiveMQTopic(name, true);
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static Destination fromAddress(final String address)
|
||||
{
|
||||
if (address.startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX))
|
||||
|
@ -356,7 +404,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
return false;
|
||||
}
|
||||
|
||||
ActiveMQDestination that = (ActiveMQDestination)o;
|
||||
ActiveMQDestination that = (ActiveMQDestination) o;
|
||||
|
||||
return address.equals(that.address);
|
||||
}
|
||||
|
|
|
@ -46,6 +46,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue
|
|||
super(JMS_QUEUE_ADDRESS_PREFIX + name, name, false, true, null);
|
||||
}
|
||||
|
||||
public ActiveMQQueue(final String name, boolean temporary)
|
||||
{
|
||||
super(JMS_QUEUE_ADDRESS_PREFIX + name, name, temporary, true, null);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,7 +43,12 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic
|
|||
|
||||
public ActiveMQTopic(final String name)
|
||||
{
|
||||
super(JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null);
|
||||
this(name, false);
|
||||
}
|
||||
|
||||
public ActiveMQTopic(final String name, boolean temporary)
|
||||
{
|
||||
super(JMS_TOPIC_ADDRESS_PREFIX + name, name, temporary, false, null);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue