This commit is contained in:
franz1981 2022-03-02 13:32:43 +01:00
commit d744668270
36 changed files with 3415 additions and 11 deletions

View File

@ -112,6 +112,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>

View File

@ -37,6 +37,9 @@ import org.apache.activemq.artemis.cli.commands.check.HelpCheck;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfClientCommand;
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfConsumerCommand;
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfProducerCommand;
import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop;
@ -163,6 +166,10 @@ public class Artemis {
withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Transfer.class).withCommand(Consumer.class).
withCommand(Browse.class).withCommand(Mask.class).withCommand(PrintVersion.class).withDefaultCommand(HelpAction.class);
builder.withGroup("perf").withDescription("Perf tools group (example ./artemis perf client)")
.withDefaultCommand(PerfClientCommand.class)
.withCommands(PerfProducerCommand.class, PerfConsumerCommand.class, PerfClientCommand.class);
builder.withGroup("check").withDescription("Check tools group (node|queue) (example ./artemis check node)").
withDefaultCommand(HelpCheck.class).withCommands(NodeCheck.class, QueueCheck.class);

View File

@ -43,10 +43,10 @@ public class ConnectionAbstract extends InputAbstract {
protected String password;
@Option(name = "--clientID", description = "ClientID to be associated with connection")
String clientID;
protected String clientID;
@Option(name = "--protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
String protocol = "core";
protected String protocol = "core";
public String getBrokerURL() {
return brokerURL;
@ -126,16 +126,27 @@ public class ConnectionAbstract extends InputAbstract {
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return createConnectionFactory(brokerURL, user, password, clientID, protocol);
}
protected ConnectionFactory createConnectionFactory(String brokerURL,
String user,
String password,
String clientID,
String protocol) throws Exception {
if (protocol.equals("core")) {
return createCoreConnectionFactory();
return createCoreConnectionFactory(brokerURL, user, password, clientID);
} else if (protocol.equals("amqp")) {
return createAMQPConnectionFactory();
return createAMQPConnectionFactory(brokerURL, user, password, clientID);
} else {
throw new IllegalStateException("protocol " + protocol + " not supported");
}
}
private ConnectionFactory createAMQPConnectionFactory() {
private ConnectionFactory createAMQPConnectionFactory(String brokerURL,
String user,
String password,
String clientID) {
if (brokerURL.startsWith("tcp://")) {
// replacing tcp:// by amqp://
brokerURL = "amqp" + brokerURL.substring(3);
@ -172,8 +183,14 @@ public class ConnectionAbstract extends InputAbstract {
}
protected ActiveMQConnectionFactory createCoreConnectionFactory() {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
return createCoreConnectionFactory(brokerURL, user, password, clientID);
}
protected ActiveMQConnectionFactory createCoreConnectionFactory(String brokerURL,
String user,
String password,
String clientID) {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
if (clientID != null) {
System.out.println("Consumer:: clientID = " + clientID);
cf.setClientID(clientID);

View File

@ -65,13 +65,17 @@ public class DestAbstract extends ConnectionAbstract {
}
protected Destination getDestination(Session session) throws JMSException {
return getDestination(session, destination);
}
public static Destination getDestination(Session session, String destination) throws JMSException {
if (destination.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
return session.createTopic(stripPrefix(destination));
}
return session.createQueue(stripPrefix(destination));
}
private String stripPrefix(String destination) {
public static String stripPrefix(String destination) {
int index = destination.indexOf("://");
if (index != -1) {
return destination.substring(index + 3);

View File

@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import static java.util.Objects.requireNonNull;
public final class AsyncJms2ProducerFacade {
private final long id;
protected final Session session;
private final MessageProducer producer;
/*
* maxPending limits the number of in-flight sent messages
* in a way that if the limit is reached and a single completion arrive,
* a subsequent send attempt will succeed.
*/
private long pending;
private final long maxPending;
/*
* Tracking sent messages in transaction requires using 2 separate counters
* ie pendingMsgInTransaction, completedMsgInTransaction
* because, using just one won't allow tracking completions of previously sent messages in order to commit
* the transaction while there are no more in-flight ones.
*/
private final long transactionCapacity;
private long pendingMsgInTransaction;
private long completedMsgInTransaction;
private final List<Runnable> availableObservers;
private final List<Runnable> closedObservers;
private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> MESSAGE_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageSent");
private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> MESSAGE_COMPLETED_UPDATER = AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageCompleted");
private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> NOT_AVAILABLE_UPDATER = AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "notAvailable");
private volatile long messageSent;
private volatile long messageCompleted;
private volatile long notAvailable;
private boolean closing;
private boolean closed;
private final Destination destination;
public AsyncJms2ProducerFacade(final long id,
final Session session,
final MessageProducer producer,
final Destination destination,
final long maxPending,
final long transactionCapacity) {
this.id = id;
this.session = requireNonNull(session);
this.producer = requireNonNull(producer);
this.destination = destination;
this.pending = 0;
this.maxPending = transactionCapacity > 0 && maxPending > 0 ? Math.max(maxPending, transactionCapacity) : maxPending;
this.availableObservers = new ArrayList<>(1);
this.closedObservers = new ArrayList<>(1);
this.messageSent = 0;
this.messageCompleted = 0;
this.notAvailable = 0;
try {
if (transactionCapacity < 0) {
throw new IllegalStateException("transactionCapacity must be >= 0");
}
if (transactionCapacity > 0) {
if (!session.getTransacted()) {
throw new IllegalStateException("session must be transacted with transactionCapacity != 0");
}
} else {
if (session.getTransacted()) {
throw new IllegalStateException("session cannot be transacted with transactionCapacity = 0");
}
}
} catch (final JMSException ex) {
throw new IllegalStateException(ex);
}
this.transactionCapacity = transactionCapacity;
this.pendingMsgInTransaction = 0;
this.completedMsgInTransaction = 0;
this.closing = false;
this.closed = false;
}
public long getId() {
return id;
}
public Destination getDestination() {
return destination;
}
BytesMessage createBytesMessage() throws JMSException {
return session.createBytesMessage();
}
private void addedPendingSend() {
if (transactionCapacity > 0 && pendingMsgInTransaction == transactionCapacity) {
throw new IllegalStateException("reached max in-flight transacted sent messages");
}
if (maxPending > 0 && pending == maxPending) {
throw new IllegalStateException("reached max in-flight sent messages");
}
pending++;
pendingMsgInTransaction++;
}
/**
* if {@code true}, a subsequent {@link #trySend} would return {@link SendAttemptResult#Success}.<br>
* Otherwise, a subsequent {@link #trySend} would return {@link SendAttemptResult#NotAvailable}.
*/
private boolean isAvailable() {
if (maxPending > 0 && pending == maxPending) {
return false;
}
return transactionCapacity == 0 || pendingMsgInTransaction != transactionCapacity;
}
public enum SendAttemptResult {
Closing, Closed, NotAvailable, Success
}
public SendAttemptResult trySend(final Message message,
final CompletionListener completionListener,
final Runnable availableObserver) throws JMSException {
if (closing) {
return SendAttemptResult.Closing;
}
if (closed) {
return SendAttemptResult.Closed;
}
if (!isAvailable()) {
availableObservers.add(availableObserver);
orderedIncrementNotAvailable();
return SendAttemptResult.NotAvailable;
}
producer.send(message, completionListener);
orderedIncrementSent();
addedPendingSend();
return SendAttemptResult.Success;
}
public void onSendErrored() {
if (closed) {
return;
}
availableObservers.clear();
closedObservers.forEach(Runnable::run);
closedObservers.clear();
closed = true;
}
public JMSException onSendCompleted() {
if (closed) {
return null;
}
JMSException completionError = null;
orderedIncrementCompleted();
if (transactionCapacity > 0 && completedMsgInTransaction == transactionCapacity) {
throw new IllegalStateException("cannot complete more send");
}
if (pending == 0) {
throw new IllegalStateException("cannot complete more send");
}
pending--;
completedMsgInTransaction++;
if (transactionCapacity > 0) {
if (completedMsgInTransaction == transactionCapacity || (closing && pending == 0)) {
completedMsgInTransaction = 0;
pendingMsgInTransaction = 0;
try {
session.commit();
} catch (final JMSException fatal) {
completionError = fatal;
closing = true;
}
if (closing) {
closing = false;
closed = true;
closedObservers.forEach(Runnable::run);
closedObservers.clear();
} else if (isAvailable()) {
availableObservers.forEach(Runnable::run);
availableObservers.clear();
}
}
} else {
if (closing && pending == 0) {
closing = false;
closed = true;
closedObservers.forEach(Runnable::run);
closedObservers.clear();
} else if (isAvailable()) {
availableObservers.forEach(Runnable::run);
availableObservers.clear();
}
}
return completionError;
}
public long getMessageSent() {
return messageSent;
}
private void orderedIncrementSent() {
MESSAGE_SENT_UPDATER.lazySet(this, messageSent + 1);
}
public long getMessageCompleted() {
return messageCompleted;
}
private void orderedIncrementCompleted() {
MESSAGE_COMPLETED_UPDATER.lazySet(this, messageCompleted + 1);
}
public long getNotAvailable() {
return notAvailable;
}
private void orderedIncrementNotAvailable() {
NOT_AVAILABLE_UPDATER.lazySet(this, notAvailable + 1);
}
public void requestClose() {
requestClose(() -> {
});
}
public void requestClose(final Runnable onClosed) {
if (closed) {
onClosed.run();
return;
}
if (closing) {
closedObservers.add(onClosed);
return;
}
availableObservers.clear();
if (pending > 0) {
closing = true;
closedObservers.add(onClosed);
} else {
closed = true;
onClosed.run();
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
public interface BenchmarkService extends AutoCloseable {
BenchmarkService start();
boolean anyError();
boolean isRunning();
@Override
void close();
}

View File

@ -0,0 +1,408 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import org.apache.activemq.artemis.utils.JsonLoader;
public final class LiveStatistics {
public enum ReportInterval {
ns(1),
us(TimeUnit.MICROSECONDS.toNanos(1)),
ms(TimeUnit.MILLISECONDS.toNanos(1)),
sec(TimeUnit.SECONDS.toNanos(1));
public final long nanoseconds;
ReportInterval(long value) {
this.nanoseconds = value;
}
}
private final ProducerLoadGenerator[] producers;
private final Histogram[] waitLatencies;
private final Histogram intervalWaitLatencies;
private final Histogram[] sentLatencies;
private final Histogram intervalSentLatencies;
private final RecordingMessageListener[] listeners;
private final Histogram[] endToEndLatencies;
private final Histogram intervalEndToEndLatencies;
private final RateSampler sentMsg;
private final RateSampler blockedMsg;
private final RateSampler completedMsg;
private final RateSampler receivedMsg;
private final Histogram accumulatedWaitLatencies;
private final Histogram accumulatedSentLatencies;
private final Histogram accumulatedEndToEndLatencies;
private final DecimalFormat jsonLatencyFormat;
private final PaddingDecimalFormat latencyFormat;
private final PaddingDecimalFormat rateFormat;
private final PaddingDecimalFormat countFormat;
private long sampleTime;
private final FileWriter jsonWriter;
private long jsonSamples;
private final HistogramLogWriter latenciesLogWriter;
public LiveStatistics(final String jsonOutput, final String hdrOutput, final ProducerLoadGenerator[] producers, final RecordingMessageListener[] listeners) throws IOException {
if (producers != null && producers.length > 0) {
this.producers = producers;
sentLatencies = new Histogram[producers.length];
intervalSentLatencies = new Histogram(2);
accumulatedSentLatencies = new Histogram(2);
if (producers[0] instanceof ProducerTargetRateLoadGenerator) {
waitLatencies = new Histogram[producers.length];
intervalWaitLatencies = new Histogram(2);
accumulatedWaitLatencies = new Histogram(2);
} else {
waitLatencies = null;
intervalWaitLatencies = null;
accumulatedWaitLatencies = null;
}
sentMsg = RateSampler.of(() -> {
long sum = 0;
for (ProducerLoadGenerator producer : producers) {
sum += producer.getProducer().getMessageSent();
}
return sum;
});
blockedMsg = RateSampler.of(() -> {
long sum = 0;
for (ProducerLoadGenerator producer : producers) {
sum += producer.getProducer().getNotAvailable();
}
return sum;
});
completedMsg = RateSampler.of(() -> {
long sum = 0;
for (ProducerLoadGenerator producer : producers) {
sum += producer.getProducer().getMessageCompleted();
}
return sum;
});
} else {
this.producers = null;
sentLatencies = null;
intervalSentLatencies = null;
accumulatedSentLatencies = null;
waitLatencies = null;
intervalWaitLatencies = null;
accumulatedWaitLatencies = null;
sentMsg = null;
blockedMsg = null;
completedMsg = null;
}
if (listeners != null) {
this.listeners = listeners;
endToEndLatencies = new Histogram[listeners.length];
intervalEndToEndLatencies = new Histogram(2);
accumulatedEndToEndLatencies = new Histogram(2);
receivedMsg = RateSampler.of(() -> {
long sum = 0;
for (RecordingMessageListener listener : listeners) {
sum += listener.getReceivedMessages();
}
return sum;
});
} else {
this.listeners = null;
endToEndLatencies = null;
intervalEndToEndLatencies = null;
accumulatedEndToEndLatencies = null;
receivedMsg = null;
}
this.sampleTime = System.currentTimeMillis();
this.jsonLatencyFormat = new DecimalFormat("0.00");
this.latencyFormat = new PaddingDecimalFormat("0.00", 9);
this.rateFormat = new PaddingDecimalFormat("0", 8);
this.countFormat = new PaddingDecimalFormat("0", 12);
this.jsonSamples = 0;
if (jsonOutput != null) {
this.jsonWriter = new FileWriter(jsonOutput);
this.jsonWriter.write("[\n");
} else {
this.jsonWriter = null;
}
if (hdrOutput != null) {
this.latenciesLogWriter = new HistogramLogWriter(hdrOutput);
this.latenciesLogWriter.outputLogFormatVersion();
this.latenciesLogWriter.outputLegend();
} else {
this.latenciesLogWriter = null;
}
}
private boolean anyFatalError() {
if (producers != null) {
for (ProducerLoadGenerator producer : producers) {
if (producer.getFatalException() != null) {
return true;
}
}
}
if (listeners != null) {
for (RecordingMessageListener listener : listeners) {
if (listener.anyFatalException()) {
return true;
}
}
}
return false;
}
public void sampleMetrics(final boolean warmup) {
final long lastSampleTime = this.sampleTime;
sampleTime = System.currentTimeMillis();
if (receivedMsg != null) {
receivedMsg.run();
}
if (completedMsg != null) {
completedMsg.run();
}
if (blockedMsg != null) {
blockedMsg.run();
}
if (sentMsg != null) {
sentMsg.run();
}
if (endToEndLatencies != null) {
for (int i = 0, size = listeners.length; i < size; i++) {
endToEndLatencies[i] = listeners[i].getReceiveLatencyRecorder().getIntervalHistogram(endToEndLatencies[i]);
}
}
if (sentLatencies != null) {
for (int i = 0, size = producers.length; i < size; i++) {
sentLatencies[i] = producers[i].getSendCompletedLatencies().getIntervalHistogram(sentLatencies[i]);
}
}
if (waitLatencies != null) {
for (int i = 0, size = producers.length; i < size; i++) {
waitLatencies[i] = producers[i].getWaitLatencies().getIntervalHistogram(waitLatencies[i]);
}
}
aggregateLatencies(warmup, lastSampleTime, sampleTime, intervalEndToEndLatencies, endToEndLatencies, accumulatedEndToEndLatencies);
aggregateLatencies(warmup, lastSampleTime, sampleTime, intervalSentLatencies, sentLatencies, accumulatedSentLatencies);
aggregateLatencies(warmup, lastSampleTime, sampleTime, intervalWaitLatencies, waitLatencies, accumulatedWaitLatencies);
}
private static void aggregateLatencies(final boolean warmup,
final long lastSampleTime,
final long sampleTime,
final Histogram intervalSentLatencies,
final Histogram[] sentLatencies,
final Histogram accumulatedSentLatencies) {
if (intervalSentLatencies != null) {
intervalSentLatencies.reset();
intervalSentLatencies.setStartTimeStamp(lastSampleTime);
intervalSentLatencies.setEndTimeStamp(sampleTime);
for (Histogram histogram : sentLatencies) {
intervalSentLatencies.add(histogram);
}
if (!warmup) {
accumulatedSentLatencies.add(intervalSentLatencies);
}
}
}
public void outAtInterval(final boolean warmup, final StringBuilder out, final ReportInterval interval, final boolean includeLatencies) throws IOException {
// space after "true" is to ensure length to be the same as "false": don't remove it!
out.append("\n--- warmup ").append(warmup ? "true " : "false");
appendRateOf(out, "\n--- sent: ", sentMsg, rateFormat, interval, "msg");
appendRateOf(out, "\n--- blocked: ", blockedMsg, rateFormat, interval, "msg");
appendRateOf(out, "\n--- completed:", completedMsg, rateFormat, interval, "msg");
appendRateOf(out, "\n--- received: ", receivedMsg, rateFormat, interval, "msg");
if (includeLatencies) {
outPercentiles(out, "\n--- send delay time:", intervalWaitLatencies, latencyFormat);
outPercentiles(out, "\n--- send ack time: ", intervalSentLatencies, latencyFormat);
outPercentiles(out, "\n--- transfer time: ", intervalEndToEndLatencies, latencyFormat);
}
appendJsonIntervalSampleOnFile(warmup, interval);
appendTaggedHdrHistograms(warmup);
}
private void appendJsonIntervalSampleOnFile(final boolean warmup, final ReportInterval interval) throws IOException {
if (jsonWriter == null) {
return;
}
final JsonObjectBuilder jsonBuilder = JsonLoader.createObjectBuilder();
jsonBuilder.add("sampleType", "interval");
jsonBuilder.add("warmup", warmup);
jsonBuilder.add("time", sampleTime);
addRate(jsonBuilder,"sent", sentMsg, interval);
addRate(jsonBuilder, "delayed", blockedMsg, interval);
addRate(jsonBuilder, "completed", completedMsg, interval);
addRate(jsonBuilder, "received", receivedMsg, interval);
addPercentiles(jsonBuilder, "delaySendTime", intervalWaitLatencies);
addPercentiles(jsonBuilder, "sendTime", intervalSentLatencies);
addPercentiles(jsonBuilder, "transferTime", intervalEndToEndLatencies);
final String jsonSample = jsonBuilder.build().toString();
if (jsonSamples > 0) {
jsonWriter.write(",\n");
}
jsonSamples++;
jsonWriter.write(jsonSample);
jsonWriter.flush();
}
private void appendJsonSummarySampleOnFile(final boolean failedBenchmark) throws IOException {
if (jsonWriter == null) {
return;
}
final JsonObjectBuilder jsonBuilder = JsonLoader.createObjectBuilder();
jsonBuilder.add("sampleType", "summary");
jsonBuilder.add("time", sampleTime);
jsonBuilder.add("result", failedBenchmark ? "fail" : "success");
if (sentMsg != null) {
jsonBuilder.add("totalSent", sentMsg.getLastSample());
}
if (blockedMsg != null) {
jsonBuilder.add("totalBlocked", blockedMsg.getLastSample());
}
if (completedMsg != null) {
jsonBuilder.add("totalCompleted", completedMsg.getLastSample());
}
if (receivedMsg != null) {
jsonBuilder.add("totalReceived", receivedMsg.getLastSample());
}
addPercentiles(jsonBuilder, "totalDelaySendTime", accumulatedWaitLatencies);
addPercentiles(jsonBuilder, "totalSendTime", accumulatedSentLatencies);
addPercentiles(jsonBuilder, "totalTransferTime", accumulatedEndToEndLatencies);
final String jsonSample = jsonBuilder.build().toString();
if (jsonSamples > 0) {
jsonWriter.write(",\n");
}
jsonSamples++;
jsonWriter.write(jsonSample);
jsonWriter.flush();
}
private void appendTaggedHdrHistograms(final boolean warmup) {
if (latenciesLogWriter == null) {
return;
}
if (intervalWaitLatencies != null) {
intervalWaitLatencies.setTag(warmup ? "warmup delay send" : "delay send");
latenciesLogWriter.outputIntervalHistogram(intervalWaitLatencies);
}
if (intervalSentLatencies != null) {
intervalSentLatencies.setTag(warmup ? "warmup send" : "send");
latenciesLogWriter.outputIntervalHistogram(intervalSentLatencies);
}
if (intervalEndToEndLatencies != null) {
intervalEndToEndLatencies.setTag(warmup ? "warmup transfer" : "transfer");
latenciesLogWriter.outputIntervalHistogram(intervalEndToEndLatencies);
}
}
private static JsonObjectBuilder addRate(final JsonObjectBuilder obj,
final String metric,
final RateSampler rate,
final ReportInterval interval) {
if (rate == null) {
return obj;
}
return obj.add(metric, rate.reportRate(interval.nanoseconds));
}
private JsonObjectBuilder addPercentiles(final JsonObjectBuilder obj,
final String metric,
final Histogram distribution) {
if (distribution == null) {
return obj;
}
return obj
.add(metric, JsonLoader.createObjectBuilder()
.add("mean", jsonLatencyFormat.format(distribution.getMean()))
.add("50", distribution.getValueAtPercentile(50.0d))
.add("90", distribution.getValueAtPercentile(90.0d))
.add("99", distribution.getValueAtPercentile(99.0d))
.add("99.9", distribution.getValueAtPercentile(99.9d))
.add("99.99", distribution.getValueAtPercentile(99.99d))
.add("max", distribution.getMaxValue())
.add("count", distribution.getTotalCount()));
}
public void outSummary(final StringBuilder out) throws IOException {
out.append("\n--- SUMMARY");
final boolean failedBenchmark = anyFatalError();
out.append("\n--- result: ").append(failedBenchmark ? " fail" : " success");
if (sentMsg != null) {
out.append("\n--- total sent: ").append(countFormat.format(sentMsg.getLastSample()));
}
if (blockedMsg != null) {
out.append("\n--- total blocked: ").append(countFormat.format(blockedMsg.getLastSample()));
}
if (completedMsg != null) {
out.append("\n--- total completed:").append(countFormat.format(completedMsg.getLastSample()));
}
if (receivedMsg != null) {
out.append("\n--- total received: ").append(countFormat.format(receivedMsg.getLastSample()));
}
outPercentiles(out, "\n--- aggregated delay send time:", accumulatedWaitLatencies, latencyFormat);
outPercentiles(out, "\n--- aggregated send time: ", accumulatedSentLatencies, latencyFormat);
outPercentiles(out, "\n--- aggregated transfer time: ", accumulatedEndToEndLatencies, latencyFormat);
appendJsonSummarySampleOnFile(failedBenchmark);
}
public void close() throws IOException {
if (jsonWriter != null) {
jsonWriter.write("\n]");
jsonWriter.close();
}
if (latenciesLogWriter != null) {
latenciesLogWriter.close();
}
}
private static void outPercentiles(final StringBuilder out,
final String metric,
final Histogram histogram,
final DecimalFormat latencyFormat) {
if (histogram == null) {
return;
}
out.append(' ').append(metric);
out.append(' ').append("mean: ").append(latencyFormat.format(histogram.getMean())).append(" us");
out.append(" - ").append("50.00%: ").append(latencyFormat.format(histogram.getValueAtPercentile(50.0d))).append(" us");
out.append(" - ").append("90.00%: ").append(latencyFormat.format(histogram.getValueAtPercentile(90.0d))).append(" us");
out.append(" - ").append("99.00%: ").append(latencyFormat.format(histogram.getValueAtPercentile(99.0d))).append(" us");
out.append(" - ").append("99.90%: ").append(latencyFormat.format(histogram.getValueAtPercentile(99.9d))).append(" us");
out.append(" - ").append("99.99%: ").append(latencyFormat.format(histogram.getValueAtPercentile(99.99d))).append(" us");
out.append(" - ").append("max: ").append(latencyFormat.format(histogram.getMaxValue())).append(" us");
}
private static StringBuilder appendRateOf(final StringBuilder out,
final String metric,
final RateSampler sampler,
final DecimalFormat rateFormat,
final ReportInterval outInterval,
final String unit) {
if (sampler == null) {
return out;
}
return out.append(' ').append(metric)
.append(' ').append(rateFormat.format(sampler.reportRate(outInterval.nanoseconds)))
.append(' ').append(unit).append('/').append(outInterval.name());
}
}

View File

@ -0,0 +1,305 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.SingleWriterRecorder;
public final class MessageListenerBenchmark implements BenchmarkService {
private final ConnectionFactory factory;
private final MicrosTimeProvider timeProvider;
private final int consumers;
private final boolean canDelaySetMessageCount;
private final int connections;
private final String clientID;
private final Destination[] destinations;
private final int sharedSubscription;
private final boolean durableSubscription;
private final long messageCount;
private final boolean transaction;
private Set<Connection> jmsConnections;
private boolean started;
private boolean closed;
private MessageCountLimiter msgCountLimiter;
private List<RecordingMessageListener> listeners;
private AtomicBoolean fatalException;
private List<Runnable> silentUnsubscribe;
public static final class MessageCountLimiter {
private volatile long messageLimit = Long.MAX_VALUE;
private final LongAdder totalMessagesReceived;
MessageCountLimiter() {
totalMessagesReceived = new LongAdder();
}
public MessageCountLimiter setMessageLimit(final long messageLimit) {
this.messageLimit = messageLimit;
return this;
}
public boolean isLimitReached() {
return totalMessagesReceived.sum() >= messageLimit;
}
public void onMessageReceived() {
totalMessagesReceived.increment();
}
}
public MessageListenerBenchmark(final ConnectionFactory factory,
final MicrosTimeProvider timeProvider,
final int consumers,
final long messageCount,
final int connections,
final String clientID,
final Destination[] destinations,
final boolean transaction,
final int sharedSubscription,
final boolean durableSubscription,
final boolean canDelayMessageCount) {
this.factory = factory;
this.timeProvider = timeProvider;
this.consumers = consumers;
this.messageCount = messageCount;
this.connections = connections;
this.clientID = clientID;
this.destinations = destinations;
this.transaction = transaction;
this.sharedSubscription = sharedSubscription;
this.durableSubscription = durableSubscription;
this.started = false;
this.closed = false;
this.jmsConnections = new HashSet<>(connections);
this.canDelaySetMessageCount = canDelayMessageCount;
this.listeners = null;
this.fatalException = null;
this.silentUnsubscribe = null;
}
public synchronized RecordingMessageListener[] getListeners() {
return listeners == null ? null : listeners.toArray(new RecordingMessageListener[listeners.size()]);
}
@Override
public synchronized boolean anyError() {
if (fatalException == null) {
return false;
}
return fatalException.get();
}
@Override
public synchronized boolean isRunning() {
if (!started || closed) {
return false;
}
if (fatalException.get()) {
return false;
}
if (msgCountLimiter == null) {
return true;
}
return !msgCountLimiter.isLimitReached();
}
public synchronized void setMessageCount(final long messageCount) {
if (!started || closed) {
return;
}
msgCountLimiter.setMessageLimit(messageCount);
}
@Override
public synchronized MessageListenerBenchmark start() {
if (started) {
return this;
}
started = true;
closed = false;
final AtomicLong consumerId = new AtomicLong(1);
// setup connection failure listeners
final AtomicBoolean signalBrokenConnection = new AtomicBoolean(false);
fatalException = signalBrokenConnection;
// create connections upfront and register failure listener
final Connection[] jmsConnections = new Connection[connections];
for (int i = 0; i < connections; i++) {
final Connection connection;
try {
connection = factory.createConnection();
if (clientID != null) {
if (connections > 1) {
connection.setClientID(clientID + i);
} else {
connection.setClientID(clientID);
}
}
connection.setExceptionListener(ignore -> {
signalBrokenConnection.set(true);
});
jmsConnections[i] = connection;
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
this.jmsConnections.addAll(Arrays.asList(jmsConnections));
// start connections
this.jmsConnections.forEach(connection -> {
try {
connection.start();
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
int connectionSequence = 0;
final int totalListeners = consumers * destinations.length * Math.max(sharedSubscription, 1);
this.listeners = new ArrayList<>(totalListeners);
if (messageCount > 0) {
msgCountLimiter = new MessageCountLimiter().setMessageLimit(messageCount);
} else if (canDelaySetMessageCount) {
msgCountLimiter = new MessageCountLimiter();
}
// create consumers per destination
if (durableSubscription) {
silentUnsubscribe = new ArrayList<>();
}
for (int i = 0; i < destinations.length; i++) {
final Destination destination = destinations[i];
if (sharedSubscription == 0) {
final Queue<RecordingMessageListener> destinationListeners = new ArrayDeque<>(consumers);
createListeners(destinationListeners, consumerId, destination, consumers);
listeners.addAll(destinationListeners);
try {
for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
final Connection connection = jmsConnections[connectionSequence % connections];
connectionSequence++;
final Session session = connection.createSession(transaction ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer;
if (durableSubscription) {
final Topic topic = (Topic) destination;
consumer = session.createDurableConsumer((Topic) destination, topic.getTopicName() + consumerIndex);
} else {
consumer = session.createConsumer(destination);
}
consumer.setMessageListener(destinationListeners.remove());
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
} else {
final int listenersPerDestination = sharedSubscription * consumers;
final Queue<RecordingMessageListener> destinationListeners = new ArrayDeque<>(listenersPerDestination);
createListeners(destinationListeners, consumerId, destination, listenersPerDestination);
listeners.addAll(destinationListeners);
try {
final String topicName = ((Topic) destination).getTopicName();
for (int subscriptionIndex = 0; subscriptionIndex < sharedSubscription; subscriptionIndex++) {
Connection connection = null;
if (clientID != null) {
connection = jmsConnections[connectionSequence % connections];
assert connection.getClientID() != null;
connectionSequence++;
}
for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
if (clientID == null) {
assert connection == null;
connection = jmsConnections[connectionSequence % connections];
connectionSequence++;
}
final Session session = connection.createSession(transaction ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer;
if (durableSubscription) {
final String subscriptionName = topicName + subscriptionIndex;
consumer = session.createSharedDurableConsumer((Topic) destination, subscriptionName);
silentUnsubscribe.add(() -> {
try {
session.unsubscribe(subscriptionName);
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
} else {
consumer = session.createSharedConsumer((Topic) destination, topicName + subscriptionIndex);
}
consumer.setMessageListener(destinationListeners.remove());
}
}
} catch (JMSException fatal) {
throw new RuntimeException(fatal);
}
}
}
return this;
}
private void createListeners(final Collection<? super RecordingMessageListener> listeners,
final AtomicLong consumerId,
final Destination destination,
final int count) {
for (int c = 0; c < count; c++) {
listeners.add(new RecordingMessageListener(consumerId.getAndIncrement(), destination, transaction,
new AtomicLong(0),
msgCountLimiter == null ? null : msgCountLimiter::onMessageReceived,
timeProvider, new SingleWriterRecorder(2),
fatalException));
}
}
@Override
public synchronized void close() {
if (!started || closed) {
return;
}
listeners = null;
started = false;
closed = true;
msgCountLimiter = null;
fatalException = null;
if (silentUnsubscribe != null) {
silentUnsubscribe.forEach(Runnable::run);
silentUnsubscribe = null;
}
jmsConnections.forEach(connection -> {
try {
connection.close();
} catch (JMSException ignore) {
}
});
jmsConnections.clear();
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
public class MessageListenerBenchmarkBuilder {
private ConnectionFactory factory;
private MicrosTimeProvider timeProvider;
private int consumers;
private long messageCount;
private int connections;
private String clientID;
private Destination[] destinations;
private boolean transaction;
private int sharedSubscription;
private boolean durableSubscription;
private boolean canDelayMessageCount;
public MessageListenerBenchmarkBuilder setFactory(final ConnectionFactory factory) {
this.factory = factory;
return this;
}
public MessageListenerBenchmarkBuilder setTimeProvider(final MicrosTimeProvider timeProvider) {
this.timeProvider = timeProvider;
return this;
}
public MessageListenerBenchmarkBuilder setConsumers(final int consumers) {
this.consumers = consumers;
return this;
}
public MessageListenerBenchmarkBuilder setMessageCount(final long messageCount) {
this.messageCount = messageCount;
return this;
}
public MessageListenerBenchmarkBuilder setConnections(final int connections) {
this.connections = connections;
return this;
}
public MessageListenerBenchmarkBuilder setClientID(final String clientID) {
this.clientID = clientID;
return this;
}
public MessageListenerBenchmarkBuilder setDestinations(final Destination[] destinations) {
this.destinations = destinations;
return this;
}
public MessageListenerBenchmarkBuilder setTransacted(final boolean transacted) {
this.transaction = transacted;
return this;
}
public MessageListenerBenchmarkBuilder setSharedSubscription(final int sharedSubscription) {
this.sharedSubscription = sharedSubscription;
return this;
}
public MessageListenerBenchmarkBuilder setDurableSubscription(final boolean durableSubscription) {
this.durableSubscription = durableSubscription;
return this;
}
public MessageListenerBenchmarkBuilder setCanDelayMessageCount(final boolean canDelayMessageCount) {
this.canDelayMessageCount = canDelayMessageCount;
return this;
}
public MessageListenerBenchmark createMessageListenerBenchmark() {
return new MessageListenerBenchmark(factory, timeProvider, consumers, messageCount, connections, clientID,
destinations, transaction, sharedSubscription, durableSubscription,
canDelayMessageCount);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.util.concurrent.TimeUnit;
public class MicrosClock {
// no need for volatile here
private static long offset = -1;
private static long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
private static final boolean AVAILABLE = checkAvailable();
private static boolean checkAvailable() {
try {
final long now = now();
if (now < 0) {
return false;
}
return true;
} catch (Throwable t) {
return false;
}
}
public static boolean isAvailable() {
return AVAILABLE;
}
public static long now() {
long epochSecond = offset;
long nanoAdjustment = jdk.internal.misc.VM.getNanoTimeAdjustment(epochSecond);
if (nanoAdjustment == -1) {
epochSecond = System.currentTimeMillis() / 1000 - 1024;
nanoAdjustment = jdk.internal.misc.VM.getNanoTimeAdjustment(epochSecond);
if (nanoAdjustment == -1) {
throw new InternalError("Offset " + epochSecond + " is not in range");
} else {
offset = epochSecond;
}
}
final long secs = Math.addExact(epochSecond, Math.floorDiv(nanoAdjustment, NANOS_PER_SECOND));
final long secsInUs = TimeUnit.SECONDS.toMicros(secs);
final long nsOffset = (int) Math.floorMod(nanoAdjustment, NANOS_PER_SECOND);
final long usOffset = TimeUnit.NANOSECONDS.toMicros(nsOffset);
return secsInUs + usOffset;
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
@FunctionalInterface
public interface MicrosTimeProvider {
long now();
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.FieldPosition;
public class PaddingDecimalFormat extends DecimalFormat {
private int minimumLength;
private final StringBuilder pad;
/**
* Creates a PaddingDecimalFormat using the given pattern and minimum {@code minLength} and the symbols for the default
* locale.
*/
public PaddingDecimalFormat(String pattern, int minLength) {
super(pattern);
minimumLength = minLength;
pad = new StringBuilder();
}
/**
* Creates a PaddingDecimalFormat using the given pattern, symbols and minimum minimumLength.
*/
public PaddingDecimalFormat(String pattern, DecimalFormatSymbols symbols, int minLength) {
super(pattern, symbols);
minimumLength = minLength;
pad = new StringBuilder();
}
@Override
public StringBuffer format(double number, StringBuffer toAppendTo, FieldPosition pos) {
int initLength = toAppendTo.length();
super.format(number, toAppendTo, pos);
return pad(toAppendTo, initLength);
}
@Override
public StringBuffer format(long number, StringBuffer toAppendTo, FieldPosition pos) {
int initLength = toAppendTo.length();
super.format(number, toAppendTo, pos);
return pad(toAppendTo, initLength);
}
private StringBuffer pad(StringBuffer toAppendTo, int initLength) {
int numLength = toAppendTo.length() - initLength;
int padLength = minimumLength - numLength;
if (padLength > 0) {
final int initialPadLength = pad.length();
for (int i = initialPadLength; i < padLength; i++) {
pad.append(' ');
}
pad.setLength(padLength);
toAppendTo.insert(initLength, pad);
}
return toAppendTo;
}
}

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "client", description = "It will produce and consume messages to a broker instance")
public class PerfClientCommand extends PerfCommand {
@Option(name = "--tx", description = "Perform Message::acknowledge per each message received (Default: disabled)")
protected boolean transaction;
@Option(name = "--shared", description = "Create shared subscription (Default: 0)")
protected int sharedSubscription = 0;
@Option(name = "--durable", description = "Enabled durable subscription (Default: disabled)")
protected boolean durableSubscription = false;
@Option(name = "--consumer-connections", description = "Number of consumer connections to be used. Default is same as the total number of consumers")
protected int consumerConnections = 0;
@Option(name = "--consumers", description = "Number of consumer to use for each generated destination (Default: 1)")
protected int consumersPerDestination = 1;
@Option(name = "--persistent", description = "It will send messages persistently. Default is non persistent")
protected boolean persistent = false;
@Option(name = "--message-size", description = "Size of each byteMessage (Default is 1024)")
protected int messageSize = 1024;
@Option(name = "--rate", description = "Expected total message rate. (Default is unbounded)")
protected Long rate = null;
@Option(name = "--ttl", description = "TTL for each message")
protected long ttl = 0L;
@Option(name = "--group", description = "Message Group to be used")
protected String msgGroupID = null;
@Option(name = "--shared-connections", description = "It create --threads shared connections among producers (Default: not shared)")
protected boolean sharedConnections = false;
@Option(name = "--tx-size", description = "TX Size")
protected long txSize;
@Option(name = "--producers", description = "Number of producers to use for each generated destination (Default: 1)")
protected int producersPerDestination = 1;
@Option(name = "--threads", description = "Number of worker threads to schedule producer load tasks (Default: 1)")
protected int threads = 1;
@Option(name = "--max-pending", description = "How many not yet completed messages can exists (Default is 1)")
protected long maxPending = 1;
@Option(name = "--consumer-url", description = "Setup the url used for MessageListener(s) connections. Default is same as --url")
protected String consumerUrl = null;
@Option(name = "--consumer-protocol", description = "Setup the protocol used for MessageListener(s) connections. Default is same as --protocol")
protected String consumerProtocol = null;
@Option(name = "--enable-msg-id", description = "Enable setting JMS messageID per-message (Default: disabled)")
protected boolean enableMessageID;
@Option(name = "--enable-timestamp", description = "Enable setting JMS timestamp per-message (Default: disabled)")
protected boolean enableTimestamp;
private volatile BenchmarkService producerBenchmark;
@Override
protected void onExecuteBenchmark(final ConnectionFactory producerConnectionFactory, final Destination[] jmsDestinations, final ActionContext context) throws Exception {
final String listenerProtocol = this.consumerProtocol != null ? this.consumerProtocol : getProtocol();
final String listenerUrl = this.consumerUrl != null ? this.consumerUrl : brokerURL;
final ConnectionFactory consumerConnectionFactory = createConnectionFactory(listenerUrl, user, password, null, listenerProtocol);
if (consumerConnections == 0) {
if (sharedSubscription > 0) {
if (getClientID() == null) {
consumerConnections = sharedSubscription * consumersPerDestination * jmsDestinations.length;
} else {
consumerConnections = sharedSubscription * jmsDestinations.length;
}
} else {
consumerConnections = consumersPerDestination * jmsDestinations.length;
}
}
final int totalProducers = producersPerDestination * jmsDestinations.length;
if (threads >= totalProducers) {
if (threads > totalProducers) {
context.err.println("Doesn't make sense to set workers > producers: auto-adjusting it to be the same as the producer count");
threads = totalProducers;
}
}
boolean warmingUp = warmup != 0;
final LiveStatistics statistics;
final StringBuilder skratchBuffer = new StringBuilder();
try (MessageListenerBenchmark consumerBenchmark = new MessageListenerBenchmarkBuilder()
.setClientID(getClientID())
.setDestinations(consumerProtocol != null ? lookupDestinations(consumerConnectionFactory) : jmsDestinations)
.setFactory(consumerConnectionFactory)
.setTransacted(transaction)
.setConsumers(consumersPerDestination)
.setConnections(consumerConnections)
.setTimeProvider(() -> TimeUnit.NANOSECONDS.toMicros(System.nanoTime()))
.setCanDelayMessageCount(true)
.setSharedSubscription(sharedSubscription)
.setDurableSubscription(durableSubscription)
.createMessageListenerBenchmark()) {
final DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(threads) {
@Override
protected EventLoop newChild(final Executor executor, final Object... args) {
return new DefaultEventLoop(this, executor) {
@Override
protected Queue<Runnable> newTaskQueue(final int maxPendingTasks) {
return new LinkedTransferQueue<>();
}
};
}
};
try (ProducerBenchmark producerBenchmark = new ProducerBenchmarkBuilder()
.setPersistent(persistent)
.setDestinations(jmsDestinations)
.setFactory(producerConnectionFactory)
.setTtl(ttl)
.setTransactionCapacity(txSize)
.setGroup(msgGroupID)
.setProducers(producersPerDestination)
.setMessageRate(rate)
.setMessageCount(messageCount)
.setMessageSize(messageSize)
.setTimeProvider(() -> TimeUnit.NANOSECONDS.toMicros(System.nanoTime()))
.setLoopGroup(eventLoopGroup)
.setMaxPending(maxPending)
.setSharedConnections(sharedConnections)
.setEnableMessageID(enableMessageID)
.setEnableTimestamp(enableTimestamp)
.createProducerBenchmark()) {
this.producerBenchmark = producerBenchmark;
consumerBenchmark.start();
producerBenchmark.start();
final long now = System.currentTimeMillis();
final long endWarmup = warmup > 0 ? now + TimeUnit.SECONDS.toMillis(warmup) : 0;
final long end = duration > 0 ? now + TimeUnit.SECONDS.toMillis(duration) : 0;
statistics = new LiveStatistics(reportFileName, hdrFileName, producerBenchmark.getGenerators(), consumerBenchmark.getListeners());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
warmingUp = collectAndReportStatisticsWhileRunning(warmingUp, statistics, skratchBuffer, endWarmup, end, producerBenchmark);
final boolean producerFatalError = producerBenchmark.anyError();
producerBenchmark.asyncClose();
if (!producerFatalError) {
consumerBenchmark.setMessageCount(producerBenchmark.expectedTotalMessageCountToReceive(sharedSubscription, consumersPerDestination));
// we don't care about duration here, but just on the expected messages to receive
warmingUp = collectAndReportStatisticsWhileRunning(warmingUp, statistics, skratchBuffer, endWarmup, 0, consumerBenchmark);
}
}
// last sample must be collected while the whole benchmark is complete
statistics.sampleMetrics(warmingUp);
skratchBuffer.setLength(0);
statistics.outSummary(skratchBuffer);
if (!isSilentInput()) {
context.out.println(skratchBuffer);
}
eventLoopGroup.shutdownGracefully();
statistics.close();
}
}
@Override
protected void onInterruptBenchmark() {
final BenchmarkService benchmark = this.producerBenchmark;
if (benchmark != null) {
benchmark.close();
}
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import io.airlift.airline.Arguments;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract;
import org.apache.activemq.artemis.cli.commands.messages.DestAbstract;
import static java.util.Collections.singletonList;
public abstract class PerfCommand extends ConnectionAbstract {
@Option(name = "--show-latency", description = "Show latencies at interval on output (Default is disabled)")
protected boolean showLatency = false;
@Option(name = "--json", description = "Report file name (Default is none)")
protected String reportFileName = null;
@Option(name = "--hdr", description = "HDR Histogram Report file name (Default is none)")
protected String hdrFileName = null;
@Option(name = "--duration", description = "Test duration in seconds (Default: 0)")
protected int duration = 0;
@Option(name = "--warmup", description = "Warmup in seconds (Default: 0)")
protected int warmup = 0;
@Option(name = "--message-count", description = "Total number of messages (Default: 0)")
protected long messageCount = 0;
@Option(name = "--num-destinations", description = "If present, generate --num-destinations for each destination name, using it as a prefix and adding a number [0,--num-destinations) as suffix. (Default: none)")
protected int numDestinations = 1;
@Arguments(description = "List of destination names. Each name can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
protected List<String> destinations;
private final CountDownLatch completed = new CountDownLatch(1);
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
final ConnectionFactory factory = createConnectionFactory(brokerURL, user, password, null, getProtocol());
final Destination[] jmsDestinations = lookupDestinations(factory, destinations, numDestinations);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
onInterruptBenchmark();
try {
completed.await();
} catch (InterruptedException ignored) {
}
}));
try {
onExecuteBenchmark(factory, jmsDestinations, context);
} finally {
completed.countDown();
}
return null;
}
protected abstract void onExecuteBenchmark(ConnectionFactory factory,
Destination[] jmsDestinations,
ActionContext context) throws Exception;
protected abstract void onInterruptBenchmark();
protected boolean collectAndReportStatisticsWhileRunning(boolean warmingUp,
final LiveStatistics statistics,
final StringBuilder skratchBuffer,
final long endWarmup,
final long end,
final BenchmarkService benchmark) throws IOException {
while (benchmark.isRunning()) {
if (end != 0) {
final long tick = System.currentTimeMillis();
if (tick - end >= 0) {
break;
}
}
if (endWarmup != 0 && warmingUp) {
final long tick = System.currentTimeMillis();
if (tick - endWarmup >= 0) {
warmingUp = false;
}
}
statistics.sampleMetrics(warmingUp);
skratchBuffer.setLength(0);
statistics.outAtInterval(warmingUp, skratchBuffer, LiveStatistics.ReportInterval.sec, showLatency);
if (!isSilentInput()) {
context.out.println(skratchBuffer);
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}
return warmingUp;
}
protected final Destination[] lookupDestinations(final ConnectionFactory factory) throws Exception {
return lookupDestinations(factory, destinations, numDestinations);
}
private static Destination[] lookupDestinations(final ConnectionFactory factory,
final List<String> destinations,
final int numDestinations) throws Exception {
final List<String> destinationNames;
if (destinations == null || destinations.isEmpty()) {
destinationNames = singletonList("queue://TEST");
} else {
destinationNames = destinations;
}
final Destination[] jmsDestinations = new Destination[destinationNames.size() * numDestinations];
try (Connection connection = factory.createConnection();
Session session = connection.createSession()) {
int i = 0;
for (String destinationName : destinationNames) {
if (numDestinations == 1) {
jmsDestinations[i] = DestAbstract.getDestination(session, destinationName);
i++;
} else {
for (int suffix = 0; suffix < numDestinations; suffix++) {
jmsDestinations[i] = DestAbstract.getDestination(session, destinationName + suffix);
i++;
}
}
}
}
return jmsDestinations;
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "consumer", description = "It will consume messages from a broker instance")
public class PerfConsumerCommand extends PerfCommand {
@Option(name = "--tx", description = "Perform Message::acknowledge per each message received (Default: disabled)")
protected boolean transaction;
@Option(name = "--shared", description = "Create shared subscription (Default: 0)")
protected int sharedSubscription = 0;
@Option(name = "--durable", description = "Enabled durable subscription (Default: disabled)")
protected boolean durableSubscription = false;
@Option(name = "--num-connections", description = "Number of connections to be used. Default is same as the total number of consumers")
protected int connections = 0;
@Option(name = "--consumers", description = "Number of consumer to use for each generated destination (Default: 1)")
protected int consumersPerDestination = 1;
private BenchmarkService benchmark;
@Override
protected void onExecuteBenchmark(final ConnectionFactory factory,
final Destination[] jmsDestinations,
final ActionContext context) throws Exception {
MicrosTimeProvider timeProvider = () -> TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
if (MicrosClock.isAvailable()) {
timeProvider = MicrosClock::now;
} else {
context.err.println("Microseconds wall-clock time not available: using System::currentTimeMillis. Add --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED to the JVM parameters to enable it.");
}
if (connections == 0) {
if (sharedSubscription > 0) {
if (clientID == null) {
connections = sharedSubscription * consumersPerDestination * jmsDestinations.length;
} else {
connections = sharedSubscription * jmsDestinations.length;
}
} else {
connections = consumersPerDestination * jmsDestinations.length;
}
}
boolean warmingUp = warmup != 0;
final StringBuilder skratchBuffer = new StringBuilder();
final LiveStatistics statistics;
try (MessageListenerBenchmark benchmark = new MessageListenerBenchmarkBuilder()
.setClientID(getClientID())
.setDestinations(jmsDestinations)
.setFactory(factory)
.setTransacted(transaction)
.setConsumers(consumersPerDestination)
.setMessageCount(messageCount)
.setConnections(connections)
.setTimeProvider(timeProvider)
.setSharedSubscription(sharedSubscription)
.setDurableSubscription(durableSubscription)
.createMessageListenerBenchmark()) {
this.benchmark = benchmark;
benchmark.start();
final long now = System.currentTimeMillis();
final long endWarmup = warmup > 0 ? now + TimeUnit.SECONDS.toMillis(warmup) : 0;
final long end = duration > 0 ? now + TimeUnit.SECONDS.toMillis(duration) : 0;
statistics = new LiveStatistics(reportFileName, hdrFileName, null, benchmark.getListeners());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
warmingUp = collectAndReportStatisticsWhileRunning(warmingUp, statistics, skratchBuffer, endWarmup, end, benchmark);
}
// last sample must be collected while the whole benchmark is complete
statistics.sampleMetrics(warmingUp);
skratchBuffer.setLength(0);
statistics.outSummary(skratchBuffer);
if (!isSilentInput()) {
context.out.println(skratchBuffer);
}
statistics.close();
}
@Override
protected void onInterruptBenchmark() {
final BenchmarkService benchmark = this.benchmark;
if (benchmark != null) {
benchmark.close();
}
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "producer", description = "It will send messages to a broker instance")
public class PerfProducerCommand extends PerfCommand {
@Option(name = "--persistent", description = "It will send messages persistently. Default is non persistent")
protected boolean persistent = false;
@Option(name = "--message-size", description = "Size of each byteMessage (Default is 1024)")
protected int messageSize = 1024;
@Option(name = "--rate", description = "Expected total message rate. (Default is unbounded)")
protected Long rate = null;
@Option(name = "--ttl", description = "TTL for each message")
protected long ttl = 0L;
@Option(name = "--group", description = "Message Group to be used")
protected String msgGroupID = null;
@Option(name = "--shared-connections", description = "It create --threads shared connections among producers (Default: not shared)")
protected boolean sharedConnections = false;
@Option(name = "--tx-size", description = "TX Size")
protected long txSize;
@Option(name = "--producers", description = "Number of producers to use for each generated destination (Default: 1)")
protected int producersPerDestination = 1;
@Option(name = "--threads", description = "Number of worker threads to schedule producer load tasks (Default: 1)")
protected int threads = 1;
@Option(name = "--max-pending", description = "How many not yet completed messages can exists (Default is 1)")
protected long maxPending = 1;
@Option(name = "--enable-msg-id", description = "Enable setting JMS messageID per-message (Default: disabled)")
protected boolean enableMessageID;
@Option(name = "--enable-timestamp", description = "Enable setting JMS timestamp per-message (Default: disabled)")
protected boolean enableTimestamp;
protected volatile BenchmarkService benchmark;
@Override
protected void onExecuteBenchmark(final ConnectionFactory factory,
final Destination[] jmsDestinations,
final ActionContext context) throws Exception {
if (getClientID() != null) {
context.err.println("ClientID configuration is not supported");
}
MicrosTimeProvider timeProvider = () -> TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
if (MicrosClock.isAvailable()) {
timeProvider = MicrosClock::now;
} else {
context.err.println("Microseconds wall-clock time not available: using System::currentTimeMillis. Add --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED to the JVM parameters to enable it.");
}
final int totalProducers = producersPerDestination * jmsDestinations.length;
if (threads >= totalProducers) {
if (threads > totalProducers) {
context.err.println("Doesn't make sense to set workers > producers: auto-adjusting it to be the same as the producer count");
threads = totalProducers;
}
}
final DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(threads) {
@Override
protected EventLoop newChild(final Executor executor, final Object... args) {
return new DefaultEventLoop(this, executor) {
@Override
protected Queue<Runnable> newTaskQueue(final int maxPendingTasks) {
return new LinkedTransferQueue<>();
}
};
}
};
boolean warmingUp = warmup != 0;
final LiveStatistics statistics;
final StringBuilder skratchBuffer = new StringBuilder();
try (ProducerBenchmark benchmark = new ProducerBenchmarkBuilder()
.setPersistent(persistent)
.setDestinations(jmsDestinations)
.setFactory(factory)
.setTtl(ttl)
.setTransactionCapacity(txSize)
.setGroup(msgGroupID)
.setProducers(producersPerDestination)
.setMessageRate(rate)
.setMessageCount(messageCount)
.setMessageSize(messageSize)
.setTimeProvider(timeProvider)
.setLoopGroup(eventLoopGroup)
.setMaxPending(maxPending)
.setSharedConnections(sharedConnections)
.setEnableMessageID(enableMessageID)
.setEnableTimestamp(enableTimestamp)
.createProducerBenchmark()) {
this.benchmark = benchmark;
benchmark.start();
final long now = System.currentTimeMillis();
final long endWarmup = warmup > 0 ? now + TimeUnit.SECONDS.toMillis(warmup) : 0;
final long end = duration > 0 ? now + TimeUnit.SECONDS.toMillis(duration) : 0;
statistics = new LiveStatistics(reportFileName, hdrFileName, benchmark.getGenerators(), null);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
warmingUp = collectAndReportStatisticsWhileRunning(warmingUp, statistics, skratchBuffer, endWarmup, end, benchmark);
}
// last sample must be collected while the whole benchmark is complete
statistics.sampleMetrics(warmingUp);
skratchBuffer.setLength(0);
statistics.outSummary(skratchBuffer);
if (!isSilentInput()) {
context.out.println(skratchBuffer);
}
eventLoopGroup.shutdownGracefully();
statistics.close();
}
@Override
protected void onInterruptBenchmark() {
final BenchmarkService benchmark = this.benchmark;
if (benchmark != null) {
benchmark.close();
}
}
}

View File

@ -0,0 +1,341 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import org.HdrHistogram.SingleWriterRecorder;
import org.apache.activemq.artemis.api.core.ObjLongPair;
public final class ProducerBenchmark implements BenchmarkService {
private final ConnectionFactory factory;
private final MicrosTimeProvider timeProvider;
private final EventLoopGroup eventLoopGroup;
private final int producers;
private final long messageCount;
private final String group;
private final long ttl;
private final int messageSize;
private final Destination[] destinations;
private final boolean persistent;
private final long maxPending;
private final long transactionCapacity;
private final Long messageRate;
private final boolean sharedConnections;
private final boolean enableTimestamp;
private final boolean enableMessageID;
private Set<Connection> connections;
private ProducerLoadGenerator[] generators;
private boolean started;
private boolean closed;
private final Map<Destination, List<AsyncJms2ProducerFacade>> producersPerDestination;
private CompletableFuture<?> allGeneratorClosed;
public ProducerBenchmark(final ConnectionFactory factory,
final MicrosTimeProvider timeProvider,
final EventLoopGroup loopGroup,
final int producers,
final long messageCount,
final boolean sharedConnections,
final String group,
final long ttl,
final int messageSize,
final Destination[] destinations,
final boolean persistent,
final long maxPending,
final long transactionCapacity,
final Long messageRate,
final boolean enableMessageID,
final boolean enableTimestamp) {
this.factory = factory;
this.timeProvider = timeProvider;
this.eventLoopGroup = loopGroup;
this.producers = producers;
this.messageCount = messageCount;
this.sharedConnections = sharedConnections;
this.group = group;
this.ttl = ttl;
this.messageSize = messageSize;
this.destinations = destinations;
this.persistent = persistent;
this.maxPending = maxPending;
this.transactionCapacity = transactionCapacity;
this.messageRate = messageRate;
this.started = false;
this.closed = false;
this.connections = new HashSet<>();
this.producersPerDestination = new HashMap<>(destinations.length);
this.enableMessageID = enableMessageID;
this.enableTimestamp = enableTimestamp;
}
private synchronized Stream<ObjLongPair<Destination>> messageSentPerDestination() {
return producersPerDestination.entrySet().stream()
.map(producers -> new ObjLongPair(producers.getKey(),
producers.getValue().stream()
.mapToLong(AsyncJms2ProducerFacade::getMessageSent).sum()));
}
public synchronized long expectedTotalMessageCountToReceive(final int sharedSubscriptions,
final int consumersPerDestination) {
return expectedTotalMessageCountToReceive(messageSentPerDestination(), sharedSubscriptions, consumersPerDestination);
}
public static long expectedTotalMessageCountToReceive(final Stream<ObjLongPair<Destination>> messageSentPerDestination,
final int sharedSubscriptions,
final int consumersPerDestination) {
return messageSentPerDestination.mapToLong(messagesPerDestination -> {
if (messagesPerDestination.getA() instanceof Topic) {
final int subscribers = sharedSubscriptions > 0 ? sharedSubscriptions : consumersPerDestination;
return subscribers * messagesPerDestination.getB();
}
assert messagesPerDestination.getA() instanceof Queue;
return messagesPerDestination.getB();
}).sum();
}
public synchronized ProducerLoadGenerator[] getGenerators() {
return generators;
}
@Override
public synchronized boolean anyError() {
if (!started || closed) {
return false;
}
for (ProducerLoadGenerator loadGenerator : generators) {
if (loadGenerator.getFatalException() != null) {
return true;
}
}
return false;
}
@Override
public synchronized boolean isRunning() {
if (!started || closed) {
return false;
}
final ProducerLoadGenerator[] generators = this.generators;
if (generators == null) {
return false;
}
boolean running = false;
for (ProducerLoadGenerator loadGenerator : generators) {
if (!loadGenerator.isCompleted()) {
running = true;
} else if (loadGenerator.getFatalException() != null) {
running = false;
break;
}
}
return running;
}
@Override
public synchronized ProducerBenchmark start() {
if (started) {
return this;
}
producersPerDestination.clear();
started = true;
closed = false;
// create connections: if shared, one for each event loop, if !shared, one for each producer
final int totalProducers = destinations.length * producers;
final IdentityHashMap<EventExecutor, Connection> sharedConnections = this.sharedConnections ? new IdentityHashMap<>() : null;
final Connection[] exclusiveConnections = !this.sharedConnections ? new Connection[totalProducers] : null;
if (this.sharedConnections) {
eventLoopGroup.forEach(eventExecutor -> {
final Connection connection;
try {
connection = factory.createConnection();
connections.add(connection);
sharedConnections.put(eventExecutor, connection);
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
} else {
for (int i = 0; i < totalProducers; i++) {
try {
final Connection connection = factory.createConnection();
exclusiveConnections[i] = connection;
connections.add(connection);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
// start connections
connections.forEach(connection -> {
try {
connection.start();
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
final AtomicLong producerId = new AtomicLong(1);
// create shared content
final byte[] messageContent = new byte[messageSize];
Arrays.fill(messageContent, (byte) 1);
// create producers/sessions/senders/load generators
this.generators = new ProducerLoadGenerator[totalProducers];
final ArrayList<AsyncJms2ProducerFacade> allProducers = new ArrayList<>(totalProducers);
int producerSequence = 0;
final int messageCountPerProducer = (int) (messageCount / totalProducers);
long remainingMessageCount = messageCount;
final Long messageRatePerProducer = messageRate == null ? null : (messageRate / totalProducers);
Long remainingMessageRate = messageRate;
for (int d = 0; d < destinations.length; d++) {
final Destination destination = destinations[d];
final ArrayList<AsyncJms2ProducerFacade> producers = new ArrayList<>(this.producers);
producersPerDestination.put(destination, producers);
for (int i = 0; i < this.producers; i++) {
final EventLoop eventLoop = eventLoopGroup.next();
final Connection connection;
if (this.sharedConnections) {
connection = sharedConnections.get(eventLoop);
} else {
connection = exclusiveConnections[producerSequence];
}
final Session session;
final MessageProducer producer;
try {
session = connection.createSession(transactionCapacity > 0 ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setDisableMessageID(!enableMessageID);
producer.setDisableMessageTimestamp(!enableTimestamp);
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(ttl);
} catch (JMSException e) {
throw new RuntimeException(e);
}
final AsyncJms2ProducerFacade producerFacade = new AsyncJms2ProducerFacade(producerId.getAndIncrement(),
session, producer, destination,
maxPending, transactionCapacity);
allProducers.add(producerFacade);
producers.add(producerFacade);
final BooleanSupplier keepOnSendingStrategy;
if (messageCount == 0) {
keepOnSendingStrategy = () -> true;
} else {
final long count = Math.min(messageCountPerProducer, remainingMessageCount);
remainingMessageCount -= count;
keepOnSendingStrategy = () -> producerFacade.getMessageSent() < count;
}
final SingleWriterRecorder sendLatencyRecorder = new SingleWriterRecorder(2);
final Long ratePeriodNanos;
if (messageRatePerProducer != null) {
final long rate = Math.min(messageRatePerProducer, remainingMessageRate);
ratePeriodNanos = TimeUnit.SECONDS.toNanos(1) / rate;
remainingMessageRate -= rate;
} else {
ratePeriodNanos = null;
}
generators[producerSequence] = ratePeriodNanos != null ?
new ProducerTargetRateLoadGenerator(producerFacade, eventLoop, timeProvider, keepOnSendingStrategy, ratePeriodNanos, group, messageContent, sendLatencyRecorder, new SingleWriterRecorder(2)) :
new ProducerMaxLoadGenerator(producerFacade, eventLoop, timeProvider, keepOnSendingStrategy, group, messageContent, sendLatencyRecorder);
producerSequence++;
}
}
// deploy and start generators
for (int i = 0; i < totalProducers; i++) {
generators[i].getExecutor().execute(generators[i]);
}
return this;
}
/**
* After this, now new messages are sent, but there still be some to be completed: the return value can be used
* to await completions to arrive.
*/
public synchronized CompletionStage<?> asyncClose() {
if (!started || closed) {
return CompletableFuture.completedFuture(null);
}
if (allGeneratorClosed != null) {
return allGeneratorClosed;
}
final CompletableFuture[] closedGenerators = new CompletableFuture[generators.length];
for (int i = 0; i < generators.length; i++) {
final CompletableFuture<?> onClosed = new CompletableFuture();
closedGenerators[i] = onClosed;
try {
generators[i].asyncClose(() -> onClosed.complete(null)).get();
} catch (final Throwable ignore) {
closedGenerators[i].completeExceptionally(ignore);
}
}
CompletableFuture<?> allGeneratorClosed = CompletableFuture.allOf(closedGenerators);
final Connection[] openedConnections = connections.toArray(new Connection[connections.size()]);
this.allGeneratorClosed = allGeneratorClosed.whenCompleteAsync((res, error) -> {
synchronized (this) {
generators = null;
started = false;
closed = true;
this.allGeneratorClosed = null;
}
for (Connection connection : openedConnections) {
// close connection: it should roll-back pending opened sessions and await completion to be called
try {
connection.close();
} catch (JMSException ignore) {
}
}
}, eventLoopGroup);
connections.clear();
return allGeneratorClosed;
}
@Override
public void close() {
asyncClose().toCompletableFuture().join();
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import io.netty.channel.EventLoopGroup;
public class ProducerBenchmarkBuilder {
private ConnectionFactory factory;
private MicrosTimeProvider timeProvider;
private EventLoopGroup loopGroup;
private int producers;
private long messageCount;
private String group;
private long ttl;
private int messageSize;
private Destination[] destinations;
private boolean persistent;
private long maxPending;
private long transactionCapacity;
private Long messageRate;
private boolean sharedConnections;
private boolean enableMessageID;
private boolean enableTimestamp;
public ProducerBenchmarkBuilder setFactory(final ConnectionFactory factory) {
this.factory = factory;
return this;
}
public ProducerBenchmarkBuilder setTimeProvider(final MicrosTimeProvider timeProvider) {
this.timeProvider = timeProvider;
return this;
}
public ProducerBenchmarkBuilder setLoopGroup(final EventLoopGroup loopGroup) {
this.loopGroup = loopGroup;
return this;
}
public ProducerBenchmarkBuilder setProducers(final int producers) {
this.producers = producers;
return this;
}
public ProducerBenchmarkBuilder setMessageCount(final long messageCount) {
this.messageCount = messageCount;
return this;
}
public ProducerBenchmarkBuilder setSharedConnections(final boolean sharedConnections) {
this.sharedConnections = sharedConnections;
return this;
}
public ProducerBenchmarkBuilder setGroup(final String group) {
this.group = group;
return this;
}
public ProducerBenchmarkBuilder setTtl(final long ttl) {
this.ttl = ttl;
return this;
}
public ProducerBenchmarkBuilder setMessageSize(final int messageSize) {
this.messageSize = messageSize;
return this;
}
public ProducerBenchmarkBuilder setDestinations(final Destination[] destinations) {
this.destinations = destinations;
return this;
}
public ProducerBenchmarkBuilder setPersistent(final boolean persistent) {
this.persistent = persistent;
return this;
}
public ProducerBenchmarkBuilder setMaxPending(final long maxPending) {
this.maxPending = maxPending;
return this;
}
public ProducerBenchmarkBuilder setTransactionCapacity(final long transactionCapacity) {
this.transactionCapacity = transactionCapacity;
return this;
}
public ProducerBenchmarkBuilder setMessageRate(final Long messageRate) {
this.messageRate = messageRate;
return this;
}
public ProducerBenchmarkBuilder setEnableTimestamp(final boolean enableTimestamp) {
this.enableTimestamp = enableTimestamp;
return this;
}
public ProducerBenchmarkBuilder setEnableMessageID(final boolean enableMessageID) {
this.enableMessageID = enableMessageID;
return this;
}
public ProducerBenchmark createProducerBenchmark() {
return new ProducerBenchmark(factory, timeProvider, loopGroup, producers, messageCount, sharedConnections,
group, ttl, messageSize, destinations, persistent, maxPending,
transactionCapacity, messageRate, enableMessageID, enableTimestamp);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.util.concurrent.Future;
import io.netty.util.concurrent.OrderedEventExecutor;
import org.HdrHistogram.SingleWriterRecorder;
public interface ProducerLoadGenerator extends Runnable {
OrderedEventExecutor getExecutor();
@Override
void run();
Future<?> asyncClose(Runnable onClosed);
SingleWriterRecorder getWaitLatencies();
SingleWriterRecorder getSendCompletedLatencies();
AsyncJms2ProducerFacade getProducer();
boolean isCompleted();
Exception getFatalException();
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.util.function.BooleanSupplier;
import io.netty.util.concurrent.OrderedEventExecutor;
import org.HdrHistogram.SingleWriterRecorder;
public final class ProducerMaxLoadGenerator extends SkeletalProducerLoadGenerator {
public ProducerMaxLoadGenerator(final AsyncJms2ProducerFacade producer,
final OrderedEventExecutor executor,
final MicrosTimeProvider timeProvider,
final BooleanSupplier keepOnSending,
final String group,
final byte[] msgContent,
final SingleWriterRecorder sendCompletedLatencies) {
super(producer, executor, timeProvider, keepOnSending, group, msgContent, sendCompletedLatencies, null);
}
@Override
public void run() {
if (closed || stopLoad) {
return;
}
if (!trySend(timeProvider.now())) {
return;
}
if (!keepOnSending.getAsBoolean()) {
producer.requestClose();
stopLoad = true;
return;
}
asyncContinue();
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import io.netty.util.concurrent.OrderedEventExecutor;
import org.HdrHistogram.SingleWriterRecorder;
public final class ProducerTargetRateLoadGenerator extends SkeletalProducerLoadGenerator {
private final long usPeriod;
private long fireTimeMicros;
private boolean started;
public ProducerTargetRateLoadGenerator(final AsyncJms2ProducerFacade producer,
final OrderedEventExecutor executor,
final MicrosTimeProvider timeProvider,
final BooleanSupplier keepOnSending,
final long nsPeriod,
final String group,
final byte[] msgContent,
final SingleWriterRecorder sendCompletedLatencies,
final SingleWriterRecorder waitLatencies) {
super(producer, executor, timeProvider, keepOnSending, group, msgContent, sendCompletedLatencies, waitLatencies);
this.fireTimeMicros = 0;
this.usPeriod = TimeUnit.NANOSECONDS.toMicros(nsPeriod);
this.started = false;
}
@Override
public void run() {
if (closed || stopLoad) {
return;
}
final long now = timeProvider.now();
if (!started) {
started = true;
fireTimeMicros = now;
}
if (!trySend(fireTimeMicros, now)) {
return;
}
if (!keepOnSending.getAsBoolean()) {
producer.requestClose();
stopLoad = true;
return;
}
fireTimeMicros += usPeriod;
final long delay = fireTimeMicros - timeProvider.now();
final long usToNextFireTime = Math.max(0, delay);
asyncContinue(usToNextFireTime);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.util.function.LongSupplier;
public final class RateSampler implements Runnable {
private final LongSupplier sampling;
private long lastSample;
private long lastSampleTime;
private long rate;
private long timeSpanNs;
private RateSampler(final LongSupplier sampling) {
this.sampling = sampling;
this.timeSpanNs = 0;
this.lastSampleTime = System.nanoTime();
this.lastSample = sampling.getAsLong();
this.rate = 0;
}
@Override
public void run() {
final long now = System.nanoTime();
final long newSample = sampling.getAsLong();
rate = newSample - lastSample;
timeSpanNs = now - lastSampleTime;
lastSample = newSample;
lastSampleTime = now;
}
public long getLastSample() {
return lastSample;
}
public long reportRate(final long reportIntervalNs) {
return (rate * reportIntervalNs) / timeSpanNs;
}
public static RateSampler of(final LongSupplier sampling) {
return new RateSampler(sampling);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.HdrHistogram.SingleWriterRecorder;
public final class RecordingMessageListener implements MessageListener {
private final long id;
private final Destination destination;
private final boolean transaction;
private final AtomicLong receivedMessages;
private final Runnable onMessageReceived;
private final MicrosTimeProvider timeProvider;
private final SingleWriterRecorder receiveLatencyRecorder;
private AtomicBoolean fatalException;
RecordingMessageListener(final long id,
final Destination destination,
final boolean transaction,
final AtomicLong receivedMessages,
final Runnable onMessageReceived,
final MicrosTimeProvider timeProvider,
final SingleWriterRecorder receiveLatencyRecorder,
final AtomicBoolean fatalException) {
this.id = id;
this.destination = destination;
this.transaction = transaction;
this.receivedMessages = receivedMessages;
this.onMessageReceived = onMessageReceived;
this.timeProvider = timeProvider;
this.receiveLatencyRecorder = receiveLatencyRecorder;
this.fatalException = fatalException;
}
public boolean anyFatalException() {
return fatalException.get();
}
public SingleWriterRecorder getReceiveLatencyRecorder() {
return receiveLatencyRecorder;
}
public long getId() {
return id;
}
public Destination getDestination() {
return destination;
}
public long getReceivedMessages() {
return receivedMessages.get();
}
@Override
public void onMessage(final Message message) {
if (onMessageReceived != null) {
onMessageReceived.run();
}
receivedMessages.lazySet(receivedMessages.get() + 1);
if (receiveLatencyRecorder != null) {
try {
final long start = message.getLongProperty("time");
final long receivedOn = timeProvider.now();
final long elapsedUs = receivedOn - start;
receiveLatencyRecorder.recordValue(elapsedUs);
} catch (JMSException fatal) {
fatalException.compareAndSet(false, true);
}
}
if (transaction) {
try {
message.acknowledge();
} catch (JMSException fatal) {
fatalException.compareAndSet(false, true);
}
}
}
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import io.netty.util.concurrent.OrderedEventExecutor;
import org.HdrHistogram.SingleWriterRecorder;
import org.apache.activemq.artemis.cli.commands.messages.perf.AsyncJms2ProducerFacade.SendAttemptResult;
public abstract class SkeletalProducerLoadGenerator implements CompletionListener, ProducerLoadGenerator {
protected final AsyncJms2ProducerFacade producer;
private final OrderedEventExecutor executor;
protected final BooleanSupplier keepOnSending;
protected final MicrosTimeProvider timeProvider;
private final String group;
private final byte[] messageContent;
private BytesMessage messageToSend;
protected boolean closed;
protected volatile boolean stopLoad;
private final SingleWriterRecorder waitLatencies;
private final SingleWriterRecorder sendCompletedLatencies;
private final AtomicLong unprocessedCompletions;
private final AtomicBoolean scheduledProcessingCompletions;
private volatile Exception fatalException;
private boolean stopHandlingCompletions;
public SkeletalProducerLoadGenerator(final AsyncJms2ProducerFacade producer,
final OrderedEventExecutor executor,
final MicrosTimeProvider timeProvider,
final BooleanSupplier keepOnSending,
final String group,
final byte[] msgContent,
final SingleWriterRecorder sendCompletedLatencies,
final SingleWriterRecorder waitLatencies) {
this.sendCompletedLatencies = sendCompletedLatencies;
this.waitLatencies = waitLatencies;
this.producer = producer;
this.executor = executor;
this.timeProvider = timeProvider;
this.keepOnSending = keepOnSending;
this.group = group;
this.messageContent = msgContent;
this.messageToSend = null;
this.closed = false;
this.stopLoad = false;
this.unprocessedCompletions = new AtomicLong();
this.scheduledProcessingCompletions = new AtomicBoolean();
this.fatalException = null;
this.stopHandlingCompletions = false;
}
@Override
public Exception getFatalException() {
return fatalException;
}
@Override
public SingleWriterRecorder getSendCompletedLatencies() {
return sendCompletedLatencies;
}
@Override
public SingleWriterRecorder getWaitLatencies() {
return waitLatencies;
}
@Override
public AsyncJms2ProducerFacade getProducer() {
return producer;
}
@Override
public boolean isCompleted() {
if (stopLoad && fatalException != null) {
return true;
}
return stopLoad && producer.getMessageCompleted() == producer.getMessageSent();
}
@Override
public OrderedEventExecutor getExecutor() {
return executor;
}
protected final void asyncContinue() {
asyncContinue(0);
}
protected final void asyncContinue(final long usDelay) {
if (usDelay == 0) {
executor.execute(this);
} else {
executor.schedule(this, usDelay, TimeUnit.MICROSECONDS);
}
}
protected final boolean trySend(final long sendTime) {
return trySend(sendTime, sendTime);
}
protected final boolean trySend(final long expectedSendTime, final long sendTime) {
assert executor.inEventLoop();
assert !closed;
try {
if (messageToSend == null) {
messageToSend = producer.createBytesMessage();
messageToSend.writeBytes(this.messageContent);
}
messageToSend.setLongProperty("time", sendTime);
if (group != null) {
messageToSend.setStringProperty("JMSXGroupID", group);
}
final SendAttemptResult result = producer.trySend(messageToSend, this, this);
if (result != SendAttemptResult.NotAvailable) {
messageToSend = null;
if (result == SendAttemptResult.Success) {
if (waitLatencies != null) {
waitLatencies.recordValue(sendTime - expectedSendTime);
}
}
}
return result == SendAttemptResult.Success;
} catch (final JMSException e) {
onSendErrored(e);
return false;
}
}
@Override
public void onCompletion(final Message message) {
asyncOnSendCompleted(message, null);
}
@Override
public void onException(final Message message, final Exception exception) {
asyncOnSendCompleted(message, exception);
}
private void asyncOnSendCompleted(final Message message, Exception completionError) {
if (stopHandlingCompletions) {
return;
}
if (completionError == null) {
try {
recordSendCompletionLatency(message);
unprocessedCompletions.incrementAndGet();
scheduleProcessingCompletions();
} catch (final JMSException jmsException) {
completionError = jmsException;
}
}
if (completionError != null) {
stopHandlingCompletions = true;
final Exception fatal = completionError;
executor.execute(() -> onSendErrored(fatal));
}
}
private void onSendErrored(final Exception fatal) {
assert executor.inEventLoop();
if (fatalException != null) {
return;
}
producer.onSendErrored();
fatalException = fatal;
stopLoad = true;
closed = true;
}
private void scheduleProcessingCompletions() {
if (unprocessedCompletions.get() > 0 && scheduledProcessingCompletions.compareAndSet(false, true)) {
executor.execute(this::processCompletions);
}
}
private void processCompletions() {
assert executor.inEventLoop();
assert scheduledProcessingCompletions.get();
if (fatalException != null) {
return;
}
final long completions = unprocessedCompletions.getAndSet(0);
for (long i = 0; i < completions; i++) {
final JMSException completionException = producer.onSendCompleted();
if (completionException != null) {
fatalException = completionException;
return;
}
}
scheduledProcessingCompletions.set(false);
scheduleProcessingCompletions();
}
private void recordSendCompletionLatency(final Message message) throws JMSException {
final long time = message.getLongProperty("time");
final long elapsedMicros = timeProvider.now() - time;
sendCompletedLatencies.recordValue(elapsedMicros);
}
@Override
public Future<?> asyncClose(final Runnable onClosed) {
return executor.submit(() -> onClose(onClosed));
}
private void onClose(final Runnable onClosed) {
assert executor.inEventLoop();
if (closed) {
onClosed.run();
return;
}
closed = true;
// no need for this anymore
messageToSend = null;
producer.requestClose(onClosed);
}
}

View File

@ -44,8 +44,8 @@ if [ -z "$ARTEMIS_HOME" ] ; then
ARTEMIS_HOME=`cd "$ARTEMIS_HOME/.." && pwd`
fi
# Set Defaults Properties
JAVA_ARGS="-XX:+UseParallelGC -Xms512M -Xmx1024M"
# Set Defaults JAVA_ARGS Properties if not already set: this allow specific artemis commands to override default JVM configuration
if [ -z "${JAVA_ARGS}" ]; then JAVA_ARGS="-XX:+UseParallelGC -Xms512M -Xmx1024M"; fi
CLASSPATH="$ARTEMIS_HOME/lib/artemis-boot.jar"
# OS specific support.

View File

@ -45,7 +45,7 @@ echo.
:RUN_JAVA
rem "Set Defaults."
set JAVA_ARGS=-XX:+UseParallelGC -Xms512M -Xmx1024M
if "%JAVA_ARGS%" == "" set JAVA_ARGS=-XX:+UseParallelGC -Xms512M -Xmx1024M
rem "Create full JVM Args"
set JVM_ARGS=%JAVA_ARGS%

View File

@ -280,6 +280,12 @@ For jakarta.xml.bind-api:
This product bundles jakarta.xml.bind-api, which is available under the
Eclipse Distribution License (EDL) 1.0. For details, see licenses/EDL-1.0.txt
==============================================================================
For HdrHistogram:
==============================================================================
This product bundles HdrHistogram, which is available under the
"2-clause BSD" license. For details, see licences/LICENSE-hdrhistogram.txt.
==============================================================================
Apache ActiveMQ Artemis Subcomponents:

View File

@ -0,0 +1,41 @@
The code in this repository code was Written by Gil Tene, Michael Barker,
and Matt Warren, and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/
For users of this code who wish to consume it under the "BSD" license
rather than under the public domain or CC0 contribution text mentioned
above, the code found under this directory is *also* provided under the
following license (commonly referred to as the BSD 2-Clause License). This
license does not detract from the above stated release of the code into
the public domain, and simply represents an additional license granted by
the Author.
-----------------------------------------------------------------------------
** Beginning of "BSD 2-Clause License" text. **
Copyright (c) 2012, 2013, 2014, 2015, 2016 Gil Tene
Copyright (c) 2014 Michael Barker
Copyright (c) 2014 Matt Warren
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -73,6 +73,7 @@
<bundle dependency="true">mvn:org.apache.commons/commons-text/${commons.text.version}</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
<bundle dependency="true">mvn:org.jctools/jctools-core/${jctools.version}</bundle>
<bundle dependency="true">mvn:org.hdrhistogram/HdrHistogram/${hdrhistogram.version}</bundle>
<bundle dependency="true">mvn:com.google.guava/failureaccess/1.0.1</bundle>
<bundle dependency="true">mvn:com.google.guava/guava/${guava.version}</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-dbcp2/${commons.dbcp2.version}</bundle>

View File

@ -85,5 +85,6 @@
* [Maven Plugin](maven-plugin.md)
* [Unit Testing](unit-testing.md)
* [Troubleshooting and Performance Tuning](perf-tuning.md)
* [Performance Tools](perf-tools.md)
* [Configuration Reference](configuration-index.md)
* [Restart Sequence](restart-sequence.md)

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

View File

@ -0,0 +1,379 @@
#Artemis `perf` commands
Artemis provides some built-in performance test tools based on the [JMS 2 API](https://javaee.github.io/jms-spec/pages/JMS20FinalRelease)
to help users (and developers) to stress test a configured Artemis broker instance in different scenarios.
These command-line tools won't represent a full-fat benchmark (such as [Open Messaging](https://openmessaging.cloud/docs/benchmarks/)),
but can be used as building blocks to produce one. They are also quite useful on their own.
In summary, the provided `perf` tools are:
1. `producer` tool: it can generate both all-out throughput or target-rate load, using [BytesMessage](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/bytesmessage) of a configured size
4. `consumer` tool: it uses a [MessageListener](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/messagelistener) to consume messages sent by the `producer` command
5. `client` tools: it packs both tools as a single command
Most users will just need the `client` tool, but the `producer` and `consumer` tools allow performing tests in additional scenario(s):
- delaying consumer start, in order to cause the broker to page
- running producers and consumers on different machines
- ...
The examples below (running on a `64 bit Linux 5.14 with Intel® Core™ i7-9850H CPU @ 2.60GHz × 12 with Turbo Boost disabled, 32 GB of RAM and SSD`)
show different use cases of increasing complexity. As they progress, some internal architectural details of the tool and the configuration options supported, are explored.
> **Note:**
> The tools can run both from within the broker instance's folder or
> from the base artemis `bin` folder.
> In the former case it will use the same JVM parameter configured on the instance (on `artemis.profile`),
> while in the latter case the user should set `JAVA_ARGS` environment variable to override default heap and GC parameters
>
> ie `-XX:+UseParallelGC -Xms512M -Xmx1024M`
## Case 1: Single producer Single consumer over a queue
This is the simplest possible case: running a load test with 1 producer and 1 consumer on a non-durable queue `TEST_QUEUE`,
using [non-persistent](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/deliverymode#NON_PERSISTENT)
1024 bytes long (by default) messages, using [auto-acknowledge](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/session#AUTO_ACKNOWLEDGE).
Let's see what happens after typing:
```bash
$ ./artemis perf client queue://TEST_QUEUE
Connection brokerURL = tcp://localhost:61616
2022-01-18 10:30:54,535 WARN [org.apache.activemq.artemis.core.client] AMQ212053: CompletionListener/SendAcknowledgementHandler used with confirmationWindowSize=-1. Enable confirmationWindowSize to receive acks from server!
--- warmup false
--- sent: 7316 msg/sec
--- blocked: 6632 msg/sec
--- completed: 7320 msg/sec
--- received: 7317 msg/sec
# ...
```
The test keeps on running, until `SIGTERM` or `SIGINT` signals are sent to the Java process (on Linux Console it translates into pressing **CTRL + C**).
Before looking what the metrics mean, there's an initial `WARN` log that shouldn't be ignored:
```bash
WARN [org.apache.activemq.artemis.core.client] AMQ212053: CompletionListener/SendAcknowledgementHandler used with confirmationWindowSize=-1. Enable confirmationWindowSize to receive acks from server!
```
It shows two things:
1. the load generator uses [async message producers](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/messageproducer#send-javax.jms.Destination-javax.jms.Message-javax.jms.CompletionListener-)
2. `confirmationWindowSize` is an Artemis CORE protocol specific setting; the `perf` commands uses CORE as the default JMS provider
###Live Latency Console Reporting
The `perf client` command can report on Console different latency percentiles metrics by adding `--show-latency` to the command arguments, but in order to obtain meaningful metrics, we need to address `WARN` by setting `confirmationWindowSize` on the producer `url`,
setting `--consumer-url` to save applying the same configuration for consumer(s).
In short, the command is using these additional parameters:
```bash
--show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616
```
####Running it
```bash
$ ./artemis perf client --show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 queue://TEST_QUEUE
--- warmup false
--- sent: 8114 msg/sec
--- blocked: 8114 msg/sec
--- completed: 8114 msg/sec
--- received: 8113 msg/sec
--- send ack time: mean: 113.01 us - 50.00%: 106.00 us - 90.00%: 142.00 us - 99.00%: 204.00 us - 99.90%: 371.00 us - 99.99%: 3455.00 us - max: 3455.00 us
--- transfer time: mean: 213.71 us - 50.00%: 126.00 us - 90.00%: 177.00 us - 99.00%: 3439.00 us - 99.90%: 7967.00 us - 99.99%: 8895.00 us - max: 8895.00 us
# CTRL + C pressed
--- SUMMARY
--- result: success
--- total sent: 70194
--- total blocked: 70194
--- total completed: 70194
--- total received: 70194
--- aggregated send time: mean: 101.53 us - 50.00%: 86.00 us - 90.00%: 140.00 us - 99.00%: 283.00 us - 99.90%: 591.00 us - 99.99%: 2007.00 us - max: 24959.00 us
--- aggregated transfer time: mean: 127.48 us - 50.00%: 97.00 us - 90.00%: 166.00 us - 99.00%: 449.00 us - 99.90%: 4671.00 us - 99.99%: 8255.00 us - max: 27263.00 us
```
Some notes:
1. `WARN` message is now gone
2. `send ack time` and `transfer time` statistics are printed at second interval
3. `total` and `aggregated` metrics are printed on test completion (more on this later)
The meaning of the live latency statistics are:
- `send ack time`: percentiles of latency to acknowledge sent messages
- `transfer time`: percentiles of latency to transfer messages from producer(s) to consumer(s)
The `perf` commands uses [JMS 2 async message producers](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/messageproducer#send-javax.jms.Destination-javax.jms.Message-javax.jms.CompletionListener-)
that allow the load generator to accumulate in-flight sent messages and depending on the protocol implementation, may block its producer thread due to producer flow control.
e.g: the Artemis CORE protocol can block producers threads to refill producers credits, while the [QPID-JMS](https://qpid.apache.org/components/jms/index.html) won't.
The `perf` tool is implementing its own in-flight sent requests tracking and can be configured to limit the amount of pending sent messages,
while reporting the rate by which producers are "blocked" awaiting completions
> **Producers threads are `blocked`?**
> Although the load back-pressure mechanism is non-blocking, given that the load generator cannot push further load while back-pressured
> by the protocol client, the load is semantically "blocked".
> This detail is relevant to explain the live rate [statistics](#running-it) on Console:
>
By default, the `perf` tools (i.e: `client` and `producer`) **limits the number of in-flight request to 1**: to change the default setting
users should add `--max-pending` parameter configuration.
> **Note:**
> Setting `--max-pending 0` will disable the load generator in-flight sent messages limiter, allowing the tool to accumulate
> an unbounded number of in-flight messages, risking `OutOfMemoryError`.
> This is **NOT RECOMMENDED!**
More detail on the metrics:
- `warmup`: the generator phase while the statistics sample is collected; warmup duration can be set by setting `--warmup`
- `sent`: the message sent rate
- `blocked`: the rate of attempts to send a new message, "blocked" awaiting `--max-pending` refill
- `completed`: the rate of message send acknowledgements received by producer(s)
- `received`: the rate of messages received by consumer(s)
###How to read the live statistics?
The huge amount of `blocked` vs `sent` means that the broker wasn't fast enough to refill the single `--max-pending` budget
before sending a new message.
It can be changed into:
```bash
--max-pending 100
```
#####to our previous command:
```bash
$ ./artemis perf client --warmup 20 --max-pending 100 --show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 queue://TEST_QUEUE
Connection brokerURL = tcp://localhost:61616?confirmationWindowSize=20000
# first samples shows very BAD performance because client JVM is still warming up
--- warmup true
--- sent: 27366 msg/sec
--- blocked: 361 msg/sec
--- completed: 27305 msg/sec
--- received: 26195 msg/sec
--- send ack time: mean: 1743.39 us - 50.00%: 1551.00 us - 90.00%: 3119.00 us - 99.00%: 5215.00 us - 99.90%: 8575.00 us - 99.99%: 8703.00 us - max: 23679.00 us
--- transfer time: mean: 11860.32 us - 50.00%: 11583.00 us - 90.00%: 18559.00 us - 99.00%: 24319.00 us - 99.90%: 31359.00 us - 99.99%: 31615.00 us - max: 31615.00 us
# ... > 20 seconds later ...
# performance is now way better then during warmup
--- warmup false
--- sent: 86525 msg/sec
--- blocked: 5734 msg/sec
--- completed: 86525 msg/sec
--- received: 86556 msg/sec
--- send ack time: mean: 1109.13 us - 50.00%: 1103.00 us - 90.00%: 1447.00 us - 99.00%: 1687.00 us - 99.90%: 5791.00 us - 99.99%: 5983.00 us - max: 5983.00 us
--- transfer time: mean: 4662.94 us - 50.00%: 1679.00 us - 90.00%: 12159.00 us - 99.00%: 14079.00 us - 99.90%: 14527.00 us - 99.99%: 14783.00 us - max: 14783.00 us
# CTRL + C
--- SUMMARY
--- result: success
--- total sent: 3450389
--- total blocked: 168863
--- total completed: 3450389
--- total received: 3450389
--- aggregated send time: mean: 1056.09 us - 50.00%: 1003.00 us - 90.00%: 1423.00 us - 99.00%: 1639.00 us - 99.90%: 4287.00 us - 99.99%: 7103.00 us - max: 19583.00 us
--- aggregated transfer time: mean: 18647.51 us - 50.00%: 10751.00 us - 90.00%: 54271.00 us - 99.00%: 84991.00 us - 99.90%: 90111.00 us - 99.99%: 93183.00 us - max: 94207.00 us
```
Some notes on the results:
- we now have a reasonable `blocked/sent` ratio (< ~10%)
- sent rate has improved **ten-fold** if compared to [previous results](#running-it)
And on the `SUMMARY` statistics:
- `total` counters include measurements collected with `warmup true`
- `aggregated` latencies **don't** include measurements collected with `warmup true`
###How to compare latencies across tests?
The Console output format isn't designed for easy latency comparisons, however the
`perf` commands expose `--hdr <hdr file name>` parameter to produce a [HDR Histogram](http://hdrhistogram.org/) compatible report that can be opened with different visualizers
eg [Online HdrHistogram Log Analyzer](https://hdrhistogram.github.io/HdrHistogramJSDemo/logparser.html), [HdrHistogramVisualizer](https://github.com/ennerf/HdrHistogramVisualizer) or [HistogramLogAnalyzer](https://github.com/HdrHistogram/HistogramLogAnalyzer).
> **Note:**
> Any latency collected trace on this guide is going to use [Online HdrHistogram Log Analyzer](https://hdrhistogram.github.io/HdrHistogramJSDemo/logparser.html)
> as HDR Histogram visualizer tool.
Below is the visualization of the HDR histograms collected while adding to the previous benchmark
```bash
--hdr /tmp/non_durable_queue.hdr
```
Whole test execution shows tagged latencies, to distinguish `warmup` ones:
![test](images/test.png)
Filtering out `warmup` latencies, it looks like
![hot test](images/hot_test.png)
Latency results shows that at higher percentiles `transfer` latency is way higher than the `sent` one
(reminder: `sent` it's the time to acknowledge sent messages), probably meaning that some queuing-up is happening on the broker.
In order to test this theory we switch to **target rate tests**.
## Case 2: Target Rate Single producer Single consumer over a queue
`perf client` and `perf producer` tools allow specifying a target rate to schedule producer(s) requests: adding
```bash
--rate <msg/sec integer value>
```
The previous example [last run](#to-our-previous-command) shows that `--max-pending 100` guarantees < 10% blocked/sent messages with
aggregated latencies
```bash
--- aggregated send time: mean: 1056.09 us - 50.00%: 1003.00 us - 90.00%: 1423.00 us - 99.00%: 1639.00 us - 99.90%: 4287.00 us - 99.99%: 7103.00 us - max: 19583.00 us
--- aggregated transfer time: mean: 18647.51 us - 50.00%: 10751.00 us - 90.00%: 54271.00 us - 99.00%: 84991.00 us - 99.90%: 90111.00 us - 99.99%: 93183.00 us - max: 94207.00 us
```
We would like to lower `transfer time` sub-millisecond; let's try
by running a load test with ~30% of the max perceived sent rate, by setting:
```bash
--rate 30000 --hdr /tmp/30K.hdr
```
The whole command is then:
```bash
$ ./artemis perf client --rate 30000 --hdr /tmp/30K.hdr --warmup 20 --max-pending 100 --show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 queue://TEST_QUEUE
# ... after 20 warmup seconds ...
--- warmup false
--- sent: 30302 msg/sec
--- blocked: 0 msg/sec
--- completed: 30302 msg/sec
--- received: 30303 msg/sec
--- send delay time: mean: 24.20 us - 50.00%: 21.00 us - 90.00%: 54.00 us - 99.00%: 72.00 us - 99.90%: 233.00 us - 99.99%: 659.00 us - max: 731.00 us
--- send ack time: mean: 150.48 us - 50.00%: 120.00 us - 90.00%: 172.00 us - 99.00%: 1223.00 us - 99.90%: 2543.00 us - 99.99%: 3183.00 us - max: 3247.00 us
--- transfer time: mean: 171.53 us - 50.00%: 135.00 us - 90.00%: 194.00 us - 99.00%: 1407.00 us - 99.90%: 2607.00 us - 99.99%: 3151.00 us - max: 3183.00 us
# CTRL + C
--- SUMMARY
--- result: success
--- total sent: 1216053
--- total blocked: 845
--- total completed: 1216053
--- total received: 1216053
--- aggregated delay send time: mean: 35.84 us - 50.00%: 20.00 us - 90.00%: 55.00 us - 99.00%: 116.00 us - 99.90%: 3359.00 us - 99.99%: 5503.00 us - max: 6495.00 us
--- aggregated send time: mean: 147.38 us - 50.00%: 117.00 us - 90.00%: 165.00 us - 99.00%: 991.00 us - 99.90%: 4191.00 us - 99.99%: 5695.00 us - max: 7103.00 us
--- aggregated transfer time: mean: 178.48 us - 50.00%: 134.00 us - 90.00%: 188.00 us - 99.00%: 1359.00 us - 99.90%: 5471.00 us - 99.99%: 8831.00 us - max: 12799.00 us
```
We've now achieved sub-millisecond `transfer` latencies until `90.00 pencentile`.
Opening `/tmp/30K.hdr` makes easier to see it:
![test](images/30K.png)
Now `send` and `transfer` time looks quite similar and there's no sign of queueing, but...
### What `delay send time` means?
This metric is borrowed from the [Coordinated Omission](http://highscalability.com/blog/2015/10/5/your-load-generator-is-probably-lying-to-you-take-the-red-pi.html) concept,
and it measures the delay of producer(s) while trying to send messages at the requested rate.
The source of such delay could be:
- slow responding broker: the load generator reached `--max-pending` and the expected rate cannot be honored
- client running out of resources (lack of CPU time, GC pauses, etc etc): load generator cannot keep-up with the expected rate because it is just "too fast" for it
- protocol-dependent blocking behaviours: CORE JMS 2 async send can block due to `producerWindowSize` exhaustion
A sane run of a target rate test should keep `delay send time` under control or investigation actions must be taken
to understand what's the source of the delay.
Let's show it with an example: we've already checked the all-out rate of the broker ie ~90K msg/sec
By running a `--rate 90000` test under the same conditions, latencies will look as
![test](images/90K.png)
It clearly shows that the load generator is getting delayed and cannot keep-up with the expected rate.
Below is a more complex example involving destinations (auto)generation with "asymmetric" load i.e: the producer number is different from consumer number.
## Case 3: Target Rate load on 10 durable topics, each with 3 producers and 2 unshared consumers
The `perf` tool can auto generate destinations using
```bash
--num-destinations <number of destinations to generate>
```
and naming them by using the destination name specified as the seed and an ordered sequence suffix.
eg
```bash
--num-destinations 3 topic://TOPIC
```
would generate 3 topics: `TOPIC0`, `TOPIC1`, `TOPIC2`.
With the default configuration (without specifying `--num-destinations`) it would just create `TOPIC`, without any numerical suffix.
In order to create a load generation on 10 topics, **each** with 3 producers and 2 unshared consumers:
```bash
--producers 3 --consumers 2 --num-destinations 10 topic://TOPIC
```
The whole `perf client` all-out throughput command would be:
```bash
# same as in the previous cases
./artemis perf client --warmup 20 --max-pending 100 --s
how-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 \
--producers 3 --consumers 2 --num-destinations 10 --durable --persistent topic://DURABLE_TOPIC
# this last part above is new
```
and it would print...
```bash
javax.jms.IllegalStateException: Cannot create durable subscription - client ID has not been set
```
Given that the generator is creating [unshared durable Topic subscriptions](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/session#createDurableConsumer-javax.jms.Topic-java.lang.String-), is it
mandatory to set a ClientID for each connection used.
The `perf client` tool creates a connection for each consumer by default and auto-generates both ClientIDs
and subscriptions names (as required by the [unshared durable Topic subscriptions API](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/session#createDurableConsumer-javax.jms.Topic-java.lang.String-)).
ClientID still requires users to specify Client ID prefixes with `--clientID <Client ID prefix>` and takes care to unsubscribe the consumers on test completion.
The complete commands now looks like:
```bash
./artemis perf client --warmup 20 --max-pending 100 --show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 \
--producers 3 --consumers 2 --num-destinations 10 --durable --persistent topic://DURABLE_TOPIC --clientID test_id
# after few seconds
--- warmup false
--- sent: 74842 msg/sec
--- blocked: 2702 msg/sec
--- completed: 74641 msg/sec
--- received: 146412 msg/sec
--- send ack time: mean: 37366.13 us - 50.00%: 37119.00 us - 90.00%: 46079.00 us - 99.00%: 68095.00 us - 99.90%: 84479.00 us - 99.99%: 94719.00 us - max: 95743.00 us
--- transfer time: mean: 44060.66 us - 50.00%: 43263.00 us - 90.00%: 54527.00 us - 99.00%: 75775.00 us - 99.90%: 87551.00 us - 99.99%: 91135.00 us - max: 91135.00 us
# CTRL + C
--- SUMMARY
--- result: success
--- total sent: 2377653
--- total blocked: 80004
--- total completed: 2377653
--- total received: 4755306
--- aggregated send time: mean: 39423.69 us - 50.00%: 38911.00 us - 90.00%: 49663.00 us - 99.00%: 66047.00 us - 99.90%: 85503.00 us - 99.99%: 101887.00 us - max: 115711.00 us
--- aggregated transfer time: mean: 46216.99 us - 50.00%: 45311.00 us - 90.00%: 57855.00 us - 99.00%: 78335.00 us - 99.90%: 97791.00 us - 99.99%: 113151.00 us - max: 125439.00 us
```
Results shows that `tranfer time` isn't queuing up, meaning that subscribers are capable to keep-up with the producers: hence a reasonable
rate to test could be ~80% of the perceived `sent` rate ie `--rate 60000`:
```bash
./artemis perf client --warmup 20 --max-pending 100 --show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 \
--producers 3 --consumers 2 --num-destinations 10 --durable --persistent topic://DURABLE_TOPIC --clientID test_id \
--rate 60000
# after many seconds while running
--- warmup false
--- sent: 55211 msg/sec
--- blocked: 2134 msg/sec
--- completed: 54444 msg/sec
--- received: 111622 msg/sec
--- send delay time: mean: 6306710.04 us - 50.00%: 6094847.00 us - 90.00%: 7766015.00 us - 99.00%: 8224767.00 us - 99.90%: 8257535.00 us - 99.99%: 8257535.00 us - max: 8257535.00 us
--- send ack time: mean: 50072.92 us - 50.00%: 50431.00 us - 90.00%: 57855.00 us - 99.00%: 65023.00 us - 99.90%: 71167.00 us - 99.99%: 71679.00 us - max: 71679.00 us
--- transfer time: mean: 63672.92 us - 50.00%: 65535.00 us - 90.00%: 78847.00 us - 99.00%: 86015.00 us - 99.90%: 90623.00 us - 99.99%: 93183.00 us - max: 94719.00 us
# it won't get any better :(
```
What's wrong with the `send delay time`?
Results show that the load generator cannot keep up with the expected rate and it's accumulating a huge delay
on the expected scheduled load: lets trying fixing it by adding more producers
threads, adding
```bash
--threads <producer threads>
```
By using two producers threads, the command now looks like:
```bash
./artemis perf client --warmup 20 --max-pending 100 --show-latency --url tcp://localhost:61616?confirmationWindowSize=20000 --consumer-url tcp://localhost:61616 \
--producers 3 --consumers 2 --num-destinations 10 --durable --persistent topic://DURABLE_TOPIC --clientID test_id \
--rate 60000 --threads 2
# after few seconds warming up....
--- warmup false
--- sent: 59894 msg/sec
--- blocked: 694 msg/sec
--- completed: 58925 msg/sec
--- received: 114857 msg/sec
--- send delay time: mean: 3189.96 us - 50.00%: 277.00 us - 90.00%: 10623.00 us - 99.00%: 35583.00 us - 99.90%: 47871.00 us - 99.99%: 56063.00 us - max: 58367.00 us
--- send ack time: mean: 31500.93 us - 50.00%: 31231.00 us - 90.00%: 48383.00 us - 99.00%: 65535.00 us - 99.90%: 83455.00 us - 99.99%: 95743.00 us - max: 98303.00 us
--- transfer time: mean: 38151.21 us - 50.00%: 37119.00 us - 90.00%: 55807.00 us - 99.00%: 84479.00 us - 99.90%: 104959.00 us - 99.99%: 118271.00 us - max: 121855.00 us
```
`send delay time` now seems under control, meaning that the load generator need some tuning in order to work at its best.

14
pom.xml
View File

@ -114,6 +114,7 @@
<mockito.version>3.12.4</mockito.version>
<jctools.version>2.1.2</jctools.version>
<netty.version>4.1.74.Final</netty.version>
<hdrhistogram.version>2.1.12</hdrhistogram.version>
<curator.version>5.2.0</curator.version>
<zookeeper.version>3.6.3</zookeeper.version>
@ -578,6 +579,12 @@
<version>${jctools.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>${hdrhistogram.version}</version>
<!-- Licence: BSD 2-Clause -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
@ -1024,6 +1031,7 @@
<compilerArgs>
<arg>-Xdiags:verbose</arg>
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
</compilerArgs>
@ -1057,6 +1065,7 @@
<compilerArgs>
<arg>-Xdiags:verbose</arg>
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg>
@ -1491,7 +1500,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<showWarnings>true</showWarnings>
<showWarnings>true</showWarnings>
<compilerArgs>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>