diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 8d45586f5b..66231cfbb6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -540,6 +540,8 @@ public final class ActiveMQDefaultConfiguration { public static final boolean DEFAULT_ENABLED = true; + public static final boolean DEFAULT_INTERNAL = false; + public static final boolean DEFAULT_QUEUE_AUTO_DELETE = true; public static final boolean DEFAULT_CREATED_QUEUE_AUTO_DELETE = false; @@ -1573,6 +1575,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_ENABLED; } + public static boolean getDefaultInternal() { + return DEFAULT_INTERNAL; + } + public static boolean getDefaultQueueAutoDelete(boolean autoCreated) { return autoCreated ? getDefaultQueueAutoDelete() : getDefaultCreatedQueueAutoDelete(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java index e56e286e79..2e99b785c2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java @@ -28,9 +28,11 @@ public interface AddressBindingInfo { SimpleString getName(); - boolean getAutoCreated(); + boolean isAutoCreated(); EnumSet getRoutingTypes(); AddressStatusEncoding getAddressStatusEncoding(); + + boolean isInternal(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 9d3c96f797..a3d5198b1d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -105,4 +105,6 @@ public interface QueueBindingInfo { long getAutoDeleteMessageCount(); long getRingSize(); + + boolean isInternal(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 25429d6b74..9a4d8533f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1450,7 +1450,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp SimpleString filterString = filter == null ? null : filter.getFilterString(); - PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize()); + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize(), queue.isInternalQueue()); try (ArtemisCloseable lock = closeableReadLock()) { if (update) { @@ -1506,7 +1506,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { - PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated()); + PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated(), addressInfo.isInternal()); try (ArtemisCloseable lock = closeableReadLock()) { long recordID = idGenerator.generateID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index fdd74a22d1..c69eac36d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec; import java.util.EnumSet; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.EncodingSupport; @@ -29,14 +30,17 @@ import static org.apache.activemq.artemis.utils.Preconditions.checkNotNull; public class PersistentAddressBindingEncoding implements EncodingSupport, AddressBindingInfo { - public long id; + private long id; - public SimpleString name; + private SimpleString name; - public boolean autoCreated; - public AddressStatusEncoding addressStatusEncoding; + private boolean autoCreated; - public EnumSet routingTypes; + private AddressStatusEncoding addressStatusEncoding; + + private EnumSet routingTypes; + + private boolean internal; public PersistentAddressBindingEncoding() { routingTypes = EnumSet.noneOf(RoutingType.class); @@ -54,19 +58,22 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres sb.deleteCharAt(sb.length() - 1); } sb.append("}"); - sb.append(", autoCreated=" + autoCreated + "]"); + sb.append(", autoCreated=" + autoCreated); + sb.append(", internal=" + internal + "]"); return sb.toString(); } public PersistentAddressBindingEncoding(final SimpleString name, final EnumSet routingTypes, - final boolean autoCreated) { + final boolean autoCreated, + final boolean internal) { checkNotNull(name); checkNotNull(routingTypes); this.name = name; this.routingTypes = routingTypes; this.autoCreated = autoCreated; + this.internal = internal; } @Override @@ -84,7 +91,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres } @Override - public boolean getAutoCreated() { + public boolean isAutoCreated() { return autoCreated; } @@ -102,6 +109,11 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres this.addressStatusEncoding = addressStatusEncoding; } + @Override + public boolean isInternal() { + return internal; + } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); @@ -110,6 +122,12 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres routingTypes.add(RoutingType.getType(buffer.readByte())); } autoCreated = buffer.readBoolean(); + + if (buffer.readableBytes() > 0) { + internal = buffer.readBoolean(); + } else { + internal = ActiveMQDefaultConfiguration.getDefaultInternal(); + } } @Override @@ -120,6 +138,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres buffer.writeByte(d.getType()); } buffer.writeBoolean(autoCreated); + buffer.writeBoolean(internal); } @Override @@ -127,6 +146,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres return SimpleString.sizeofString(name) + DataConstants.SIZE_INT + (DataConstants.SIZE_BYTE * routingTypes.size()) + + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 1dc7b119fe..c9f55dcc99 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -28,57 +28,59 @@ import org.apache.activemq.artemis.utils.DataConstants; public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo { - public long id; + private long id; - public SimpleString name; + private SimpleString name; - public SimpleString address; + private SimpleString address; - public SimpleString filterString; + private SimpleString filterString; - public boolean autoCreated; + private boolean autoCreated; - public SimpleString user; + private SimpleString user; - public List queueStatusEncodings; + private List queueStatusEncodings; - public int maxConsumers; + private int maxConsumers; - public boolean purgeOnNoConsumers; + private boolean purgeOnNoConsumers; - public boolean enabled; + private boolean enabled; - public boolean exclusive; + private boolean exclusive; - public boolean lastValue; + private boolean lastValue; - public SimpleString lastValueKey; + private SimpleString lastValueKey; - public boolean nonDestructive; + private boolean nonDestructive; - public int consumersBeforeDispatch; + private int consumersBeforeDispatch; - public long delayBeforeDispatch; + private long delayBeforeDispatch; - public byte routingType; + private byte routingType; - public boolean configurationManaged; + private boolean configurationManaged; - public boolean groupRebalance; + private boolean groupRebalance; - public boolean groupRebalancePauseDispatch; + private boolean groupRebalancePauseDispatch; - public int groupBuckets; + private int groupBuckets; - public SimpleString groupFirstKey; + private SimpleString groupFirstKey; - public boolean autoDelete; + private boolean autoDelete; - public long autoDeleteDelay; + private long autoDeleteDelay; - public long autoDeleteMessageCount; + private long autoDeleteMessageCount; - public long ringSize; + private long ringSize; + + private boolean internal; public PersistentQueueBindingEncoding() { } @@ -86,52 +88,30 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin @Override public String toString() { return "PersistentQueueBindingEncoding [id=" + id + - ", name=" + - name + - ", address=" + - address + - ", filterString=" + - filterString + - ", user=" + - user + - ", autoCreated=" + - autoCreated + - ", maxConsumers=" + - maxConsumers + - ", purgeOnNoConsumers=" + - purgeOnNoConsumers + - ", enabled=" + - enabled + - ", exclusive=" + - exclusive + - ", lastValue=" + - lastValue + - ", lastValueKey=" + - lastValueKey + - ", nonDestructive=" + - nonDestructive + - ", consumersBeforeDispatch=" + - consumersBeforeDispatch + - ", delayBeforeDispatch=" + - delayBeforeDispatch + - ", routingType=" + - routingType + - ", configurationManaged=" + - configurationManaged + - ", groupRebalance=" + - groupRebalance + - ", groupRebalancePauseDispatch=" + - groupRebalancePauseDispatch + - ", groupBuckets=" + - groupBuckets + - ", groupFirstKey=" + - groupFirstKey + - ", autoDelete=" + - autoDelete + - ", autoDeleteDelay=" + - autoDeleteDelay + - ", autoDeleteMessageCount=" + - autoDeleteMessageCount + + ", name=" + name + + ", address=" + address + + ", filterString=" + filterString + + ", user=" + user + + ", autoCreated=" + autoCreated + + ", maxConsumers=" + maxConsumers + + ", purgeOnNoConsumers=" + purgeOnNoConsumers + + ", enabled=" + enabled + + ", exclusive=" + exclusive + + ", lastValue=" + lastValue + + ", lastValueKey=" + lastValueKey + + ", nonDestructive=" + nonDestructive + + ", consumersBeforeDispatch=" + consumersBeforeDispatch + + ", delayBeforeDispatch=" + delayBeforeDispatch + + ", routingType=" + routingType + + ", configurationManaged=" + configurationManaged + + ", groupRebalance=" + groupRebalance + + ", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch + + ", groupBuckets=" + groupBuckets + + ", groupFirstKey=" + groupFirstKey + + ", autoDelete=" + autoDelete + + ", autoDeleteDelay=" + autoDeleteDelay + + ", autoDeleteMessageCount=" + autoDeleteMessageCount + + ", internal=" + internal + "]"; } @@ -158,7 +138,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin final long autoDeleteMessageCount, final byte routingType, final boolean configurationManaged, - final long ringSize) { + final long ringSize, + final boolean internal) { this.name = name; this.address = address; this.filterString = filterString; @@ -183,6 +164,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin this.routingType = routingType; this.configurationManaged = configurationManaged; this.ringSize = ringSize; + this.internal = internal; } @Override @@ -387,6 +369,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin return ringSize; } + @Override + public boolean isInternal() { + return internal; + } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); @@ -499,6 +486,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } else { groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch(); } + + if (buffer.readableBytes() > 0) { + internal = buffer.readBoolean(); + } else { + internal = ActiveMQDefaultConfiguration.getDefaultInternal(); + } } @Override @@ -527,6 +520,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin buffer.writeLong(ringSize); buffer.writeBoolean(enabled); buffer.writeBoolean(groupRebalancePauseDispatch); + buffer.writeBoolean(internal); } @Override @@ -552,6 +546,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin SimpleString.sizeofNullableString(groupFirstKey) + DataConstants.SIZE_LONG + DataConstants.SIZE_BOOLEAN + + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 94569f89c2..3dbb426c7b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -187,7 +187,7 @@ public class PostOfficeJournalLoader implements JournalLoader { for (AddressBindingInfo addressBindingInfo : addressBindingInfos) { AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes()); addressInfo.setId(addressBindingInfo.getId()); - addressInfo.setAutoCreated(addressBindingInfo.getAutoCreated()); + addressInfo.setAutoCreated(addressBindingInfo.isAutoCreated()); if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) { addressInfo.setStorageManager(storageManager); addressInfo.setPostOffice(postOffice); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddressBindingEncodingTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddressBindingEncodingTest.java index d2073b110d..87eed08604 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddressBindingEncodingTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddressBindingEncodingTest.java @@ -34,10 +34,12 @@ public class AddressBindingEncodingTest extends Assert { final SimpleString name = RandomUtil.randomSimpleString(); final boolean autoCreated = RandomUtil.randomBoolean(); final EnumSet routingTypes = EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST); + final boolean internal = RandomUtil.randomBoolean(); PersistentAddressBindingEncoding encoding = new PersistentAddressBindingEncoding(name, routingTypes, - autoCreated); + autoCreated, + internal); int size = encoding.getEncodeSize(); ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size); encoding.encode(encodedBuffer); @@ -46,7 +48,8 @@ public class AddressBindingEncodingTest extends Assert { decoding.decode(encodedBuffer); assertEquals(name, decoding.getName()); - assertEquals(autoCreated, decoding.autoCreated); - assertEquals(routingTypes, decoding.routingTypes); + assertEquals(autoCreated, decoding.isAutoCreated()); + assertEquals(routingTypes, decoding.getRoutingTypes()); + assertEquals(internal, decoding.isInternal()); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java index 56c04ce6df..209c6cbd28 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java @@ -52,6 +52,7 @@ public class QueueBindingEncodingTest extends Assert { final long ringSize = RandomUtil.randomLong(); final boolean enabled = RandomUtil.randomBoolean(); final boolean groupRebalancePauseDispatch = RandomUtil.randomBoolean(); + final boolean internal = RandomUtil.randomBoolean(); PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name, address, @@ -76,7 +77,8 @@ public class QueueBindingEncodingTest extends Assert { autoDeleteMessageCount, routingType, configurationManaged, - ringSize); + ringSize, + internal); int size = encoding.getEncodeSize(); ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size); encoding.encode(encodedBuffer); @@ -107,8 +109,7 @@ public class QueueBindingEncodingTest extends Assert { assertEquals(routingType, decoding.getRoutingType()); assertEquals(configurationManaged, decoding.isConfigurationManaged()); assertEquals(ringSize, decoding.getRingSize()); - assertEquals(groupRebalancePauseDispatch, decoding.isGroupRebalancePauseDispatch()); - + assertEquals(internal, decoding.isInternal()); } }