ARTEMIS-813 Added persistence objects for new address model

This commit is contained in:
jbertram 2016-10-21 19:58:01 -05:00 committed by Martyn Taylor
parent 2d02a26527
commit 18c6d3035f
18 changed files with 188 additions and 71 deletions

View File

@ -25,10 +25,6 @@ public interface AddressBindingInfo {
SimpleString getName(); SimpleString getName();
boolean isAutoCreated();
SimpleString getUser();
AddressInfo.RoutingType getRoutingType(); AddressInfo.RoutingType getRoutingType();
} }

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
@ -301,8 +302,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void deleteQueueStatus(long recordID) throws Exception; void deleteQueueStatus(long recordID) throws Exception;
void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception;
void deleteAddressBinding(long tx, long addressBindingID) throws Exception;
JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) throws Exception; List<GroupingInfo> groupingInfos,
List<AddressBindingInfo> addressBindingInfos) throws Exception;
// grouping related operations // grouping related operations
void addGrouping(GroupBinding groupBinding) throws Exception; void addGrouping(GroupBinding groupBinding) throws Exception;

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl; import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@ -77,6 +78,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
@ -93,6 +95,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -1261,7 +1264,29 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
} finally { } finally {
readUnLock(); readUnLock();
} }
}
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType());
readLock();
try {
long recordID = idGenerator.generateID();
bindingEncoding.setId(recordID);
bindingsJournal.appendAddRecordTransactional(tx, recordID, JournalRecordIds.ADDRESS_BINDING_RECORD, bindingEncoding);
} finally {
readUnLock();
}
}
@Override
public void deleteAddressBinding(long tx, final long addressBindingID) throws Exception {
readLock();
try {
bindingsJournal.appendDeleteRecordTransactional(tx, addressBindingID);
} finally {
readUnLock();
}
} }
@Override @Override
@ -1347,7 +1372,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override @Override
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception { final List<GroupingInfo> groupingInfos,
final List<AddressBindingInfo> addressBindingInfos) throws Exception {
List<RecordInfo> records = new ArrayList<>(); List<RecordInfo> records = new ArrayList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>(); List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
@ -1364,12 +1390,15 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
byte rec = record.getUserRecordType(); byte rec = record.getUserRecordType();
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) { if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer); PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
queueBindingInfos.add(bindingEncoding); queueBindingInfos.add(bindingEncoding);
mapBindings.put(bindingEncoding.getId(), bindingEncoding); mapBindings.put(bindingEncoding.getId(), bindingEncoding);
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
idGenerator.loadState(record.id, buffer); idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
ActiveMQServerLogger.LOGGER.info("=== Loading: " + bindingEncoding);
addressBindingInfos.add(bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) { } else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer); GroupingEncoding encoding = newGroupEncoding(id, buffer);
groupingInfos.add(encoding); groupingInfos.add(encoding);
@ -1849,7 +1878,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
* @param buffer * @param buffer
* @return * @return
*/ */
protected static PersistentQueueBindingEncoding newBindingEncoding(long id, ActiveMQBuffer buffer) { protected static PersistentQueueBindingEncoding newQueueBindingEncoding(long id, ActiveMQBuffer buffer) {
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(); PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding();
bindingEncoding.decode(buffer); bindingEncoding.decode(buffer);
@ -1872,8 +1901,14 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
return statusEncoding; return statusEncoding;
} }
protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long id, ActiveMQBuffer buffer) {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding();
bindingEncoding.decode(buffer);
bindingEncoding.setId(id);
return bindingEncoding;
}
@Override @Override
public boolean addToPage(PagingStore store, public boolean addToPage(PagingStore store,

View File

@ -555,7 +555,7 @@ public final class DescribeJournal {
return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer); return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer);
case QUEUE_BINDING_RECORD: case QUEUE_BINDING_RECORD:
return AbstractJournalStorageManager.newBindingEncoding(id, buffer); return AbstractJournalStorageManager.newQueueBindingEncoding(id, buffer);
case ID_COUNTER_RECORD: case ID_COUNTER_RECORD:
EncodingSupport idReturn = new IDCounterEncoding(); EncodingSupport idReturn = new IDCounterEncoding();

View File

@ -83,4 +83,6 @@ public final class JournalRecordIds {
public static final byte PAGE_CURSOR_COMPLETE = 42; public static final byte PAGE_CURSOR_COMPLETE = 42;
public static final byte PAGE_CURSOR_PENDING_COUNTER = 43; public static final byte PAGE_CURSOR_PENDING_COUNTER = 43;
public static final byte ADDRESS_BINDING_RECORD = 44;
} }

View File

