This commit is contained in:
Clebert Suconic 2018-03-22 16:20:14 -04:00
commit d8f22a399b
5 changed files with 76 additions and 3 deletions

View File

@ -58,6 +58,10 @@ public interface QueueBindingInfo {
void setExclusive(boolean exclusive);
boolean isLastValue();
void setLastValue(boolean lastValue);
byte getRoutingType();
void setRoutingType(byte routingType);

View File

@ -1275,7 +1275,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.getRoutingType().getType());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getRoutingType().getType());
readLock();
try {

View File

@ -48,6 +48,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean exclusive;
public boolean lastValue;
public byte routingType;
public PersistentQueueBindingEncoding() {
@ -70,8 +72,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
maxConsumers +
", purgeOnNoConsumers=" +
purgeOnNoConsumers +
", exclusive=" +
exclusive +
", exclusive=" +
exclusive +
", lastValue=" +
lastValue +
", routingType=" +
routingType +
"]";
@ -85,6 +89,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final int maxConsumers,
final boolean purgeOnNoConsumers,
final boolean exclusive,
final boolean lastValue,
final byte routingType) {
this.name = name;
this.address = address;
@ -94,6 +99,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.maxConsumers = maxConsumers;
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.exclusive = exclusive;
this.lastValue = lastValue;
this.routingType = routingType;
}
@ -179,6 +185,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.exclusive = exclusive;
}
@Override
public boolean isLastValue() {
return lastValue;
}
@Override
public void setLastValue(boolean lastValue) {
this.lastValue = lastValue;
}
@Override
public byte getRoutingType() {
return routingType;
@ -225,6 +241,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
}
if (buffer.readableBytes() > 0) {
lastValue = buffer.readBoolean();
} else {
lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
}
}
@Override
@ -238,6 +259,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeBoolean(purgeOnNoConsumers);
buffer.writeByte(routingType);
buffer.writeBoolean(exclusive);
buffer.writeBoolean(lastValue);
}
@Override
@ -248,6 +270,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}

View File

@ -152,6 +152,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.purgeOnNoConsumers(queueBindingInfo.isPurgeOnNoConsumers())
.maxConsumers(queueBindingInfo.getMaxConsumers())
.exclusive(queueBindingInfo.isExclusive())
.lastValue(queueBindingInfo.isLastValue())
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));

View File

@ -61,6 +61,51 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers());
}
@Test
public void testQueueConfigLastValueAndRestart() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, false, true, true);
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding1.getQueue().isLastValue());
server.stop();
server.start();
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isLastValue());
}
@Test
public void testQueueConfigExclusiveAndRestart() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, true);
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding1.getQueue().isExclusive());
server.stop();
server.start();
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isExclusive());
}
@Test
public void testQueueConfigUserAndRestart() throws Exception {
ActiveMQServer server = createServer(true);