ensure that the lock is always released in the load method.
This commit is contained in:
Timothy Bish 2017-01-13 11:48:19 -05:00
parent 8f960ea356
commit 281d600ae2
1 changed files with 64 additions and 62 deletions

View File

@ -148,13 +148,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected Location producerSequenceIdTrackerLocation = null;
protected Location ackMessageFileMapLocation = null;
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
protected int version = VERSION;
protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
public void read(DataInput is) throws IOException {
state = is.readInt();
destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
destinations = new BTreeIndex<>(pageFile, is.readLong());
if (is.readBoolean()) {
lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
} else {
@ -317,7 +317,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
page.set(metadata);
metadata.page = page;
metadata.state = CLOSED_STATE;
metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId());
tx.store(metadata.page, metadataMarshaller, true);
} else {
@ -468,8 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void load() throws IOException {
this.indexLock.writeLock().lock();
IOHelper.mkdirs(directory);
try {
IOHelper.mkdirs(directory);
if (deleteAllMessages) {
getJournal().start();
getJournal().delete();
@ -488,7 +488,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
if (opened.compareAndSet(true, false)) {
checkpointLock.writeLock().lock();
try {
if (metadata.page != null) {
@ -577,7 +577,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int add;
int remove;
}
HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<>();
@SuppressWarnings("rawtypes")
public void track(Operation operation) {
@ -620,7 +620,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("rawtypes")
public String getTransactions() {
ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
ArrayList<TranInfo> infos = new ArrayList<>();
synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
@ -716,8 +716,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
// rollback any recovered inflight local transactions, and discard any inflight XA transactions.
Set<TransactionId> toRollback = new HashSet<TransactionId>();
Set<TransactionId> toDiscard = new HashSet<TransactionId>();
Set<TransactionId> toRollback = new HashSet<>();
Set<TransactionId> toDiscard = new HashSet<>();
synchronized (inflightTransactions) {
for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
TransactionId id = it.next();
@ -824,7 +824,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for (String key : storedDestinations.keySet()) {
StoredDestination sd = storedDestinations.get(key);
final ArrayList<Long> matches = new ArrayList<Long>();
final ArrayList<Long> matches = new ArrayList<>();
// Find all the Locations that are >= than the last Append Location.
sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
@Override
@ -891,7 +891,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
}
HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
HashSet<Integer> missingJournalFiles = new HashSet<>();
while (!ss.isEmpty()) {
missingJournalFiles.add((int) ss.removeFirst());
}
@ -909,8 +909,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.warn("Some journal files are missing: " + missingJournalFiles);
}
ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
for (Integer missing : missingJournalFiles) {
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
}
@ -924,7 +924,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Sequence seq = dataFile.getCorruptedBlocks().getHead();
while (seq != null) {
BTreeVisitor.BetweenVisitor<Location, Long> visitor =
new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
missingPredicates.add(visitor);
knownCorruption.add(visitor);
seq = seq.getNext();
@ -935,7 +935,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (!missingPredicates.isEmpty()) {
for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
final StoredDestination sd = sdEntry.getValue();
final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
@Override
protected void matched(Location key, Long value) {
@ -1413,7 +1413,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
// Mark the current journal file as a compacted file so that gc checks can skip
// over logs that are smaller compaction type logs.
@ -1431,7 +1431,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// /////////////////////////////////////////////////////////////////
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
@ -1588,7 +1588,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
if (referenceFileIds == null) {
referenceFileIds = new HashSet<Integer>();
referenceFileIds = new HashSet<>();
referenceFileIds.add(messageLocation.getDataFileId());
metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
} else {
@ -1722,8 +1722,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (cleanup) {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
final TreeSet<Integer> gcCandidateSet = new TreeSet<>(completeFileSet);
if (LOG.isTraceEnabled()) {
LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
@ -1949,7 +1949,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void run() {
int journalToAdvance = -1;
Set<Integer> journalLogsReferenced = new HashSet<Integer>();
Set<Integer> journalLogsReferenced = new HashSet<>();
//flag to know whether the ack forwarding completed without an exception
boolean forwarded = false;
@ -1979,7 +1979,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
List<Integer> journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds);
for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId);
@ -2037,7 +2037,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<>();
try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
@ -2076,7 +2076,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
if (referenceFileIds == null) {
referenceFileIds = new HashSet<Integer>();
referenceFileIds = new HashSet<>();
referenceFileIds.addAll(entry.getValue());
metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
} else {
@ -2171,7 +2171,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// StoredDestination related implementation methods.
// /////////////////////////////////////////////////////////////////
protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<>();
static class MessageKeys {
final String messageId;
@ -2280,8 +2280,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ListIndex<String, Location> subLocations;
// Transient data used to track which Messages are no longer needed.
final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
final TreeMap<Long, Long> messageReferences = new TreeMap<>();
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
public void trackPendingAdd(Long seq) {
orderIndex.trackPendingAdd(seq);
@ -2304,26 +2304,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@Override
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
final StoredDestination value = new StoredDestination();
value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
if (dataIn.readBoolean()) {
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong());
if (metadata.version >= 4) {
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong());
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<>();
if (metadata.version >= 3) {
// migrate
BTreeIndex<Long, HashSet<String>> oldAckPositions =
new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
new BTreeIndex<>(pageFile, dataIn.readLong());
oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
oldAckPositions.load(tx);
@ -2348,7 +2348,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// Now move the pending messages to ack data into the store backed
// structure.
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
value.ackPositions = new ListIndex<>(pageFile, tx.allocate());
value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
value.ackPositions.load(tx);
@ -2361,13 +2361,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
if (metadata.version >= 5) {
value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
value.subLocations = new ListIndex<>(pageFile, dataIn.readLong());
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
value.subLocations = new ListIndex<>(pageFile, tx.allocate());
value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
value.subLocations.load(tx);
@ -2376,19 +2376,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
if (metadata.version >= 2) {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
value.orderIndex.lowPriorityIndex.load(tx);
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
value.orderIndex.highPriorityIndex.load(tx);
@ -2471,14 +2471,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Brand new destination.. allocate indexes for it.
rc = new StoredDestination();
rc.orderIndex.allocate(tx);
rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate());
rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate());
if (topic) {
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate());
rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate());
rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
}
metadata.destinations.put(tx, key, rc);
}
@ -2532,7 +2532,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
rc.subLocations.load(tx);
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
rc.subscriptionCursors = new HashMap<>();
if (metadata.version < 3) {
@ -2696,7 +2696,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* KahaDestination key
*/
protected final ConcurrentMap<String, MessageStore> storeCache =
new ConcurrentHashMap<String, MessageStore>();
new ConcurrentHashMap<>();
/**
* Locate the storeMessageSize counter for this KahaDestination
@ -2871,7 +2871,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return;
}
ArrayList<Long> unreferenced = new ArrayList<Long>();
ArrayList<Long> unreferenced = new ArrayList<>();
for(Long sequenceId : sequences) {
Long references = sd.messageReferences.get(sequenceId);
@ -2889,7 +2889,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for(Long sequenceId : unreferenced) {
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
// Do the actual deletes.
@ -2941,7 +2941,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
// Do the actual deletes.
@ -3001,11 +3001,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////
@SuppressWarnings("rawtypes")
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final Set<String> ackedAndPrepared = new HashSet<String>();
protected final Set<String> rolledBackAcks = new HashSet<String>();
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
protected final Set<String> ackedAndPrepared = new HashSet<>();
protected final Set<String> rolledBackAcks = new HashSet<>();
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store.
@ -3201,6 +3201,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @deprecated use {@link #getJournalDiskSyncStrategy} instead
* @return
*/
@Deprecated
public boolean isEnableJournalDiskSyncs() {
return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
journalDiskSyncStrategy.trim().toUpperCase());
@ -3210,6 +3211,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @deprecated use {@link #setEnableJournalDiskSyncs} instead
* @param syncWrites
*/
@Deprecated
public void setEnableJournalDiskSyncs(boolean syncWrites) {
if (syncWrites) {
journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
@ -3486,7 +3488,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Long lastHighKey;
Long lastLowKey;
byte lastGetPriority;
final List<Long> pendingAdditions = new LinkedList<Long>();
final List<Long> pendingAdditions = new LinkedList<>();
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
MessageKeys remove(Transaction tx, Long key) throws IOException {
@ -3513,16 +3515,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
void allocate(Transaction tx) throws IOException {
defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
if (metadata.version >= 2) {
lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
}
}
void configureLast(Transaction tx) throws IOException {
// Figure out the next key using the last entry in the destination.
TreeSet<Long> orderedSet = new TreeSet<Long>();
TreeSet<Long> orderedSet = new TreeSet<>();
addLast(orderedSet, highPriorityIndex, tx);
addLast(orderedSet, defaultPriorityIndex, tx);