@ -29,10 +29,6 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public SimpleString name; public SimpleString name;
public boolean autoCreated;
public SimpleString user;
public AddressInfo.RoutingType routingType; public AddressInfo.RoutingType routingType;
public PersistentAddressBindingEncoding() { public PersistentAddressBindingEncoding() {
@ -43,22 +39,14 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
return "PersistentAddressBindingEncoding [id=" + id + return "PersistentAddressBindingEncoding [id=" + id +
", name=" + ", name=" +
name + name +
", user=" +
user +
", autoCreated=" +
autoCreated +
", routingType=" + ", routingType=" +
routingType + routingType +
"]"; "]";
} }
public PersistentAddressBindingEncoding(final SimpleString name, public PersistentAddressBindingEncoding(final SimpleString name,
final SimpleString user,
final boolean autoCreated,
final AddressInfo.RoutingType routingType) { final AddressInfo.RoutingType routingType) {
this.name = name; this.name = name;
this.user = user;
this.autoCreated = autoCreated;
this.routingType = routingType; this.routingType = routingType;
} }
@ -76,16 +64,6 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
return name; return name;
} }
@Override
public SimpleString getUser() {
return user;
}
@Override
public boolean isAutoCreated() {
return autoCreated;
}
@Override @Override
public AddressInfo.RoutingType getRoutingType() { public AddressInfo.RoutingType getRoutingType() {
return routingType; return routingType;
@ -94,42 +72,17 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
@Override @Override
public void decode(final ActiveMQBuffer buffer) { public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString(); name = buffer.readSimpleString();
String metadata = buffer.readNullableSimpleString().toString();
if (metadata != null) {
String[] elements = metadata.split(";");
for (String element : elements) {
String[] keyValuePair = element.split("=");
if (keyValuePair.length == 2) {
if (keyValuePair[0].equals("user")) {
user = SimpleString.toSimpleString(keyValuePair[1]);
}
}
}
}
autoCreated = buffer.readBoolean();
routingType = AddressInfo.RoutingType.getType(buffer.readByte()); routingType = AddressInfo.RoutingType.getType(buffer.readByte());
} }
@Override @Override
public void encode(final ActiveMQBuffer buffer) { public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name); buffer.writeSimpleString(name);
buffer.writeNullableSimpleString(createMetadata());
buffer.writeBoolean(autoCreated);
buffer.writeByte(routingType.getType()); buffer.writeByte(routingType.getType());
} }
@Override @Override
public int getEncodeSize() { public int getEncodeSize() {
return SimpleString.sizeofString(name) + DataConstants.SIZE_BOOLEAN + return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE;
SimpleString.sizeofNullableString(createMetadata()) +
DataConstants.SIZE_BYTE;
}
private SimpleString createMetadata() {
StringBuilder metadata = new StringBuilder();
metadata.append("user=").append(user).append(";");
return SimpleString.toSimpleString(metadata.toString());
} }
} }

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@ -55,6 +56,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
@ -154,13 +156,22 @@ public class NullStorageManager implements StorageManager {
public void deleteQueueBinding(long tx, final long queueBindingID) throws Exception { public void deleteQueueBinding(long tx, final long queueBindingID) throws Exception {
} }
@Override
public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
}
@Override
public void deleteAddressBinding(long tx, long addressBindingID) throws Exception {
}
@Override @Override
public void commit(final long txID) throws Exception { public void commit(final long txID) throws Exception {
} }
@Override @Override
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception { final List<GroupingInfo> groupingInfos,
final List<AddressBindingInfo> addressBindingInfos) throws Exception {
return new JournalLoadInformation(); return new JournalLoadInformation();
} }

View File

