ARTEMIS-4396 make address/queue internal prop durable

This commit is contained in:
Justin Bertram 2023-08-29 11:53:12 -05:00 committed by Robbie Gemmell
parent 424ed6123b
commit cd8a2e5c49
9 changed files with 120 additions and 91 deletions

View File

@ -540,6 +540,8 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_ENABLED = true;
public static final boolean DEFAULT_INTERNAL = false;
public static final boolean DEFAULT_QUEUE_AUTO_DELETE = true;
public static final boolean DEFAULT_CREATED_QUEUE_AUTO_DELETE = false;
@ -1573,6 +1575,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_ENABLED;
}
public static boolean getDefaultInternal() {
return DEFAULT_INTERNAL;
}
public static boolean getDefaultQueueAutoDelete(boolean autoCreated) {
return autoCreated ? getDefaultQueueAutoDelete() : getDefaultCreatedQueueAutoDelete();
}

View File

@ -28,9 +28,11 @@ public interface AddressBindingInfo {
SimpleString getName();
boolean getAutoCreated();
boolean isAutoCreated();
EnumSet<RoutingType> getRoutingTypes();
AddressStatusEncoding getAddressStatusEncoding();
boolean isInternal();
}

View File

@ -105,4 +105,6 @@ public interface QueueBindingInfo {
long getAutoDeleteMessageCount();
long getRingSize();
boolean isInternal();
}

View File

@ -1450,7 +1450,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize(), queue.isInternalQueue());
try (ArtemisCloseable lock = closeableReadLock()) {
if (update) {
@ -1506,7 +1506,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated());
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated(), addressInfo.isInternal());
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = idGenerator.generateID();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.util.EnumSet;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@ -29,14 +30,17 @@ import static org.apache.activemq.artemis.utils.Preconditions.checkNotNull;
public class PersistentAddressBindingEncoding implements EncodingSupport, AddressBindingInfo {
public long id;
private long id;
public SimpleString name;
private SimpleString name;
public boolean autoCreated;
public AddressStatusEncoding addressStatusEncoding;
private boolean autoCreated;
public EnumSet<RoutingType> routingTypes;
private AddressStatusEncoding addressStatusEncoding;
private EnumSet<RoutingType> routingTypes;
private boolean internal;
public PersistentAddressBindingEncoding() {
routingTypes = EnumSet.noneOf(RoutingType.class);
@ -54,19 +58,22 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
sb.deleteCharAt(sb.length() - 1);
}
sb.append("}");
sb.append(", autoCreated=" + autoCreated + "]");
sb.append(", autoCreated=" + autoCreated);
sb.append(", internal=" + internal + "]");
return sb.toString();
}
public PersistentAddressBindingEncoding(final SimpleString name,
final EnumSet<RoutingType> routingTypes,
final boolean autoCreated) {
final boolean autoCreated,
final boolean internal) {
checkNotNull(name);
checkNotNull(routingTypes);
this.name = name;
this.routingTypes = routingTypes;
this.autoCreated = autoCreated;
this.internal = internal;
}
@Override
@ -84,7 +91,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
}
@Override
public boolean getAutoCreated() {
public boolean isAutoCreated() {
return autoCreated;
}
@ -102,6 +109,11 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
this.addressStatusEncoding = addressStatusEncoding;
}
@Override
public boolean isInternal() {
return internal;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
@ -110,6 +122,12 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
routingTypes.add(RoutingType.getType(buffer.readByte()));
}
autoCreated = buffer.readBoolean();
if (buffer.readableBytes() > 0) {
internal = buffer.readBoolean();
} else {
internal = ActiveMQDefaultConfiguration.getDefaultInternal();
}
}
@Override
@ -120,6 +138,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
buffer.writeByte(d.getType());
}
buffer.writeBoolean(autoCreated);
buffer.writeBoolean(internal);
}
@Override
@ -127,6 +146,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
return SimpleString.sizeofString(name) +
DataConstants.SIZE_INT +
(DataConstants.SIZE_BYTE * routingTypes.size()) +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}
}

View File

