diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index c46a1275be..45d422009e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -148,13 +148,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected Location producerSequenceIdTrackerLocation = null; protected Location ackMessageFileMapLocation = null; protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); - protected transient Map> ackMessageFileMap = new HashMap>(); + protected transient Map> ackMessageFileMap = new HashMap<>(); protected int version = VERSION; protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; public void read(DataInput is) throws IOException { state = is.readInt(); - destinations = new BTreeIndex(pageFile, is.readLong()); + destinations = new BTreeIndex<>(pageFile, is.readLong()); if (is.readBoolean()) { lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); } else { @@ -317,7 +317,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe page.set(metadata); metadata.page = page; metadata.state = CLOSED_STATE; - metadata.destinations = new BTreeIndex(pageFile, tx.allocate().getPageId()); + metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId()); tx.store(metadata.page, metadataMarshaller, true); } else { @@ -468,8 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void load() throws IOException { this.indexLock.writeLock().lock(); - IOHelper.mkdirs(directory); try { + IOHelper.mkdirs(directory); if (deleteAllMessages) { getJournal().start(); getJournal().delete(); @@ -488,7 +488,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } public void close() throws IOException, InterruptedException { - if( opened.compareAndSet(true, false)) { + if (opened.compareAndSet(true, false)) { checkpointLock.writeLock().lock(); try { if (metadata.page != null) { @@ -577,7 +577,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe int add; int remove; } - HashMap destinationOpCount = new HashMap(); + HashMap destinationOpCount = new HashMap<>(); @SuppressWarnings("rawtypes") public void track(Operation operation) { @@ -620,7 +620,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @SuppressWarnings("rawtypes") public String getTransactions() { - ArrayList infos = new ArrayList(); + ArrayList infos = new ArrayList<>(); synchronized (inflightTransactions) { if (!inflightTransactions.isEmpty()) { for (Entry> entry : inflightTransactions.entrySet()) { @@ -716,8 +716,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe }); // rollback any recovered inflight local transactions, and discard any inflight XA transactions. - Set toRollback = new HashSet(); - Set toDiscard = new HashSet(); + Set toRollback = new HashSet<>(); + Set toDiscard = new HashSet<>(); synchronized (inflightTransactions) { for (Iterator it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { TransactionId id = it.next(); @@ -824,7 +824,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe for (String key : storedDestinations.keySet()) { StoredDestination sd = storedDestinations.get(key); - final ArrayList matches = new ArrayList(); + final ArrayList matches = new ArrayList<>(); // Find all the Locations that are >= than the last Append Location. sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor(lastAppendLocation) { @Override @@ -891,7 +891,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe }); } - HashSet missingJournalFiles = new HashSet(); + HashSet missingJournalFiles = new HashSet<>(); while (!ss.isEmpty()) { missingJournalFiles.add((int) ss.removeFirst()); } @@ -909,8 +909,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.warn("Some journal files are missing: " + missingJournalFiles); } - ArrayList> knownCorruption = new ArrayList>(); - ArrayList> missingPredicates = new ArrayList>(); + ArrayList> knownCorruption = new ArrayList<>(); + ArrayList> missingPredicates = new ArrayList<>(); for (Integer missing : missingJournalFiles) { missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(missing, 0), new Location(missing + 1, 0))); } @@ -924,7 +924,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Sequence seq = dataFile.getCorruptedBlocks().getHead(); while (seq != null) { BTreeVisitor.BetweenVisitor visitor = - new BTreeVisitor.BetweenVisitor(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); + new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); missingPredicates.add(visitor); knownCorruption.add(visitor); seq = seq.getNext(); @@ -935,7 +935,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (!missingPredicates.isEmpty()) { for (Entry sdEntry : storedDestinations.entrySet()) { final StoredDestination sd = sdEntry.getValue(); - final LinkedHashMap matches = new LinkedHashMap(); + final LinkedHashMap matches = new LinkedHashMap<>(); sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor(missingPredicates) { @Override protected void matched(Location key, Long value) { @@ -1413,7 +1413,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { - final TreeSet completeFileSet = new TreeSet(journal.getFileMap().keySet()); + final TreeSet completeFileSet = new TreeSet<>(journal.getFileMap().keySet()); // Mark the current journal file as a compacted file so that gc checks can skip // over logs that are smaller compaction type logs. @@ -1431,7 +1431,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // ///////////////////////////////////////////////////////////////// protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); - private final HashSet journalFilesBeingReplicated = new HashSet(); + private final HashSet journalFilesBeingReplicated = new HashSet<>(); long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); @@ -1588,7 +1588,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { Set referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); if (referenceFileIds == null) { - referenceFileIds = new HashSet(); + referenceFileIds = new HashSet<>(); referenceFileIds.add(messageLocation.getDataFileId()); metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); } else { @@ -1722,8 +1722,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (cleanup) { - final TreeSet completeFileSet = new TreeSet(journal.getFileMap().keySet()); - final TreeSet gcCandidateSet = new TreeSet(completeFileSet); + final TreeSet completeFileSet = new TreeSet<>(journal.getFileMap().keySet()); + final TreeSet gcCandidateSet = new TreeSet<>(completeFileSet); if (LOG.isTraceEnabled()) { LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); @@ -1949,7 +1949,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void run() { int journalToAdvance = -1; - Set journalLogsReferenced = new HashSet(); + Set journalLogsReferenced = new HashSet<>(); //flag to know whether the ack forwarding completed without an exception boolean forwarded = false; @@ -1979,7 +1979,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Map keys might not be sorted, find the earliest log file to forward acks // from and move only those, future cycles can chip away at more as needed. // We won't move files that are themselves rewritten on a previous compaction. - List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); + List journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet()); Collections.sort(journalFileIds); for (Integer journalFileId : journalFileIds) { DataFile current = journal.getDataFileById(journalFileId); @@ -2037,7 +2037,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile); - Map> updatedAckLocations = new HashMap>(); + Map> updatedAckLocations = new HashMap<>(); try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); @@ -2076,7 +2076,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe for (Entry> entry : updatedAckLocations.entrySet()) { Set referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); if (referenceFileIds == null) { - referenceFileIds = new HashSet(); + referenceFileIds = new HashSet<>(); referenceFileIds.addAll(entry.getValue()); metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); } else { @@ -2171,7 +2171,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // StoredDestination related implementation methods. // ///////////////////////////////////////////////////////////////// - protected final HashMap storedDestinations = new HashMap(); + protected final HashMap storedDestinations = new HashMap<>(); static class MessageKeys { final String messageId; @@ -2280,8 +2280,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe ListIndex subLocations; // Transient data used to track which Messages are no longer needed. - final TreeMap messageReferences = new TreeMap(); - final HashSet subscriptionCache = new LinkedHashSet(); + final TreeMap messageReferences = new TreeMap<>(); + final HashSet subscriptionCache = new LinkedHashSet<>(); public void trackPendingAdd(Long seq) { orderIndex.trackPendingAdd(seq); @@ -2304,26 +2304,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @Override public StoredDestination readPayload(final DataInput dataIn) throws IOException { final StoredDestination value = new StoredDestination(); - value.orderIndex.defaultPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); - value.locationIndex = new BTreeIndex(pageFile, dataIn.readLong()); - value.messageIdIndex = new BTreeIndex(pageFile, dataIn.readLong()); + value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); + value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); + value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); if (dataIn.readBoolean()) { - value.subscriptions = new BTreeIndex(pageFile, dataIn.readLong()); - value.subscriptionAcks = new BTreeIndex(pageFile, dataIn.readLong()); + value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong()); + value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong()); if (metadata.version >= 4) { - value.ackPositions = new ListIndex(pageFile, dataIn.readLong()); + value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong()); } else { // upgrade pageFile.tx().execute(new Transaction.Closure() { @Override public void execute(Transaction tx) throws IOException { - LinkedHashMap temp = new LinkedHashMap(); + LinkedHashMap temp = new LinkedHashMap<>(); if (metadata.version >= 3) { // migrate BTreeIndex> oldAckPositions = - new BTreeIndex>(pageFile, dataIn.readLong()); + new BTreeIndex<>(pageFile, dataIn.readLong()); oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); oldAckPositions.load(tx); @@ -2348,7 +2348,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // Now move the pending messages to ack data into the store backed // structure. - value.ackPositions = new ListIndex(pageFile, tx.allocate()); + value.ackPositions = new ListIndex<>(pageFile, tx.allocate()); value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); value.ackPositions.load(tx); @@ -2361,13 +2361,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } if (metadata.version >= 5) { - value.subLocations = new ListIndex(pageFile, dataIn.readLong()); + value.subLocations = new ListIndex<>(pageFile, dataIn.readLong()); } else { // upgrade pageFile.tx().execute(new Transaction.Closure() { @Override public void execute(Transaction tx) throws IOException { - value.subLocations = new ListIndex(pageFile, tx.allocate()); + value.subLocations = new ListIndex<>(pageFile, tx.allocate()); value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); value.subLocations.load(tx); @@ -2376,19 +2376,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } if (metadata.version >= 2) { - value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); - value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); + value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); + value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); } else { // upgrade pageFile.tx().execute(new Transaction.Closure() { @Override public void execute(Transaction tx) throws IOException { - value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); value.orderIndex.lowPriorityIndex.load(tx); - value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); value.orderIndex.highPriorityIndex.load(tx); @@ -2471,14 +2471,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Brand new destination.. allocate indexes for it. rc = new StoredDestination(); rc.orderIndex.allocate(tx); - rc.locationIndex = new BTreeIndex(pageFile, tx.allocate()); - rc.messageIdIndex = new BTreeIndex(pageFile, tx.allocate()); + rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate()); + rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate()); if (topic) { - rc.subscriptions = new BTreeIndex(pageFile, tx.allocate()); - rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); - rc.ackPositions = new ListIndex(pageFile, tx.allocate()); - rc.subLocations = new ListIndex(pageFile, tx.allocate()); + rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate()); + rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate()); + rc.ackPositions = new ListIndex<>(pageFile, tx.allocate()); + rc.subLocations = new ListIndex<>(pageFile, tx.allocate()); } metadata.destinations.put(tx, key, rc); } @@ -2532,7 +2532,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); rc.subLocations.load(tx); - rc.subscriptionCursors = new HashMap(); + rc.subscriptionCursors = new HashMap<>(); if (metadata.version < 3) { @@ -2696,7 +2696,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * KahaDestination key */ protected final ConcurrentMap storeCache = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); /** * Locate the storeMessageSize counter for this KahaDestination @@ -2871,7 +2871,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return; } - ArrayList unreferenced = new ArrayList(); + ArrayList unreferenced = new ArrayList<>(); for(Long sequenceId : sequences) { Long references = sd.messageReferences.get(sequenceId); @@ -2889,7 +2889,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe for(Long sequenceId : unreferenced) { // Find all the entries that need to get deleted. - ArrayList> deletes = new ArrayList>(); + ArrayList> deletes = new ArrayList<>(); sd.orderIndex.getDeleteList(tx, deletes, sequenceId); // Do the actual deletes. @@ -2941,7 +2941,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // Find all the entries that need to get deleted. - ArrayList> deletes = new ArrayList>(); + ArrayList> deletes = new ArrayList<>(); sd.orderIndex.getDeleteList(tx, deletes, messageSequence); // Do the actual deletes. @@ -3001,11 +3001,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Transaction related implementation methods. // ///////////////////////////////////////////////////////////////// @SuppressWarnings("rawtypes") - private final LinkedHashMap> inflightTransactions = new LinkedHashMap>(); + private final LinkedHashMap> inflightTransactions = new LinkedHashMap<>(); @SuppressWarnings("rawtypes") - protected final LinkedHashMap> preparedTransactions = new LinkedHashMap>(); - protected final Set ackedAndPrepared = new HashSet(); - protected final Set rolledBackAcks = new HashSet(); + protected final LinkedHashMap> preparedTransactions = new LinkedHashMap<>(); + protected final Set ackedAndPrepared = new HashSet<>(); + protected final Set rolledBackAcks = new HashSet<>(); // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, // till then they are skipped by the store. @@ -3201,6 +3201,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @deprecated use {@link #getJournalDiskSyncStrategy} instead * @return */ + @Deprecated public boolean isEnableJournalDiskSyncs() { return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals( journalDiskSyncStrategy.trim().toUpperCase()); @@ -3210,6 +3211,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @deprecated use {@link #setEnableJournalDiskSyncs} instead * @param syncWrites */ + @Deprecated public void setEnableJournalDiskSyncs(boolean syncWrites) { if (syncWrites) { journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name(); @@ -3486,7 +3488,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Long lastHighKey; Long lastLowKey; byte lastGetPriority; - final List pendingAdditions = new LinkedList(); + final List pendingAdditions = new LinkedList<>(); final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); MessageKeys remove(Transaction tx, Long key) throws IOException { @@ -3513,16 +3515,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } void allocate(Transaction tx) throws IOException { - defaultPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); if (metadata.version >= 2) { - lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); - highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); + highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); } } void configureLast(Transaction tx) throws IOException { // Figure out the next key using the last entry in the destination. - TreeSet orderedSet = new TreeSet(); + TreeSet orderedSet = new TreeSet<>(); addLast(orderedSet, highPriorityIndex, tx); addLast(orderedSet, defaultPriorityIndex, tx);