@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@ -2206,7 +2207,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
List<GroupingInfo> groupingInfos = new ArrayList<>(); List<GroupingInfo> groupingInfos = new ArrayList<>();
journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos); List<AddressBindingInfo> addressBindingInfos = new ArrayList<>();
journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos, addressBindingInfos);
recoverStoredConfigs(); recoverStoredConfigs();
@ -2216,6 +2219,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
journalLoader.handleGroupingBindings(groupingInfos); journalLoader.handleGroupingBindings(groupingInfos);
Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>(); Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>(); HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>();
@ -2314,6 +2321,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
final Queue queue = queueFactory.createQueueWith(queueConfig); final Queue queue = queueFactory.createQueueWith(queueConfig);
boolean addressAlreadyExists = true;
if (postOffice.getAddressInfo(queue.getAddress()) == null) {
postOffice.addAddressInfo(new AddressInfo(queue.getAddress())
.setRoutingType(AddressInfo.RoutingType.MULTICAST));
addressAlreadyExists = false;
}
if (transientQueue) { if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) { } else if (queue.isAutoCreated()) {
@ -2324,6 +2339,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (queue.isDurable()) { if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding); storageManager.addQueueBinding(txID, localQueueBinding);
if (!addressAlreadyExists) {
storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
}
} }
try { try {

View File

@ -36,24 +36,27 @@ public class AddressInfo {
return routingType; return routingType;
} }
public void setRoutingType(RoutingType routingType) { public AddressInfo setRoutingType(RoutingType routingType) {
this.routingType = routingType; this.routingType = routingType;
return this;
} }
public boolean isDefaultDeleteOnNoConsumers() { public boolean isDefaultDeleteOnNoConsumers() {
return defaultDeleteOnNoConsumers; return defaultDeleteOnNoConsumers;
} }
public void setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) { public AddressInfo setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) {
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
return this;
} }
public int getDefaultMaxConsumers() { public int getDefaultMaxConsumers() {
return defaultMaxConsumers; return defaultMaxConsumers;
} }
public void setDefaultMaxConsumers(int defaultMaxConsumers) { public AddressInfo setDefaultMaxConsumers(int defaultMaxConsumers) {
this.defaultMaxConsumers = defaultMaxConsumers; this.defaultMaxConsumers = defaultMaxConsumers;
return this;
} }
public SimpleString getName() { public SimpleString getName() {

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
@ -37,6 +38,9 @@ public interface JournalLoader {
void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap, void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
List<QueueBindingInfo> queueBindingInfos) throws Exception; List<QueueBindingInfo> queueBindingInfos) throws Exception;
void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfo) throws Exception;
void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception; void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception;
void handleNoMessageReferences(Map<Long, ServerMessage> messages); void handleNoMessageReferences(Map<Long, ServerMessage> messages);

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.QueueStatus;
@ -165,6 +166,21 @@ public class PostOfficeJournalLoader implements JournalLoader {
} }
} }
@Override
public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfos) throws Exception {
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
// TODO: figure out what else to set here
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName())
.setRoutingType(addressBindingInfo.getRoutingType());
postOffice.addAddressInfo(addressInfo);
managementService.registerAddress(addressInfo.getName());
}
}
@Override @Override
public void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception { public void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception {
for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet()) { for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet()) {

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
@ -55,6 +56,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
@ -528,9 +530,20 @@ public class TransactionImplTest extends ActiveMQTestBase {
} }
@Override
public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
}
@Override
public void deleteAddressBinding(long tx, long addressBindingID) throws Exception {
}
@Override @Override
public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) throws Exception { List<GroupingInfo> groupingInfos,
List<AddressBindingInfo> addressBindingInfos) throws Exception {
return null; return null;
} }

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.addressing;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class AddressConfigTest extends ActiveMQTestBase {
protected ActiveMQServer server;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
Configuration configuration = createDefaultInVMConfig();
server = createServer(true, configuration);
server.start();
}
@Test
public void persistAddressConfigTest() throws Exception {
server.createQueue(SimpleString.toSimpleString("myAddress"), SimpleString.toSimpleString("myQueue"), null, true, false);
server.stop();
server.start();
AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress"));
assertNotNull(addressInfo);
assertEquals(AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType());
}
}

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@ -76,7 +77,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
FakePostOffice postOffice = new FakePostOffice(); FakePostOffice postOffice = new FakePostOffice();

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@ -73,7 +74,7 @@ public class RestartSMTest extends ActiveMQTestBase {
List<QueueBindingInfo> queueBindingInfos = new ArrayList<>(); List<QueueBindingInfo> queueBindingInfos = new ArrayList<>();
journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>()); journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, null, null, null, null, null, null, new FakeJournalLoader()); journal.loadMessageJournal(postOffice, null, null, null, null, null, null, new FakeJournalLoader());
@ -87,7 +88,7 @@ public class RestartSMTest extends ActiveMQTestBase {
queueBindingInfos = new ArrayList<>(); queueBindingInfos = new ArrayList<>();
journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>()); journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.start(); journal.start();
} finally { } finally {

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -128,7 +129,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(new FakePostOffice(), null, null, null, null, null, null, new FakeJournalLoader()); journal.loadMessageJournal(new FakePostOffice(), null, null, null, null, null, null, new FakeJournalLoader());
} }

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@ -95,7 +96,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal = new JournalStorageManager(configuration, factory, factory); journal = new JournalStorageManager(configuration, factory, factory);
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>(); HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>();
@ -114,7 +115,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal = new JournalStorageManager(configuration, factory, factory); journal = new JournalStorageManager(configuration, factory, factory);
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null)); journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
@ -137,7 +138,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal = new JournalStorageManager(configuration, factory, factory); journal = new JournalStorageManager(configuration, factory, factory);
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null)); journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
@ -48,6 +49,11 @@ public class FakeJournalLoader implements JournalLoader {
List<QueueBindingInfo> queueBindingInfos) throws Exception { List<QueueBindingInfo> queueBindingInfos) throws Exception {
} }
@Override
public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfo) throws Exception {
}
@Override @Override
public void handleGroupingBindings(List<GroupingInfo> groupingInfos) { public void handleGroupingBindings(List<GroupingInfo> groupingInfos) {
} }