Merge branch 'apache:main' into AMQ-9552

This commit is contained in:
Grzegorz Kochański 2024-09-22 10:54:56 +02:00 committed by GitHub
commit 7a2421b05d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 757 additions and 99 deletions

6
Jenkinsfile vendored
View File

@ -172,7 +172,7 @@ pipeline {
// If this build failed, send an email to the list. // If this build failed, send an email to the list.
failure { failure {
script { script {
if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-5.17.x" || env.BRANCH_NAME == "main") { if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-6.1.x" || env.BRANCH_NAME == "main") {
emailext( emailext(
subject: "[BUILD-FAILURE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'", subject: "[BUILD-FAILURE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
body: """ body: """
@ -189,7 +189,7 @@ Check console output at "<a href="${env.BUILD_URL}">${env.JOB_NAME} [${env.BRANC
// If this build didn't fail, but there were failing tests, send an email to the list. // If this build didn't fail, but there were failing tests, send an email to the list.
unstable { unstable {
script { script {
if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-5.17.x" || env.BRANCH_NAME == "main") { if(env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-6.1.x" || env.BRANCH_NAME == "main") {
emailext( emailext(
subject: "[BUILD-UNSTABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'", subject: "[BUILD-UNSTABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
body: """ body: """
@ -209,7 +209,7 @@ Check console output at "<a href="${env.BUILD_URL}">${env.JOB_NAME} [${env.BRANC
// (in this cae we probably don't have to do any post-build analysis) // (in this cae we probably don't have to do any post-build analysis)
deleteDir() deleteDir()
script { script {
if ((env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-5.17.x" || env.BRANCH_NAME == "main") && (currentBuild.previousBuild != null) && (currentBuild.previousBuild.result != 'SUCCESS')) { if ((env.BRANCH_NAME == "activemq-5.18.x" || env.BRANCH_NAME == "activemq-6.1.x" || env.BRANCH_NAME == "main") && (currentBuild.previousBuild != null) && (currentBuild.previousBuild.result != 'SUCCESS')) {
emailext ( emailext (
subject: "[BUILD-STABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'", subject: "[BUILD-STABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
body: """ body: """

View File

@ -190,9 +190,16 @@ public class TransactionBroker extends BrokerFilter {
dest.clearPendingMessages(opCount); dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getEnqueues().add(opCount); dest.getDestinationStatistics().getEnqueues().add(opCount);
dest.getDestinationStatistics().getMessages().add(opCount); dest.getDestinationStatistics().getMessages().add(opCount);
if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
}
LOG.debug("cleared pending from afterCommit: {}", destination); LOG.debug("cleared pending from afterCommit: {}", destination);
} else { } else {
dest.getDestinationStatistics().getDequeues().add(opCount); dest.getDestinationStatistics().getDequeues().add(opCount);
if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
}
} }
} }
} }

View File

@ -599,4 +599,25 @@ public class DestinationView implements DestinationViewMBean {
public long getMaxUncommittedExceededCount() { public long getMaxUncommittedExceededCount() {
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount(); return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
} }
@Override
public boolean isAdvancedNetworkStatisticsEnabled() {
return destination.isAdvancedNetworkStatisticsEnabled();
}
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}
@Override
public long getNetworkEnqueues() {
return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
}
@Override
public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}
} }

View File

@ -481,4 +481,16 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination") @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
long getMaxUncommittedExceededCount(); long getMaxUncommittedExceededCount();
@MBeanInfo("Query Advanced Network Statistics flag")
boolean isAdvancedNetworkStatisticsEnabled();
@MBeanInfo("Toggle Advanced Network Statistics flag")
void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
@MBeanInfo("Number of messages sent to the destination via network connection")
long getNetworkEnqueues();
@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();
} }

View File

@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination {
protected final Scheduler scheduler; protected final Scheduler scheduler;
private boolean disposed = false; private boolean disposed = false;
private boolean doOptimzeMessageStorage = true; private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
/* /*
* percentage of in-flight messages above which optimize message store is disabled * percentage of in-flight messages above which optimize message store is disabled
*/ */
@ -868,6 +870,15 @@ public abstract class BaseDestination implements Destination {
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
} }
@Override
public boolean isAdvancedNetworkStatisticsEnabled() {
return this.advancedNetworkStatisticsEnabled;
}
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}
@Override @Override
public abstract List<Subscription> getConsumers(); public abstract List<Subscription> getConsumers();

View File

@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination {
boolean isSendDuplicateFromStoreToDLQ(); boolean isSendDuplicateFromStoreToDLQ();
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ); void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
// [AMQ-9437]
boolean isAdvancedNetworkStatisticsEnabled();
void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
} }

View File

@ -409,6 +409,16 @@ public class DestinationFilter implements Destination {
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ); next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
} }
@Override
public boolean isAdvancedNetworkStatisticsEnabled() {
return next.isAdvancedNetworkStatisticsEnabled();
}
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) { if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next; DestinationFilter filter = (DestinationFilter) next;

View File

@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl {
protected SizeStatisticImpl messageSize; protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount; protected CountStatisticImpl maxUncommittedExceededCount;
// [AMQ-9437] Advanced Statistics are optionally enabled
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;
public DestinationStatistics() { public DestinationStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
@ -68,6 +72,10 @@ public class DestinationStatistics extends StatsImpl {
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded"); maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
addStatistic("enqueues", enqueues); addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched); addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues); addStatistic("dequeues", dequeues);
@ -83,6 +91,9 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("blockedTime",blockedTime); addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize); addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount); addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);
} }
public CountStatisticImpl getEnqueues() { public CountStatisticImpl getEnqueues() {
@ -151,6 +162,14 @@ public class DestinationStatistics extends StatsImpl {
return this.maxUncommittedExceededCount; return this.maxUncommittedExceededCount;
} }
public CountStatisticImpl getNetworkEnqueues() {
return networkEnqueues;
}
public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}
public void reset() { public void reset() {
if (this.isDoReset()) { if (this.isDoReset()) {
super.reset(); super.reset();
@ -165,6 +184,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.reset(); blockedTime.reset();
messageSize.reset(); messageSize.reset();
maxUncommittedExceededCount.reset(); maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
} }
} }
@ -187,6 +208,9 @@ public class DestinationStatistics extends StatsImpl {
messageSize.setEnabled(enabled); messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled); maxUncommittedExceededCount.setEnabled(enabled);
// [AMQ-9437] Advanced Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);
} }
public void setParent(DestinationStatistics parent) { public void setParent(DestinationStatistics parent) {
@ -207,6 +231,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.setParent(parent.blockedTime); blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize); messageSize.setParent(parent.messageSize);
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount); maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
} else { } else {
enqueues.setParent(null); enqueues.setParent(null);
dispatched.setParent(null); dispatched.setParent(null);
@ -224,6 +250,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.setParent(null); blockedTime.setParent(null);
messageSize.setParent(null); messageSize.setParent(null);
maxUncommittedExceededCount.setParent(null); maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
} }
} }