@ -28,57 +28,59 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {
public long id;
private long id;
public SimpleString name;
private SimpleString name;
public SimpleString address;
private SimpleString address;
public SimpleString filterString;
private SimpleString filterString;
public boolean autoCreated;
private boolean autoCreated;
public SimpleString user;
private SimpleString user;
public List<QueueStatusEncoding> queueStatusEncodings;
private List<QueueStatusEncoding> queueStatusEncodings;
public int maxConsumers;
private int maxConsumers;
public boolean purgeOnNoConsumers;
private boolean purgeOnNoConsumers;
public boolean enabled;
private boolean enabled;
public boolean exclusive;
private boolean exclusive;
public boolean lastValue;
private boolean lastValue;
public SimpleString lastValueKey;
private SimpleString lastValueKey;
public boolean nonDestructive;
private boolean nonDestructive;
public int consumersBeforeDispatch;
private int consumersBeforeDispatch;
public long delayBeforeDispatch;
private long delayBeforeDispatch;
public byte routingType;
private byte routingType;
public boolean configurationManaged;
private boolean configurationManaged;
public boolean groupRebalance;
private boolean groupRebalance;
public boolean groupRebalancePauseDispatch;
private boolean groupRebalancePauseDispatch;
public int groupBuckets;
private int groupBuckets;
public SimpleString groupFirstKey;
private SimpleString groupFirstKey;
public boolean autoDelete;
private boolean autoDelete;
public long autoDeleteDelay;
private long autoDeleteDelay;
public long autoDeleteMessageCount;
private long autoDeleteMessageCount;
public long ringSize;
private long ringSize;
private boolean internal;
public PersistentQueueBindingEncoding() {
}
@ -86,52 +88,30 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
@Override
public String toString() {
return "PersistentQueueBindingEncoding [id=" + id +
", name=" +
name +
", address=" +
address +
", filterString=" +
filterString +
", user=" +
user +
", autoCreated=" +
autoCreated +
", maxConsumers=" +
maxConsumers +
", purgeOnNoConsumers=" +
purgeOnNoConsumers +
", enabled=" +
enabled +
", exclusive=" +
exclusive +
", lastValue=" +
lastValue +
", lastValueKey=" +
lastValueKey +
", nonDestructive=" +
nonDestructive +
", consumersBeforeDispatch=" +
consumersBeforeDispatch +
", delayBeforeDispatch=" +
delayBeforeDispatch +
", routingType=" +
routingType +
", configurationManaged=" +
configurationManaged +
", groupRebalance=" +
groupRebalance +
", groupRebalancePauseDispatch=" +
groupRebalancePauseDispatch +
", groupBuckets=" +
groupBuckets +
", groupFirstKey=" +
groupFirstKey +
", autoDelete=" +
autoDelete +
", autoDeleteDelay=" +
autoDeleteDelay +
", autoDeleteMessageCount=" +
autoDeleteMessageCount +
", name=" + name +
", address=" + address +
", filterString=" + filterString +
", user=" + user +
", autoCreated=" + autoCreated +
", maxConsumers=" + maxConsumers +
", purgeOnNoConsumers=" + purgeOnNoConsumers +
", enabled=" + enabled +
", exclusive=" + exclusive +
", lastValue=" + lastValue +
", lastValueKey=" + lastValueKey +
", nonDestructive=" + nonDestructive +
", consumersBeforeDispatch=" + consumersBeforeDispatch +
", delayBeforeDispatch=" + delayBeforeDispatch +
", routingType=" + routingType +
", configurationManaged=" + configurationManaged +
", groupRebalance=" + groupRebalance +
", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch +
", groupBuckets=" + groupBuckets +
", groupFirstKey=" + groupFirstKey +
", autoDelete=" + autoDelete +
", autoDeleteDelay=" + autoDeleteDelay +
", autoDeleteMessageCount=" + autoDeleteMessageCount +
", internal=" + internal +
"]";
}
@ -158,7 +138,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final long autoDeleteMessageCount,
final byte routingType,
final boolean configurationManaged,
final long ringSize) {
final long ringSize,
final boolean internal) {
this.name = name;
this.address = address;
this.filterString = filterString;
@ -183,6 +164,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.routingType = routingType;
this.configurationManaged = configurationManaged;
this.ringSize = ringSize;
this.internal = internal;
}
@Override
@ -387,6 +369,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
return ringSize;
}
@Override
public boolean isInternal() {
return internal;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
@ -499,6 +486,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
}
if (buffer.readableBytes() > 0) {
internal = buffer.readBoolean();
} else {
internal = ActiveMQDefaultConfiguration.getDefaultInternal();
}
}
@Override
@ -527,6 +520,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeLong(ringSize);
buffer.writeBoolean(enabled);
buffer.writeBoolean(groupRebalancePauseDispatch);
buffer.writeBoolean(internal);
}
@Override
@ -552,6 +546,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
SimpleString.sizeofNullableString(groupFirstKey) +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}

View File

@ -187,7 +187,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
addressInfo.setId(addressBindingInfo.getId());
addressInfo.setAutoCreated(addressBindingInfo.getAutoCreated());
addressInfo.setAutoCreated(addressBindingInfo.isAutoCreated());
if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) {
addressInfo.setStorageManager(storageManager);
addressInfo.setPostOffice(postOffice);

View File

@ -34,10 +34,12 @@ public class AddressBindingEncodingTest extends Assert {
final SimpleString name = RandomUtil.randomSimpleString();
final boolean autoCreated = RandomUtil.randomBoolean();
final EnumSet<RoutingType> routingTypes = EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST);
final boolean internal = RandomUtil.randomBoolean();
PersistentAddressBindingEncoding encoding = new PersistentAddressBindingEncoding(name,
routingTypes,
autoCreated);
autoCreated,
internal);
int size = encoding.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size);
encoding.encode(encodedBuffer);
@ -46,7 +48,8 @@ public class AddressBindingEncodingTest extends Assert {
decoding.decode(encodedBuffer);
assertEquals(name, decoding.getName());
assertEquals(autoCreated, decoding.autoCreated);
assertEquals(routingTypes, decoding.routingTypes);
assertEquals(autoCreated, decoding.isAutoCreated());
assertEquals(routingTypes, decoding.getRoutingTypes());
assertEquals(internal, decoding.isInternal());
}
}

View File

@ -52,6 +52,7 @@ public class QueueBindingEncodingTest extends Assert {
final long ringSize = RandomUtil.randomLong();
final boolean enabled = RandomUtil.randomBoolean();
final boolean groupRebalancePauseDispatch = RandomUtil.randomBoolean();
final boolean internal = RandomUtil.randomBoolean();
PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
address,
@ -76,7 +77,8 @@ public class QueueBindingEncodingTest extends Assert {
autoDeleteMessageCount,
routingType,
configurationManaged,
ringSize);
ringSize,
internal);
int size = encoding.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size);
encoding.encode(encodedBuffer);
@ -107,8 +109,7 @@ public class QueueBindingEncodingTest extends Assert {
assertEquals(routingType, decoding.getRoutingType());
assertEquals(configurationManaged, decoding.isConfigurationManaged());
assertEquals(ringSize, decoding.getRingSize());
assertEquals(groupRebalancePauseDispatch, decoding.isGroupRebalancePauseDispatch());
assertEquals(internal, decoding.isInternal());
}
}