Properly setting typeCode value for new journal files used for ack
compaction
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-05-23 18:09:01 +00:00
parent b4e35fe8a3
commit 4d6cc4b460
1 changed files with 8 additions and 6 deletions

View File

@ -1375,12 +1375,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
// Mark the current journal file as a compacted file so that gc checks can skip
// over logs that are smaller compaction type logs.
DataFile current = journal.getDataFileById(location.getDataFileId());
current.setTypeCode(command.getRewriteType());
// Mark the current journal file as a compacted file so that gc checks can skip
// over logs that are smaller compaction type logs.
DataFile current = journal.getDataFileById(location.getDataFileId());
current.setTypeCode(command.getRewriteType());
if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
// Move offset so that next location read jumps to next file.
location.setOffset(journalMaxFileLength);
}
@ -1971,6 +1972,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
DataFile forwardsFile = journal.reserveDataFile();
forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
@ -1978,7 +1980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
compactionMarker.setSourceDataFileId(journalToRead);
compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
compactionMarker.setRewriteType(forwardsFile.getTypeCode());
ByteSequence payload = toByteSequence(compactionMarker);
appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);