View File

@ -371,6 +371,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
if (info.isNetworkSubscription()) { if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
} }
} }

View File

@ -1873,7 +1873,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// This sends the ack the the journal.. // This sends the ack the the journal..
if (!ack.isInTransaction()) { if (!ack.isInTransaction()) {
acknowledge(context, sub, ack, reference); acknowledge(context, sub, ack, reference);
dropMessage(reference); dropMessage(context, reference);
} else { } else {
try { try {
acknowledge(context, sub, ack, reference); acknowledge(context, sub, ack, reference);
@ -1882,7 +1882,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
@Override @Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
dropMessage(reference); dropMessage(context, reference);
wakeup(); wakeup();
} }
@ -1910,11 +1910,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
reference.setAcked(true); reference.setAcked(true);
} }
private void dropMessage(QueueMessageReference reference) { private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
//use dropIfLive so we only process the statistics at most one time //use dropIfLive so we only process the statistics at most one time
if (reference.dropIfLive()) { if (reference.dropIfLive()) {
getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement(); getDestinationStatistics().getMessages().decrement();
if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
getDestinationStatistics().getNetworkDequeues().increment();
}
pagedInMessagesLock.writeLock().lock(); pagedInMessagesLock.writeLock().lock();
try { try {
pagedInMessages.remove(reference); pagedInMessages.remove(reference);
@ -1969,6 +1974,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment(); destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize()); destinationStatistics.getMessageSize().addSize(msg.getSize());
if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
messageDelivered(context, msg); messageDelivered(context, msg);
consumersLock.readLock().lock(); consumersLock.readLock().lock();
try { try {
@ -2115,7 +2125,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
if (store != null) { if (store != null) {
ConnectionContext connectionContext = createConnectionContext(); ConnectionContext connectionContext = createConnectionContext();
dropMessage(ref); dropMessage(connectionContext, ref);
if (gotToTheStore(ref.getMessage())) { if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage()); LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1)); store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));

View File

@ -778,6 +778,11 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics. // misleading metrics.
// destinationStatistics.getMessages().increment(); // destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
destinationStatistics.getMessageSize().addSize(message.getSize()); destinationStatistics.getMessageSize().addSize(message.getSize());
MessageEvaluationContext msgContext = null; MessageEvaluationContext msgContext = null;

View File

@ -449,6 +449,9 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getInflight().subtract(count); destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) { if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count); destination.getDestinationStatistics().getForwards().add(count);
if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
destination.getDestinationStatistics().getNetworkDequeues().add(count);
}
} }
if (ack.isExpiredAck()) { if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count); destination.getDestinationStatistics().getExpired().add(count);
@ -746,6 +749,9 @@ public class TopicSubscription extends AbstractSubscription {
matched.remove(message); matched.remove(message);
if (destination != null) { if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment(); destination.getDestinationStatistics().getDequeues().increment();
if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
destination.getDestinationStatistics().getNetworkDequeues().increment();
}
} }
Destination dest = (Destination) message.getRegionDestination(); Destination dest = (Destination) message.getRegionDestination();
if (dest != null) { if (dest != null) {

View File

@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean doOptimzeMessageStorage = true; private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1; private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true; private boolean useTopicSubscriptionInflightStats = true;
private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
/* /*
* percentage of in-flight messages above which optimize message store is disabled * percentage of in-flight messages above which optimize message store is disabled
*/ */
@ -306,6 +306,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) { if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ()); destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
} }
if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
}
} }
public void baseConfiguration(Broker broker, BaseDestination destination) { public void baseConfiguration(Broker broker, BaseDestination destination) {
@ -1176,4 +1179,12 @@ public class PolicyEntry extends DestinationMapEntry {
public MessageInterceptorStrategy getMessageInterceptorStrategy() { public MessageInterceptorStrategy getMessageInterceptorStrategy() {
return this.messageInterceptorStrategy; return this.messageInterceptorStrategy;
} }
public boolean isAdvancedNetworkStatisticsEnabled() {
return this.advancedNetworkStatisticsEnabled;
}
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}
} }

