diff --git a/Jenkinsfile b/Jenkinsfile
index c2734864ab..760bf9c6c7 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -172,7 +172,7 @@ pipeline {
// If this build failed, send an email to the list.
failure {
script {
- if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-5.17.x" || env.BRANCH_NAME == "main") {
+ if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-6.1.x" || env.BRANCH_NAME == "main") {
emailext(
subject: "[BUILD-FAILURE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
body: """
@@ -189,7 +189,7 @@ Check console output at "${env.JOB_NAME} [${env.BRANC
// If this build didn't fail, but there were failing tests, send an email to the list.
unstable {
script {
- if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-5.17.x" || env.BRANCH_NAME == "main") {
+ if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-6.1.x" || env.BRANCH_NAME == "main") {
emailext(
subject: "[BUILD-UNSTABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
body: """
@@ -209,7 +209,7 @@ Check console output at "${env.JOB_NAME} [${env.BRANC
// (in this cae we probably don't have to do any post-build analysis)
deleteDir()
script {
- if ((env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-5.17.x" || env.BRANCH_NAME == "main") && (currentBuild.previousBuild != null) && (currentBuild.previousBuild.result != 'SUCCESS')) {
+ if ((env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-6.1.x" || env.BRANCH_NAME == "main") && (currentBuild.previousBuild != null) && (currentBuild.previousBuild.result != 'SUCCESS')) {
emailext (
subject: "[BUILD-STABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
body: """
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index 12e9f7742a..c8af4a6144 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -190,9 +190,16 @@ public class TransactionBroker extends BrokerFilter {
dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getEnqueues().add(opCount);
dest.getDestinationStatistics().getMessages().add(opCount);
+
+ if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
+ dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
+ }
LOG.debug("cleared pending from afterCommit: {}", destination);
} else {
dest.getDestinationStatistics().getDequeues().add(opCount);
+ if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
+ dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
+ }
}
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 8abcc67163..02a7f65057 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -599,4 +599,25 @@ public class DestinationView implements DestinationViewMBean {
public long getMaxUncommittedExceededCount() {
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
}
+
+ @Override
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return destination.isAdvancedNetworkStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
+ }
+
+ @Override
+ public long getNetworkEnqueues() {
+ return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
+ }
+
+ @Override
+ public long getNetworkDequeues() {
+ return destination.getDestinationStatistics().getNetworkDequeues().getCount();
+ }
+
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 45ed51b994..328ddb09f0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -481,4 +481,16 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
long getMaxUncommittedExceededCount();
+
+ @MBeanInfo("Query Advanced Network Statistics flag")
+ boolean isAdvancedNetworkStatisticsEnabled();
+
+ @MBeanInfo("Toggle Advanced Network Statistics flag")
+ void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
+
+ @MBeanInfo("Number of messages sent to the destination via network connection")
+ long getNetworkEnqueues();
+
+ @MBeanInfo("Number of messages acknowledged from the destination via network connection")
+ long getNetworkDequeues();
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 4ca3913c74..e34f23a4dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination {
protected final Scheduler scheduler;
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
+ private boolean advancedNetworkStatisticsEnabled = false;
+
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@@ -868,6 +870,15 @@ public abstract class BaseDestination implements Destination {
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
}
+ @Override
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return this.advancedNetworkStatisticsEnabled;
+ }
+
+ @Override
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
+ }
@Override
public abstract List getConsumers();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 70f807be86..45e3de7b3c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination {
boolean isSendDuplicateFromStoreToDLQ();
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
+
+ // [AMQ-9437]
+ boolean isAdvancedNetworkStatisticsEnabled();
+
+ void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
+
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 6b288a234f..85ef367a77 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -409,6 +409,16 @@ public class DestinationFilter implements Destination {
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
}
+ @Override
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return next.isAdvancedNetworkStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
+ }
+
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index 9d30c622f3..dc6b17dfaf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl {
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;
+ // [AMQ-9437] Advanced Statistics are optionally enabled
+ protected CountStatisticImpl networkEnqueues;
+ protected CountStatisticImpl networkDequeues;
+
public DestinationStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
@@ -68,6 +72,10 @@ public class DestinationStatistics extends StatsImpl {
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
+
+ networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
+ networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
+
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
@@ -83,6 +91,9 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
+
+ addStatistic("networkEnqueues", networkEnqueues);
+ addStatistic("networkDequeues", networkDequeues);
}
public CountStatisticImpl getEnqueues() {
@@ -151,6 +162,14 @@ public class DestinationStatistics extends StatsImpl {
return this.maxUncommittedExceededCount;
}
+ public CountStatisticImpl getNetworkEnqueues() {
+ return networkEnqueues;
+ }
+
+ public CountStatisticImpl getNetworkDequeues() {
+ return networkDequeues;
+ }
+
public void reset() {
if (this.isDoReset()) {
super.reset();
@@ -165,6 +184,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.reset();
messageSize.reset();
maxUncommittedExceededCount.reset();
+ networkEnqueues.reset();
+ networkDequeues.reset();
}
}
@@ -187,6 +208,9 @@ public class DestinationStatistics extends StatsImpl {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);
+ // [AMQ-9437] Advanced Statistics
+ networkEnqueues.setEnabled(enabled);
+ networkDequeues.setEnabled(enabled);
}
public void setParent(DestinationStatistics parent) {
@@ -207,6 +231,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize);
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
+ networkEnqueues.setParent(parent.networkEnqueues);
+ networkDequeues.setParent(parent.networkDequeues);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@@ -224,6 +250,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.setParent(null);
messageSize.setParent(null);
maxUncommittedExceededCount.setParent(null);
+ networkEnqueues.setParent(null);
+ networkDequeues.setParent(null);
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e0dc6d0f07..6946a33fa5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -371,6 +371,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
+ if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+ ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
+ }
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6502a20633..0ed6763f7b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1873,7 +1873,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
acknowledge(context, sub, ack, reference);
- dropMessage(reference);
+ dropMessage(context, reference);
} else {
try {
acknowledge(context, sub, ack, reference);
@@ -1882,7 +1882,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
@Override
public void afterCommit() throws Exception {
- dropMessage(reference);
+ dropMessage(context, reference);
wakeup();
}
@@ -1910,11 +1910,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
reference.setAcked(true);
}
- private void dropMessage(QueueMessageReference reference) {
+ private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
//use dropIfLive so we only process the statistics at most one time
if (reference.dropIfLive()) {
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();
+
+ if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
+ getDestinationStatistics().getNetworkDequeues().increment();
+ }
+
pagedInMessagesLock.writeLock().lock();
try {
pagedInMessages.remove(reference);
@@ -1969,6 +1974,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());
+
+ if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
+ destinationStatistics.getNetworkEnqueues().increment();
+ }
+
messageDelivered(context, msg);
consumersLock.readLock().lock();
try {
@@ -2115,7 +2125,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
if (store != null) {
ConnectionContext connectionContext = createConnectionContext();
- dropMessage(ref);
+ dropMessage(connectionContext, ref);
if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index cad0d3b883..a9e07874e0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -778,6 +778,11 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics.
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
+
+ if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
+ destinationStatistics.getNetworkEnqueues().increment();
+ }
+
destinationStatistics.getMessageSize().addSize(message.getSize());
MessageEvaluationContext msgContext = null;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index a5b97241b4..4403dea6b5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -449,6 +449,9 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
+ if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+ destination.getDestinationStatistics().getNetworkDequeues().add(count);
+ }
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
@@ -746,6 +749,9 @@ public class TopicSubscription extends AbstractSubscription {
matched.remove(message);
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
+ if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+ destination.getDestinationStatistics().getNetworkDequeues().increment();
+ }
}
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 7230957022..e33f13b48c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
-
+ private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@@ -306,6 +306,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
}
+ if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
+ destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
+ }
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -1175,5 +1178,13 @@ public class PolicyEntry extends DestinationMapEntry {
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
return this.messageInterceptorStrategy;
+ }
+
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return this.advancedNetworkStatisticsEnabled;
+ }
+
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
index 692fb94a4a..f1aaf2983c 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
@@ -210,7 +210,6 @@ public final class DataByteArrayOutputStream extends OutputStream implements Dat
ensureEnoughBuffer((int)(pos + encodedsize + 2));
writeShort((int)encodedsize);
- byte[] buffer = new byte[(int)encodedsize];
MarshallingSupport.writeUTFBytesToBuffer(text, (int) encodedsize, buf, pos);
pos += encodedsize;
}
diff --git a/activemq-client/src/test/java/org/apache/activemq/util/DataByteArrayInputStreamTest.java b/activemq-client/src/test/java/org/apache/activemq/util/DataByteArrayInputStreamTest.java
index 632bfa30a2..f21260d9d8 100644
--- a/activemq-client/src/test/java/org/apache/activemq/util/DataByteArrayInputStreamTest.java
+++ b/activemq-client/src/test/java/org/apache/activemq/util/DataByteArrayInputStreamTest.java
@@ -17,66 +17,121 @@
package org.apache.activemq.util;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import java.io.UTFDataFormatException;
import org.junit.Test;
+/**
+ * https://issues.apache.org/jira/browse/AMQ-1911
+ * https://issues.apache.org/jira/browse/AMQ-8122
+ */
public class DataByteArrayInputStreamTest {
- /**
- * https://issues.apache.org/activemq/browse/AMQ-1911
- */
@Test
- public void testNonAscii() throws Exception {
- doMarshallUnMarshallValidation("mei\u00DFen");
-
- String accumulator = new String();
-
- int test = 0; // int to get Supplementary chars
- while(Character.isDefined(test)) {
- String toTest = String.valueOf((char)test);
- accumulator += toTest;
- doMarshallUnMarshallValidation(toTest);
- test++;
- }
-
- int massiveThreeByteCharValue = 0x0FFF;
- String toTest = String.valueOf((char)massiveThreeByteCharValue);
- accumulator += toTest;
- doMarshallUnMarshallValidation(String.valueOf((char)massiveThreeByteCharValue));
-
- // Altogether
- doMarshallUnMarshallValidation(accumulator);
-
- // the three byte values
- char t = '\u0800';
- final char max = '\uffff';
- accumulator = String.valueOf(t);
- while (t < max) {
- String val = String.valueOf(t);
- accumulator += val;
- doMarshallUnMarshallValidation(val);
- t++;
- }
-
- // Altogether so long as it is not too big
- while (accumulator.length() > 20000) {
- accumulator = accumulator.substring(20000);
- }
- doMarshallUnMarshallValidation(accumulator);
+ public void testOneByteCharacters() throws Exception {
+ testCodePointRange(0x000000, 0x00007F);
}
- void doMarshallUnMarshallValidation(String value) throws Exception {
+ @Test
+ public void testTwoBytesCharacters() throws Exception {
+ testCodePointRange(0x000080, 0x0007FF);
+ }
+
+ @Test
+ public void testThreeBytesCharacters() throws Exception {
+ testCodePointRange(0x000800, 0x00FFFF);
+ }
+
+ @Test
+ public void testFourBytesCharacters() throws Exception {
+ testCodePointRange(0x010000, 0X10FFFF);
+ }
+
+ @Test
+ public void testFourBytesCharacterEncodedAsBytes() throws Exception {
+ // Currently ActiveMQ does not properly support 4-bytes UTF characters.
+ // Ideally, this test should be failing. The current logic was kept as is
+ // intentionally. See https://issues.apache.org/jira/browse/AMQ-8398.
+
+ // 0xF0 0x80 0x80 0x80 (first valid 4-bytes character)
+ testInvalidCharacterBytes(new byte[]{-16, -128, -128, -128}, 4);
+ // 0xF7 0xBF 0xBF 0xBF (last valid 4-bytes character)
+ testInvalidCharacterBytes(new byte[]{-9, -65, -65, -65}, 4);
+ }
+
+
+ private void testCodePointRange(int from, int to) throws Exception {
+ StringBuilder accumulator = new StringBuilder();
+ for (int codePoint = from; codePoint <= to; codePoint++) {
+ String val = String.valueOf(Character.toChars(codePoint));
+ accumulator.append(val);
+ doMarshallUnMarshallValidation(val);
+ }
+
+ // truncate string to last 20k characters
+ if (accumulator.length() > 20_000) {
+ doMarshallUnMarshallValidation(accumulator.substring(
+ accumulator.length() - 20_000));
+ } else {
+ doMarshallUnMarshallValidation(accumulator.toString());
+ }
+ }
+
+ private void doMarshallUnMarshallValidation(String value) throws Exception {
DataByteArrayOutputStream out = new DataByteArrayOutputStream();
- out.writeBoolean(true);
out.writeUTF(value);
out.close();
DataByteArrayInputStream in = new DataByteArrayInputStream(out.getData());
- in.readBoolean();
String readBack = in.readUTF();
+
assertEquals(value, readBack);
}
+ @Test
+ public void testTwoBytesOutOfRangeCharacter() throws Exception {
+ // 0xC0 0x7F
+ testInvalidCharacterBytes(new byte[]{-64, 127}, 2);
+ // 0xDF 0xC0
+ testInvalidCharacterBytes(new byte[]{-33, -64}, 2);
+ }
+
+ @Test
+ public void testThreeBytesOutOfRangeCharacter() throws Exception {
+ // 0xE0 0x80 0x7F
+ testInvalidCharacterBytes(new byte[]{-32, -128, 127}, 3);
+ // 0xEF 0xBF 0xC0
+ testInvalidCharacterBytes(new byte[]{-17, -65, -64}, 3);
+ }
+
+ @Test
+ public void testFourBytesOutOfRangeCharacter() throws Exception {
+ // 0xF0 0x80 0x80 0x7F
+ testInvalidCharacterBytes(new byte[]{-16, -128, -128, 127}, 4);
+ // 0xF7 0xBF 0xBF 0xC0
+ testInvalidCharacterBytes(new byte[]{-9, -65, -65, -64}, 4);
+ }
+
+ private void testInvalidCharacterBytes(byte[] bytes, int encodedSize) throws Exception {
+ // Java guarantees that strings are always UTF-8 compliant and valid,
+ // any invalid sequence of bytes is either replaced or removed.
+ // This test demonstrates that Java takes care about and does not allow
+ // anything to break.
+ String val = new String(bytes);
+ doMarshallUnMarshallValidation(val);
+
+ // However, a non-java client can send an invalid sequence of bytes.
+ // Such data causes exceptions while unmarshalling.
+ DataByteArrayOutputStream out = new DataByteArrayOutputStream();
+ out.writeShort(encodedSize);
+ out.write(bytes);
+ out.close();
+
+ DataByteArrayInputStream in = new DataByteArrayInputStream(out.getData());
+ assertThrows(UTFDataFormatException.class, () -> in.readUTF());
+ }
+
@Test
public void testReadLong() throws Exception {
DataByteArrayOutputStream out = new DataByteArrayOutputStream(8);
@@ -87,4 +142,4 @@ public class DataByteArrayInputStreamTest {
long readBack = in.readLong();
assertEquals(Long.MAX_VALUE, readBack);
}
-}
+}
\ No newline at end of file
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
index 5a3fba4715..595726b8cd 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
@@ -237,7 +237,6 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
ensureEnoughBuffer((int)(pos + encodedsize + 2));
writeShort((int)encodedsize);
- byte[] buffer = new byte[(int)encodedsize];
MarshallingSupport.writeUTFBytesToBuffer(text, (int) encodedsize, buf, pos);
pos += encodedsize;
onWrite();
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
index 0f46e089e1..53341c3a53 100644
--- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
@@ -57,6 +57,18 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true);
}
+ @Test
+ public void testModAdvancedNetworkStatistics() throws Exception {
+ final String brokerConfig = configurationSeed + "-policy-ml-broker";
+ applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics");
+ startBroker(brokerConfig);
+ assertTrue("broker alive", brokerService.isStarted());
+
+ verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false);
+ applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP);
+ verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true);
+ }
+
@Test
public void testAddNdMod() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
@@ -121,6 +133,9 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
session.createConsumer(session.createQueue(dest));
switch(fieldName) {
+ case "advancedNetworkStatisticsEnabled":
+ assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
+ break;
case "sendDuplicateFromStoreToDLQ":
assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ());
break;
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml
new file mode 100644
index 0000000000..534f884d4b
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml
new file mode 100644
index 0000000000..a6c710e075
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
index 38eab6fc77..0c6ee175f5 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.stomp;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -53,12 +54,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
if(intendedType.equalsIgnoreCase("text")){
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
- DataOutputStream data = new DataOutputStream(bytes);
- data.writeInt(command.getContent().length);
- data.write(command.getContent());
- text.setContent(bytes.toByteSequence());
- data.close();
+ // AMQ-8398 - get the original text back so we decode from standard UTF-8
+ // and set on the message so it will re-encode using AMQ modified UTF-8
+ text.setText(command.getBody());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@@ -78,12 +76,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
- DataOutputStream data = new DataOutputStream(bytes);
- data.writeInt(command.getContent().length);
- data.write(command.getContent());
- text.setContent(bytes.toByteSequence());
- data.close();
+ // AMQ-8398 - get the original text back so we decode from standard UTF-8
+ // and set on the message so it will re-encode using AMQ modified UTF-8
+ text.setText(command.getBody());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@@ -103,22 +98,13 @@ public class LegacyFrameTranslator implements FrameTranslator {
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
-
- if (!message.isCompressed() && message.getContent() != null) {
- ByteSequence msgContent = message.getContent();
- if (msgContent.getLength() > 4) {
- byte[] content = new byte[msgContent.getLength() - 4];
- System.arraycopy(msgContent.data, 4, content, 0, content.length);
- command.setContent(content);
- }
- } else {
- ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
- String messageText = msg.getText();
- if (messageText != null) {
- command.setContent(msg.getText().getBytes("UTF-8"));
- }
+ ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+ // AMQ-8398 - get the original text back so we decode from modified UTF-8
+ // and then we can re-encode using the standard JDK encoding
+ String messageText = msg.getText();
+ if (messageText != null) {
+ command.setContent(msg.getText().getBytes(StandardCharsets.UTF_8));
}
-
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
index d386550881..8304232940 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
@@ -75,11 +76,7 @@ public class StompFrame implements Command {
}
public String getBody() {
- try {
- return new String(content, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- return new String(content);
- }
+ return new String(content, StandardCharsets.UTF_8);
}
public void setContent(byte[] data) {
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLLargeMessageTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLLargeMessageTest.java
index e4277e2627..21e181a4f0 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLLargeMessageTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLLargeMessageTest.java
@@ -103,6 +103,7 @@ public class StompNIOSSLLargeMessageTest extends StompTestSupport {
@Override
public void tearDown() throws Exception {
+ super.tearDown();
// unregister Log4J appender
org.apache.logging.log4j.core.Logger rootLogger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager.getRootLogger();
rootLogger.removeAppender(appender);
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index 9f0d65e909..9c82261e32 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -219,6 +219,84 @@ public class StompTest extends StompTestSupport {
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+ // Test that a string that requires 4 bytes to encode using standard
+ // UTF-8 does not break when sent by Stomp and received by JMS/OpenWire
+ // AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
+ // needs to happen so this works.
+ @Test(timeout = 60000)
+ public void testSend4ByteUtf8StompToJms() throws Exception {
+ // Create test string using emojis, requires 4 bytes with standard UTF-8
+ String body = "!®౩\uD83D\uDE42";
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ // publish message with string that requires 4-byte UTF-8 encoding
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // Verify received message is original sent string
+ TextMessage message = (TextMessage)consumer.receive(2500);
+ assertNotNull(message);
+ assertEquals(body, message.getText());
+ }
+
+ // Test that a string that requires 4 bytes to encode using standard
+ // UTF-8 does not break when sent by JMS/OpenWire and received by Stomp
+ // AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
+ // needs to happen so this works.
+ @Test(timeout = 60000)
+ public void testSend4ByteUtf8JmsToStomp() throws Exception {
+ // Create test string using emojis, requires 4 bytes with standard UTF-8
+ String body = "!®౩\uD83D\uDE42";
+ MessageProducer producer = session.createProducer(queue);
+
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // publish message with string that requires 4-byte UTF-8 encoding
+ producer.send(session.createTextMessage(body));
+
+ // Verify received message is original sent string
+ StompFrame message = stompConnection.receive();
+ assertNotNull(message);
+ assertEquals(body, message.getBody());
+ }
+
+ // Test that a string that requires 4 bytes to encode using standard
+ // UTF-8 does not break when sent by Stomp and received by Stomp
+ @Test(timeout = 60000)
+ public void testSend4ByteUtf8StompToStomp() throws Exception {
+ // Create test string using emojis, requires 4 bytes with standard UTF-8
+ String body = "!®౩\uD83D\uDE42";
+
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ // publish message with string that requires 4-byte UTF-8 encoding
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // Verify received message is original sent string
+ StompFrame message = stompConnection.receive();
+ assertNotNull(message);
+ assertEquals(body, message.getBody());
+ }
+
@Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
new file mode 100644
index 0000000000..df99bab354
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.springframework.context.support.AbstractApplicationContext;
+
+@RunWith(value = Parameterized.class)
+public class NetworkAdvancedStatisticsTest extends BaseNetworkTest {
+
+ @Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}")
+ public static Collection