View File

@ -210,7 +210,6 @@ public final class DataByteArrayOutputStream extends OutputStream implements Dat
ensureEnoughBuffer((int)(pos + encodedsize + 2)); ensureEnoughBuffer((int)(pos + encodedsize + 2));
writeShort((int)encodedsize); writeShort((int)encodedsize);
byte[] buffer = new byte[(int)encodedsize];
MarshallingSupport.writeUTFBytesToBuffer(text, (int) encodedsize, buf, pos); MarshallingSupport.writeUTFBytesToBuffer(text, (int) encodedsize, buf, pos);
pos += encodedsize; pos += encodedsize;
} }

View File

@ -17,66 +17,121 @@
package org.apache.activemq.util; package org.apache.activemq.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import java.io.UTFDataFormatException;
import org.junit.Test; import org.junit.Test;
/**
* https://issues.apache.org/jira/browse/AMQ-1911
* https://issues.apache.org/jira/browse/AMQ-8122
*/
public class DataByteArrayInputStreamTest { public class DataByteArrayInputStreamTest {
/**
* https://issues.apache.org/activemq/browse/AMQ-1911
*/
@Test @Test
public void testNonAscii() throws Exception { public void testOneByteCharacters() throws Exception {
doMarshallUnMarshallValidation("mei\u00DFen"); testCodePointRange(0x000000, 0x00007F);
String accumulator = new String();
int test = 0; // int to get Supplementary chars
while(Character.isDefined(test)) {
String toTest = String.valueOf((char)test);
accumulator += toTest;
doMarshallUnMarshallValidation(toTest);
test++;
} }
int massiveThreeByteCharValue = 0x0FFF; @Test
String toTest = String.valueOf((char)massiveThreeByteCharValue); public void testTwoBytesCharacters() throws Exception {
accumulator += toTest; testCodePointRange(0x000080, 0x0007FF);
doMarshallUnMarshallValidation(String.valueOf((char)massiveThreeByteCharValue)); }
// Altogether @Test
doMarshallUnMarshallValidation(accumulator); public void testThreeBytesCharacters() throws Exception {
testCodePointRange(0x000800, 0x00FFFF);
}
// the three byte values @Test
char t = '\u0800'; public void testFourBytesCharacters() throws Exception {
final char max = '\uffff'; testCodePointRange(0x010000, 0X10FFFF);
accumulator = String.valueOf(t); }
while (t < max) {
String val = String.valueOf(t); @Test
accumulator += val; public void testFourBytesCharacterEncodedAsBytes() throws Exception {
// Currently ActiveMQ does not properly support 4-bytes UTF characters.
// Ideally, this test should be failing. The current logic was kept as is
// intentionally. See https://issues.apache.org/jira/browse/AMQ-8398.
// 0xF0 0x80 0x80 0x80 (first valid 4-bytes character)
testInvalidCharacterBytes(new byte[]{-16, -128, -128, -128}, 4);
// 0xF7 0xBF 0xBF 0xBF (last valid 4-bytes character)
testInvalidCharacterBytes(new byte[]{-9, -65, -65, -65}, 4);
}
private void testCodePointRange(int from, int to) throws Exception {
StringBuilder accumulator = new StringBuilder();
for (int codePoint = from; codePoint <= to; codePoint++) {
String val = String.valueOf(Character.toChars(codePoint));
accumulator.append(val);
doMarshallUnMarshallValidation(val); doMarshallUnMarshallValidation(val);
t++;
} }
// Altogether so long as it is not too big // truncate string to last 20k characters
while (accumulator.length() > 20000) { if (accumulator.length() > 20_000) {
accumulator = accumulator.substring(20000); doMarshallUnMarshallValidation(accumulator.substring(
accumulator.length() - 20_000));
} else {
doMarshallUnMarshallValidation(accumulator.toString());
} }
doMarshallUnMarshallValidation(accumulator);
} }
void doMarshallUnMarshallValidation(String value) throws Exception { private void doMarshallUnMarshallValidation(String value) throws Exception {
DataByteArrayOutputStream out = new DataByteArrayOutputStream(); DataByteArrayOutputStream out = new DataByteArrayOutputStream();
out.writeBoolean(true);
out.writeUTF(value); out.writeUTF(value);
out.close(); out.close();
DataByteArrayInputStream in = new DataByteArrayInputStream(out.getData()); DataByteArrayInputStream in = new DataByteArrayInputStream(out.getData());
in.readBoolean();
String readBack = in.readUTF(); String readBack = in.readUTF();
assertEquals(value, readBack); assertEquals(value, readBack);
} }
@Test
public void testTwoBytesOutOfRangeCharacter() throws Exception {
// 0xC0 0x7F
testInvalidCharacterBytes(new byte[]{-64, 127}, 2);
// 0xDF 0xC0
testInvalidCharacterBytes(new byte[]{-33, -64}, 2);
}
@Test
public void testThreeBytesOutOfRangeCharacter() throws Exception {
// 0xE0 0x80 0x7F
testInvalidCharacterBytes(new byte[]{-32, -128, 127}, 3);
// 0xEF 0xBF 0xC0
testInvalidCharacterBytes(new byte[]{-17, -65, -64}, 3);
}
@Test
public void testFourBytesOutOfRangeCharacter() throws Exception {
// 0xF0 0x80 0x80 0x7F
testInvalidCharacterBytes(new byte[]{-16, -128, -128, 127}, 4);
// 0xF7 0xBF 0xBF 0xC0
testInvalidCharacterBytes(new byte[]{-9, -65, -65, -64}, 4);
}
private void testInvalidCharacterBytes(byte[] bytes, int encodedSize) throws Exception {
// Java guarantees that strings are always UTF-8 compliant and valid,
// any invalid sequence of bytes is either replaced or removed.
// This test demonstrates that Java takes care about and does not allow
// anything to break.
String val = new String(bytes);
doMarshallUnMarshallValidation(val);
// However, a non-java client can send an invalid sequence of bytes.
// Such data causes exceptions while unmarshalling.
DataByteArrayOutputStream out = new DataByteArrayOutputStream();
out.writeShort(encodedSize);
out.write(bytes);
out.close();
DataByteArrayInputStream in = new DataByteArrayInputStream(out.getData());
assertThrows(UTFDataFormatException.class, () -> in.readUTF());
}
@Test @Test
public void testReadLong() throws Exception { public void testReadLong() throws Exception {
DataByteArrayOutputStream out = new DataByteArrayOutputStream(8); DataByteArrayOutputStream out = new DataByteArrayOutputStream(8);

View File

@ -237,7 +237,6 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
ensureEnoughBuffer((int)(pos + encodedsize + 2)); ensureEnoughBuffer((int)(pos + encodedsize + 2));
writeShort((int)encodedsize); writeShort((int)encodedsize);
byte[] buffer = new byte[(int)encodedsize];
MarshallingSupport.writeUTFBytesToBuffer(text, (int) encodedsize, buf, pos); MarshallingSupport.writeUTFBytesToBuffer(text, (int) encodedsize, buf, pos);
pos += encodedsize; pos += encodedsize;
onWrite(); onWrite();

View File

@ -57,6 +57,18 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true); verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true);
} }
@Test
public void testModAdvancedNetworkStatistics() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false);
applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP);
verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true);
}
@Test @Test
public void testAddNdMod() throws Exception { public void testAddNdMod() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker"; final String brokerConfig = configurationSeed + "-policy-ml-broker";
@ -121,6 +133,9 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
session.createConsumer(session.createQueue(dest)); session.createConsumer(session.createQueue(dest));
switch(fieldName) { switch(fieldName) {
case "advancedNetworkStatisticsEnabled":
assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
break;
case "sendDuplicateFromStoreToDLQ": case "sendDuplicateFromStoreToDLQ":
assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ()); assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ());
break; break;

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="AMQ.9437" advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="AMQ.9437"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.stomp;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -53,12 +54,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
if(intendedType.equalsIgnoreCase("text")){ if(intendedType.equalsIgnoreCase("text")){
ActiveMQTextMessage text = new ActiveMQTextMessage(); ActiveMQTextMessage text = new ActiveMQTextMessage();
try { try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4); // AMQ-8398 - get the original text back so we decode from standard UTF-8
DataOutputStream data = new DataOutputStream(bytes); // and set on the message so it will re-encode using AMQ modified UTF-8
data.writeInt(command.getContent().length); text.setText(command.getBody());
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
} catch (Throwable e) { } catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e); throw new ProtocolException("Text could not bet set: " + e, false, e);
} }
@ -78,12 +76,9 @@ public class LegacyFrameTranslator implements FrameTranslator {
} else { } else {
ActiveMQTextMessage text = new ActiveMQTextMessage(); ActiveMQTextMessage text = new ActiveMQTextMessage();
try { try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4); // AMQ-8398 - get the original text back so we decode from standard UTF-8
DataOutputStream data = new DataOutputStream(bytes); // and set on the message so it will re-encode using AMQ modified UTF-8
data.writeInt(command.getContent().length); text.setText(command.getBody());
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
data.close();
} catch (Throwable e) { } catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e); throw new ProtocolException("Text could not bet set: " + e, false, e);
} }
@ -103,22 +98,13 @@ public class LegacyFrameTranslator implements FrameTranslator {
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (!message.isCompressed() && message.getContent() != null) {
ByteSequence msgContent = message.getContent();
if (msgContent.getLength() > 4) {
byte[] content = new byte[msgContent.getLength() - 4];
System.arraycopy(msgContent.data, 4, content, 0, content.length);
command.setContent(content);
}
} else {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
// AMQ-8398 - get the original text back so we decode from modified UTF-8
// and then we can re-encode using the standard JDK encoding
String messageText = msg.getText(); String messageText = msg.getText();
if (messageText != null) { if (messageText != null) {
command.setContent(msg.getText().getBytes("UTF-8")); command.setContent(msg.getText().getBytes(StandardCharsets.UTF_8));
} }
}
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp; package org.apache.activemq.transport.stomp;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
@ -75,11 +76,7 @@ public class StompFrame implements Command {
} }
public String getBody() { public String getBody() {
try { return new String(content, StandardCharsets.UTF_8);
return new String(content, "UTF-8");
} catch (UnsupportedEncodingException e) {
return new String(content);
}
} }
public void setContent(byte[] data) { public void setContent(byte[] data) {

View File

@ -103,6 +103,7 @@ public class StompNIOSSLLargeMessageTest extends StompTestSupport {
@Override @Override
public void tearDown() throws Exception { public void tearDown() throws Exception {
super.tearDown();
// unregister Log4J appender // unregister Log4J appender
org.apache.logging.log4j.core.Logger rootLogger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager.getRootLogger(); org.apache.logging.log4j.core.Logger rootLogger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager.getRootLogger();
rootLogger.removeAppender(appender); rootLogger.removeAppender(appender);

View File

@ -219,6 +219,84 @@ public class StompTest extends StompTestSupport {
assertTrue(Math.abs(tnow - tmsg) < 1000); assertTrue(Math.abs(tnow - tmsg) < 1000);
} }
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by Stomp and received by JMS/OpenWire
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
// needs to happen so this works.
@Test(timeout = 60000)
public void testSend4ByteUtf8StompToJms() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// publish message with string that requires 4-byte UTF-8 encoding
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
stompConnection.sendFrame(frame);
// Verify received message is original sent string
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals(body, message.getText());
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by JMS/OpenWire and received by Stomp
// AMQ uses a modified UTF-8 encoding that only uses 3 bytes so conversion
// needs to happen so this works.
@Test(timeout = 60000)
public void testSend4ByteUtf8JmsToStomp() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
MessageProducer producer = session.createProducer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// publish message with string that requires 4-byte UTF-8 encoding
producer.send(session.createTextMessage(body));
// Verify received message is original sent string
StompFrame message = stompConnection.receive();
assertNotNull(message);
assertEquals(body, message.getBody());
}
// Test that a string that requires 4 bytes to encode using standard
// UTF-8 does not break when sent by Stomp and received by Stomp
@Test(timeout = 60000)
public void testSend4ByteUtf8StompToStomp() throws Exception {
// Create test string using emojis, requires 4 bytes with standard UTF-8
String body = "!®౩\uD83D\uDE42";
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// publish message with string that requires 4-byte UTF-8 encoding
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + body + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Verify received message is original sent string
StompFrame message = stompConnection.receive();
assertNotNull(message);
assertEquals(body, message.getBody());
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception { public void testJMSXGroupIdCanBeSet() throws Exception {

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.springframework.context.support.AbstractApplicationContext;
@RunWith(value = Parameterized.class)
public class NetworkAdvancedStatisticsTest extends BaseNetworkTest {
@Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ new ActiveMQTopic("include.test.bar"), new ActiveMQTopic("exclude.test.bar")},
{ new ActiveMQQueue("include.test.foo"), new ActiveMQQueue("exclude.test.foo")}});
}
protected static final int MESSAGE_COUNT = 10;
protected AbstractApplicationContext context;
protected String consumerName = "durableSubs";
private final ActiveMQDestination includedDestination;
private final ActiveMQDestination excludedDestination;
public NetworkAdvancedStatisticsTest(ActiveMQDestination includedDestionation, ActiveMQDestination excludedDestination) {
this.includedDestination = includedDestionation;
this.excludedDestination = excludedDestination;
}
@Override
protected void doSetUp(boolean deleteAllMessages) throws Exception {
super.doSetUp(deleteAllMessages);
}
@Override
protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml";
}
@Override
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml";
}
//Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue
@Test(timeout = 60 * 1000)
public void testNetworkAdvancedStatistics() throws Exception {
// create a remote durable consumer to create demand
MessageConsumer remoteConsumer;
if(includedDestination.isTopic()) {
remoteConsumer = remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination), consumerName);
} else {
remoteConsumer = remoteSession.createConsumer(includedDestination);
remoteConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
}
Thread.sleep(1000);
MessageProducer producer = localSession.createProducer(includedDestination);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
Thread.sleep(1000);
MessageProducer producerExcluded = localSession.createProducer(excludedDestination);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producerExcluded.send(test);
}
Thread.sleep(1000);
//Make sure stats are correct for local -> remote
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeues().getCount());
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
assertEquals(0, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
// Make sure stats do not increment for local-only
assertEquals(MESSAGE_COUNT, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
if(includedDestination.isTopic()) {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
}
}, 10000, 500));
} else {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
// The number of message that remain is due to the exclude queue
return localBroker.getAdminView().getTotalMessageCount() == MESSAGE_COUNT;
}
}, 10000, 500));
}
remoteConsumer.close();
}
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
}
}));
}
}

View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="localBroker" start="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="ActiveMQ.Advisory.>" />
<policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)"
dynamicOnly = "false"
conduitSubscriptions = "true"
decreaseNetworkConsumerPriority = "false"
name="networkConnector">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="remoteBroker" start="false" useJmx="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="false" xmlns="http://activemq.apache.org/schema/core">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="ActiveMQ.Advisory.>" />
<policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61616)" />
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
</transportConnectors>
</broker>
</beans>

View File

@ -25,9 +25,6 @@ set.default.ACTIVEMQ_BASE=../..
set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf
set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data
# JDK 9+ modules
set.JDK_JAVA_OPTIONS=--add-reads=java.xml=java.logging --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.naming/javax.naming.spi=ALL-UNNAMED --add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.jar=ALL-UNNAMED --add-exports=jdk.xml.dom/org.w3c.dom.html=ALL-UNNAMED --add-exports=jdk.naming.rmi/com.sun.jndi.url.rmi=ALL-UNNAMED
wrapper.working.dir=. wrapper.working.dir=.
# Java Application # Java Application
@ -63,6 +60,23 @@ wrapper.java.additional.11=-Dactivemq.data=%ACTIVEMQ_DATA%
wrapper.java.additional.12=-Djava.security.auth.login.config=%ACTIVEMQ_CONF%/login.config wrapper.java.additional.12=-Djava.security.auth.login.config=%ACTIVEMQ_CONF%/login.config
wrapper.java.additional.13=-Djolokia.conf=file:%ACTIVEMQ_CONF%/jolokia-access.xml wrapper.java.additional.13=-Djolokia.conf=file:%ACTIVEMQ_CONF%/jolokia-access.xml
## ------------------------------------------------------------------
## Java Platform Module System (JPMS) - Java 9+.
## ------------------------------------------------------------------
wrapper.java.additional.20=--add-reads=java.xml=java.logging
wrapper.java.additional.21=--add-opens=java.base/java.security=ALL-UNNAMED
wrapper.java.additional.22=--add-opens=java.base/java.net=ALL-UNNAMED
wrapper.java.additional.23=--add-opens=java.base/java.lang=ALL-UNNAMED
wrapper.java.additional.25=--add-opens=java.base/java.util=ALL-UNNAMED
wrapper.java.additional.26=--add-opens=java.naming/javax.naming.spi=ALL-UNNAMED
wrapper.java.additional.27=--add-opens=java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED
wrapper.java.additional.28=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
wrapper.java.additional.29=--add-exports=java.base/sun.net.www.protocol.http=ALL-UNNAMED
wrapper.java.additional.30=--add-exports=java.base/sun.net.www.protocol.https=ALL-UNNAMED
wrapper.java.additional.31=--add-exports=java.base/sun.net.www.protocol.jar=ALL-UNNAMED
wrapper.java.additional.32=--add-exports=jdk.xml.dom/org.w3c.dom.html=ALL-UNNAMED
wrapper.java.additional.33=--add-exports=jdk.naming.rmi/com.sun.jndi.url.rmi=ALL-UNNAMED
# Uncomment to enable jmx # Uncomment to enable jmx
#wrapper.java.additional.n=-Dcom.sun.management.jmxremote.port=1616 #wrapper.java.additional.n=-Dcom.sun.management.jmxremote.port=1616
#wrapper.java.additional.n=-Dcom.sun.management.jmxremote.authenticate=false #wrapper.java.additional.n=-Dcom.sun.management.jmxremote.authenticate=false

View File

@ -25,9 +25,6 @@ set.default.ACTIVEMQ_BASE=../..
set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf
set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data
# JDK 9+ modules
set.JDK_JAVA_OPTIONS=--add-reads=java.xml=java.logging --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.naming/javax.naming.spi=ALL-UNNAMED --add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.jar=ALL-UNNAMED --add-exports=jdk.xml.dom/org.w3c.dom.html=ALL-UNNAMED --add-exports=jdk.naming.rmi/com.sun.jndi.url.rmi=ALL-UNNAMED
wrapper.working.dir=. wrapper.working.dir=.
# Java Application # Java Application
@ -63,6 +60,23 @@ wrapper.java.additional.11=-Dactivemq.data=%ACTIVEMQ_DATA%
wrapper.java.additional.12=-Djava.security.auth.login.config=%ACTIVEMQ_CONF%/login.config wrapper.java.additional.12=-Djava.security.auth.login.config=%ACTIVEMQ_CONF%/login.config
wrapper.java.additional.13=-Djolokia.conf=file:%ACTIVEMQ_CONF%/jolokia-access.xml wrapper.java.additional.13=-Djolokia.conf=file:%ACTIVEMQ_CONF%/jolokia-access.xml
## ------------------------------------------------------------------
## Java Platform Module System (JPMS) - Java 9+.
## ------------------------------------------------------------------
wrapper.java.additional.20=--add-reads=java.xml=java.logging
wrapper.java.additional.21=--add-opens=java.base/java.security=ALL-UNNAMED
wrapper.java.additional.22=--add-opens=java.base/java.net=ALL-UNNAMED
wrapper.java.additional.23=--add-opens=java.base/java.lang=ALL-UNNAMED
wrapper.java.additional.25=--add-opens=java.base/java.util=ALL-UNNAMED
wrapper.java.additional.26=--add-opens=java.naming/javax.naming.spi=ALL-UNNAMED
wrapper.java.additional.27=--add-opens=java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED
wrapper.java.additional.28=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
wrapper.java.additional.29=--add-exports=java.base/sun.net.www.protocol.http=ALL-UNNAMED
wrapper.java.additional.30=--add-exports=java.base/sun.net.www.protocol.https=ALL-UNNAMED
wrapper.java.additional.31=--add-exports=java.base/sun.net.www.protocol.jar=ALL-UNNAMED
wrapper.java.additional.32=--add-exports=jdk.xml.dom/org.w3c.dom.html=ALL-UNNAMED
wrapper.java.additional.33=--add-exports=jdk.naming.rmi/com.sun.jndi.url.rmi=ALL-UNNAMED
# Uncomment to enable jmx # Uncomment to enable jmx
#wrapper.java.additional.n=-Dcom.sun.management.jmxremote.port=1616 #wrapper.java.additional.n=-Dcom.sun.management.jmxremote.port=1616
#wrapper.java.additional.n=-Dcom.sun.management.jmxremote.authenticate=false #wrapper.java.additional.n=-Dcom.sun.management.jmxremote.authenticate=false

View File

@ -26,9 +26,6 @@ set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf
set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data
set.default.JOLOKIA_CONF=file:..\\..\\conf\\jolokia-access.xml set.default.JOLOKIA_CONF=file:..\\..\\conf\\jolokia-access.xml
# JDK 9+ modules
set.JDK_JAVA_OPTIONS=--add-reads=java.xml=java.logging --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens java.naming/javax.naming.spi=ALL-UNNAMED --add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-exports=java.base/sun.net.www.protocol.jar=ALL-UNNAMED --add-exports=jdk.xml.dom/org.w3c.dom.html=ALL-UNNAMED --add-exports=jdk.naming.rmi/com.sun.jndi.url.rmi=ALL-UNNAMED
wrapper.working.dir=. wrapper.working.dir=.
# Java Application # Java Application
@ -64,6 +61,23 @@ wrapper.java.additional.11=-Dactivemq.data="%ACTIVEMQ_DATA%"
wrapper.java.additional.12=-Djava.security.auth.login.config="%ACTIVEMQ_CONF%/login.config" wrapper.java.additional.12=-Djava.security.auth.login.config="%ACTIVEMQ_CONF%/login.config"
wrapper.java.additional.13=-Djolokia.conf="%JOLOKIA_CONF%" wrapper.java.additional.13=-Djolokia.conf="%JOLOKIA_CONF%"
## ------------------------------------------------------------------
## Java Platform Module System (JPMS) - Java 9+.
## ------------------------------------------------------------------
wrapper.java.additional.20=--add-reads=java.xml=java.logging
wrapper.java.additional.21=--add-opens=java.base/java.security=ALL-UNNAMED
wrapper.java.additional.22=--add-opens=java.base/java.net=ALL-UNNAMED
wrapper.java.additional.23=--add-opens=java.base/java.lang=ALL-UNNAMED
wrapper.java.additional.25=--add-opens=java.base/java.util=ALL-UNNAMED
wrapper.java.additional.26=--add-opens=java.naming/javax.naming.spi=ALL-UNNAMED
wrapper.java.additional.27=--add-opens=java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED
wrapper.java.additional.28=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
wrapper.java.additional.29=--add-exports=java.base/sun.net.www.protocol.http=ALL-UNNAMED
wrapper.java.additional.30=--add-exports=java.base/sun.net.www.protocol.https=ALL-UNNAMED
wrapper.java.additional.31=--add-exports=java.base/sun.net.www.protocol.jar=ALL-UNNAMED
wrapper.java.additional.32=--add-exports=jdk.xml.dom/org.w3c.dom.html=ALL-UNNAMED
wrapper.java.additional.33=--add-exports=jdk.naming.rmi/com.sun.jndi.url.rmi=ALL-UNNAMED
# Uncomment to enable remote jmx # Uncomment to enable remote jmx
#wrapper.java.additional.n=-Dcom.sun.management.jmxremote.port=1616 #wrapper.java.additional.n=-Dcom.sun.management.jmxremote.port=1616
#wrapper.java.additional.n=-Dcom.sun.management.jmxremote.authenticate=false #wrapper.java.additional.n=-Dcom.sun.management.jmxremote.authenticate=false

View File

@ -74,6 +74,11 @@
<property name="name" value="X-Content-Type-Options"/> <property name="name" value="X-Content-Type-Options"/>
<property name="value" value="nosniff"/> <property name="value" value="nosniff"/>
</bean> </bean>
<bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
<property name="pattern" value="*"/>
<property name="name" value="Cache-Control"/>
<property name="value" value="no-store"/>
</bean>
</list> </list>
</property> </property>
</bean> </bean>

View File

@ -43,7 +43,7 @@
<activemq-protobuf-version>1.1</activemq-protobuf-version> <activemq-protobuf-version>1.1</activemq-protobuf-version>
<activesoap-version>1.3</activesoap-version> <activesoap-version>1.3</activesoap-version>
<annogen-version>0.1.0</annogen-version> <annogen-version>0.1.0</annogen-version>
<ant-version>1.10.14</ant-version> <ant-version>1.10.15</ant-version>
<aries-version>1.1.0</aries-version> <aries-version>1.1.0</aries-version>
<axion-version>1.0-M3-dev</axion-version> <axion-version>1.0-M3-dev</axion-version>
<camel-version>4.4.3</camel-version> <camel-version>4.4.3</camel-version>
@ -53,7 +53,7 @@
<commons-dbcp2-version>2.12.0</commons-dbcp2-version> <commons-dbcp2-version>2.12.0</commons-dbcp2-version>
<commons-io-version>2.16.1</commons-io-version> <commons-io-version>2.16.1</commons-io-version>
<commons-lang-version>3.14.0</commons-lang-version> <commons-lang-version>3.14.0</commons-lang-version>
<commons-logging-version>1.3.3</commons-logging-version> <commons-logging-version>1.3.4</commons-logging-version>
<commons-pool2-version>2.12.0</commons-pool2-version> <commons-pool2-version>2.12.0</commons-pool2-version>
<commons-primitives-version>1.0</commons-primitives-version> <commons-primitives-version>1.0</commons-primitives-version>
<directory-version>2.0.0.AM25</directory-version> <directory-version>2.0.0.AM25</directory-version>
@ -72,7 +72,7 @@
<jaxb-bundle-version>2.3.2_1</jaxb-bundle-version> <jaxb-bundle-version>2.3.2_1</jaxb-bundle-version>
<jetty-version>11.0.22</jetty-version> <jetty-version>11.0.22</jetty-version>
<jetty-version-range>[11,13)</jetty-version-range> <jetty-version-range>[11,13)</jetty-version-range>
<jmdns-version>3.5.9</jmdns-version> <jmdns-version>3.5.12</jmdns-version>
<tomcat-api-version>9.0.65</tomcat-api-version> <tomcat-api-version>9.0.65</tomcat-api-version>
<jettison-version>1.5.4</jettison-version> <jettison-version>1.5.4</jettison-version>
<jmock-version>2.13.1</jmock-version> <jmock-version>2.13.1</jmock-version>
@ -97,7 +97,7 @@
<shiro-version>1.13.0</shiro-version> <shiro-version>1.13.0</shiro-version>
<slf4j-version>2.0.13</slf4j-version> <slf4j-version>2.0.13</slf4j-version>
<snappy-version>1.1.2</snappy-version> <snappy-version>1.1.2</snappy-version>
<spring-version>6.1.11</spring-version> <spring-version>6.1.12</spring-version>
<spring-version-range>[6,7)</spring-version-range> <spring-version-range>[6,7)</spring-version-range>
<taglibs-version>1.2.5</taglibs-version> <taglibs-version>1.2.5</taglibs-version>
<velocity-version>2.3</velocity-version> <velocity-version>2.3</velocity-version>