From 946e62d702d2bf5fbcdb0ed4cb6977046acb659b Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 14 Mar 2016 11:04:57 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6203 Rewrite older acks that can be preventing GC of log files. --- .../apache/activemq/util/ThreadPoolUtils.java | 40 +- .../store/kahadb/MessageDatabase.java | 349 ++++++++++++++---- .../apache/activemq/store/kahadb/Visitor.java | 4 + .../store/kahadb/disk/journal/DataFile.java | 23 +- .../kahadb/disk/journal/DataFileAccessor.java | 5 +- .../kahadb/disk/journal/DataFileAppender.java | 32 +- .../kahadb/disk/journal/FileAppender.java | 8 +- .../store/kahadb/disk/journal/Journal.java | 124 +++++-- .../journal/TargetedDataFileAppender.java | 297 +++++++++++++++ .../disk/util/DataByteArrayInputStream.java | 31 +- .../disk/util/DataByteArrayOutputStream.java | 24 +- .../src/main/proto/journal-data.proto | 12 + .../journal/TargetedDataFileAppenderTest.java | 116 ++++++ ...TransactedStoreUsageSuspendResumeTest.java | 4 +- 14 files changed, 898 insertions(+), 171 deletions(-) create mode 100644 activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java index 27b69fc37d..554f73003a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java @@ -82,7 +82,9 @@ public final class ThreadPoolUtils { * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which * forces a shutdown. The parameter shutdownAwaitTermination * is used as timeout value waiting for orderly shutdown to - * complete normally, before going aggressively. + * complete normally, before going aggressively. If the shutdownAwaitTermination + * value is negative the shutdown waits indefinitely for the ExecutorService + * to complete its shutdown. * * @param executorService the executor service to shutdown * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown @@ -130,6 +132,19 @@ public final class ThreadPoolUtils { Thread.currentThread().interrupt(); } } + } else if (shutdownAwaitTermination < 0) { + try { + awaitTermination(executorService); + } catch (InterruptedException e) { + warned = true; + LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); + // we were interrupted during shutdown, so force shutdown + try { + executorService.shutdownNow(); + } finally { + Thread.currentThread().interrupt(); + } + } } // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log @@ -143,6 +158,29 @@ public final class ThreadPoolUtils { } } + /** + * Awaits the termination of the thread pool indefinitely (Use with Caution). + *

+ * This implementation will log every 2nd second at INFO level that we are waiting, so the end user + * can see we are not hanging in case it takes longer time to terminate the pool. + * + * @param executorService the thread pool + * + * @throws InterruptedException is thrown if we are interrupted during the waiting + */ + public static void awaitTermination(ExecutorService executorService) throws InterruptedException { + // log progress every 5th second so end user is aware of we are shutting down + StopWatch watch = new StopWatch(); + final long interval = 2000; + while (true) { + if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { + return; + } else { + LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService); + } + } + } + /** * Awaits the termination of the thread pool. *

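For orientation, a minimal standalone sketch (not part of the patch) of the new negative-timeout contract. The class and the scheduled task are hypothetical; shutdownGraceful(executor, -1) is the call that MessageDatabase.doStop() makes further down to wait indefinitely for its checkpoint scheduler.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.util.ThreadPoolUtils;

public class GracefulShutdownExample {

    public static void main(String[] args) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println("periodic checkpoint work");
            }
        }, 0, 500, TimeUnit.MILLISECONDS);

        // With this patch a negative shutdownAwaitTermination means: request an orderly
        // shutdown and wait without a timeout for the pool to terminate (previously a
        // non-positive value skipped the orderly wait entirely).
        ThreadPoolUtils.shutdownGraceful(scheduler, -1);
    }
}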
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 931a18bf05..434e49ca13 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -48,6 +48,10 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -73,6 +77,7 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; @@ -84,6 +89,7 @@ import org.apache.activemq.store.kahadb.disk.index.ListIndex; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; import org.apache.activemq.store.kahadb.disk.page.Page; import org.apache.activemq.store.kahadb.disk.page.PageFile; import org.apache.activemq.store.kahadb.disk.page.Transaction; @@ -97,9 +103,11 @@ import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.DataByteArrayInputStream; import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,6 +130,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe static final int VERSION = 6; + static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; + protected class Metadata { protected Page page; protected int state; @@ -234,8 +244,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected boolean deleteAllMessages; protected File directory = DEFAULT_DIRECTORY; protected File indexDirectory = null; - protected Thread checkpointThread; - protected boolean enableJournalDiskSyncs=true; + protected ScheduledExecutorService scheduler; + private final Object schedulerLock = new Object(); + + protected boolean enableJournalDiskSyncs = true; protected boolean archiveDataLogs; protected File directoryArchive; protected AtomicLong journalSize = new AtomicLong(0); @@ -254,7 +266,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean 
checkForCorruptJournalFiles = false; private boolean checksumJournalFiles = true; protected boolean forceRecoverIndex = false; - private final Object checkpointThreadLock = new Object(); private boolean archiveCorruptedIndex = false; private boolean useIndexLFRUEviction = false; private float indexLFUEvictionFactor = 0.2f; @@ -263,6 +274,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean enableIndexPageCaching = true; ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); + private int compactAcksAfterNoGC = 10; + private boolean compactAcksIgnoresStoreGrowth = false; + private int checkPointCyclesWithNoGC; + private int journalLogOnLastCompactionCheck; + @Override public void doStart() throws Exception { load(); @@ -330,51 +346,59 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } private void startCheckpoint() { - if (checkpointInterval == 0 && cleanupInterval == 0) { + if (checkpointInterval == 0 && cleanupInterval == 0) { LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); return; } - synchronized (checkpointThreadLock) { - boolean start = false; - if (checkpointThread == null) { - start = true; - } else if (!checkpointThread.isAlive()) { - start = true; - LOG.info("KahaDB: Recovering checkpoint thread after death"); - } - if (start) { - checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { - @Override - public void run() { - try { - long lastCleanup = System.currentTimeMillis(); - long lastCheckpoint = System.currentTimeMillis(); - // Sleep for a short time so we can periodically check - // to see if we need to exit this thread. - long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); - while (opened.get()) { - Thread.sleep(sleepTime); - long now = System.currentTimeMillis(); - if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { - checkpointCleanup(true); - lastCleanup = now; - lastCheckpoint = now; - } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { - checkpointCleanup(false); - lastCheckpoint = now; - } - } - } catch (InterruptedException e) { - // Looks like someone really wants us to exit this thread... - } catch (IOException ioe) { - LOG.error("Checkpoint failed", ioe); - brokerService.handleIOException(ioe); - } - } - }; + synchronized (schedulerLock) { + if (scheduler == null) { + scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - checkpointThread.setDaemon(true); - checkpointThread.start(); + @Override + public Thread newThread(Runnable r) { + Thread schedulerThread = new Thread(r); + + schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); + schedulerThread.setDaemon(true); + + return schedulerThread; + } + }); + + // Short intervals for check-point and cleanups + long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); + + scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); + } + } + } + + private final class CheckpointRunner implements Runnable { + + private long lastCheckpoint = System.currentTimeMillis(); + private long lastCleanup = System.currentTimeMillis(); + + @Override + public void run() { + try { + // Decide on cleanup vs full checkpoint here. 
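+ // cleanup == true runs the full checkpoint plus the journal GC sweep, while
+ // cleanup == false only flushes the index and metadata; both intervals are
+ // polled on the short fixed delay scheduled in startCheckpoint() above.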
+ if (opened.get()) { + long now = System.currentTimeMillis(); + if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { + checkpointCleanup(true); + lastCleanup = now; + lastCheckpoint = now; + } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { + checkpointCleanup(false); + lastCheckpoint = now; + } + } + } catch (IOException ioe) { + LOG.error("Checkpoint failed", ioe); + brokerService.handleIOException(ioe); + } catch (Throwable e) { + LOG.error("Checkpoint failed", e); + brokerService.handleIOException(IOExceptionSupport.create(e)); } } } @@ -444,12 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointLock.writeLock().unlock(); } journal.close(); - synchronized (checkpointThreadLock) { - if (checkpointThread != null) { - checkpointThread.join(); - } - } - //clear the cache and journalSize on shutdown of the store + ThreadPoolUtils.shutdownGraceful(scheduler, -1); + // clear the cache and journalSize on shutdown of the store storeCache.clear(); journalSize.set(0); } @@ -503,11 +523,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @SuppressWarnings("rawtypes") private void trackMaxAndMin(Location[] range, List ops) { Location t = ops.get(0).getLocation(); - if (range[0]==null || t.compareTo(range[0]) <= 0) { + if (range[0] == null || t.compareTo(range[0]) <= 0) { range[0] = t; } t = ops.get(ops.size() -1).getLocation(); - if (range[1]==null || t.compareTo(range[1]) >= 0) { + if (range[1] == null || t.compareTo(range[1]) >= 0) { range[1] = t; } } @@ -776,7 +796,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - if( undoCounter > 0 ) { + if (undoCounter > 0) { // The rolledback operations are basically in flight journal writes. To avoid getting // these the end user should do sync writes to the journal. if (LOG.isInfoEnabled()) { @@ -909,7 +929,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - if( undoCounter > 0 ) { + if (undoCounter > 0) { // The rolledback operations are basically in flight journal writes. To avoid getting these the end user // should do sync writes to the journal. if (LOG.isInfoEnabled()) { @@ -1019,31 +1039,31 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public Location store(JournalCommand data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { try { ByteSequence sequence = toByteSequence(data); - Location location; + checkpointLock.readLock().lock(); try { long start = System.currentTimeMillis(); - location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; + location = onJournalStoreComplete == null ? 
journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; long start2 = System.currentTimeMillis(); process(data, location, before); long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { + if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); } } - - } finally{ + } finally { checkpointLock.readLock().unlock(); } + if (after != null) { after.run(); } - if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) { + if (scheduler == null && opened.get()) { startCheckpoint(); } return location; @@ -1167,6 +1187,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void visit(KahaUpdateMessageCommand command) throws IOException { process(command, location); } + + @Override + public void visit(KahaRewrittenDataFileCommand command) throws IOException { + process(command, location); + } }); } @@ -1323,6 +1348,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { + final TreeSet completeFileSet = new TreeSet(journal.getFileMap().keySet()); + if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { + // Mark the current journal file as a compacted file so that gc checks can skip + // over logs that are smaller compaction type logs. + DataFile current = journal.getDataFileById(location.getDataFileId()); + current.setTypeCode(command.getRewriteType()); + + // Move offset so that next location read jumps to next file. + location.setOffset(journalMaxFileLength); + } + } + // ///////////////////////////////////////////////////////////////// // These methods do the actual index updates. // ///////////////////////////////////////////////////////////////// @@ -1595,7 +1633,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); - if( cleanup ) { + if (cleanup) { final TreeSet completeFileSet = new TreeSet(journal.getFileMap().keySet()); final TreeSet gcCandidateSet = new TreeSet(completeFileSet); @@ -1743,6 +1781,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.trace("gc candidates: " + gcCandidateSet); LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); } + boolean ackMessageFileMapMod = false; Iterator candidates = gcCandidateSet.iterator(); while (candidates.hasNext()) { @@ -1768,9 +1807,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } if (!gcCandidateSet.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cleanup removing the data files: " + gcCandidateSet); - } + LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); journal.removeDataFiles(gcCandidateSet); for (Integer candidate : gcCandidateSet) { for (Set ackFiles : metadata.ackMessageFileMap.values()) { @@ -1780,12 +1817,153 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (ackMessageFileMapMod) { checkpointUpdate(tx, false); } + } else { + if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { + // First check length of journal to make sure it makes sense to even try. 
+ // + // If there is only one journal file with Acks in it we don't need to move + // it since it won't be chained to any later logs. + // + // If the logs haven't grown since the last time then we need to compact + // otherwise there seems to still be room for growth and we don't need to incur + // the overhead. Depending on configuration this check can be avoided and + // Ack compaction will run any time the store has not GC'd a journal file in + // the configured amount of cycles. + if (metadata.ackMessageFileMap.size() > 1 && + (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { + + LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); + try { + scheduler.execute(new AckCompactionRunner()); + } catch (Exception ex) { + LOG.warn("Error on queueing the Ack Compactor", ex); + } + } else { + LOG.trace("Journal activity detected, no Ack compaction scheduled."); + } + + checkPointCyclesWithNoGC = 0; + } else { + LOG.trace("Not yet time to check for compaction: {} of {} cycles", + checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); + } + + journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); } } LOG.debug("Checkpoint done."); } + private final class AckCompactionRunner implements Runnable { + + @Override + public void run() { + // Lock index to capture the ackMessageFileMap data + indexLock.writeLock().lock(); + + // Map keys might not be sorted, find the earliest log file to forward acks + // from and move only those, future cycles can chip away at more as needed. + // We won't move files that are themselves rewritten on a previous compaction. + List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); + Collections.sort(journalFileIds); + int journalToAdvance = -1; + for (Integer journalFileId : journalFileIds) { + DataFile current = journal.getDataFileById(journalFileId); + if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { + journalToAdvance = journalFileId; + break; + } + } + + // Check if we found one, or if we only found the current file being written to. 
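+ // The current write file cannot be GC'd while it is still being appended to, so
+ // forwarding its acks would gain nothing; note that this early return runs before
+ // the index write lock acquired at the top of run() is released.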
+ if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { + return; + } + + Set journalLogsReferenced = + new HashSet(metadata.ackMessageFileMap.get(journalToAdvance)); + + indexLock.writeLock().unlock(); + + try { + // Background rewrite of the old acks + forwardAllAcks(journalToAdvance, journalLogsReferenced); + + // Checkpoint with changes from the ackMessageFileMap + checkpointUpdate(false); + } catch (IOException ioe) { + LOG.error("Checkpoint failed", ioe); + brokerService.handleIOException(ioe); + } catch (Throwable e) { + LOG.error("Checkpoint failed", e); + brokerService.handleIOException(IOExceptionSupport.create(e)); + } + } + } + + private void forwardAllAcks(Integer journalToRead, Set journalLogsReferenced) throws IllegalStateException, IOException { + LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead); + + DataFile forwardsFile = journal.reserveDataFile(); + LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile); + + Map> updatedAckLocations = new HashMap>(); + + try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { + KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); + compactionMarker.setSourceDataFileId(journalToRead); + compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE); + + ByteSequence payload = toByteSequence(compactionMarker); + appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs()); + LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); + + Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0)); + while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) { + JournalCommand command = null; + try { + command = load(nextLocation); + } catch (IOException ex) { + LOG.trace("Error loading command during ack forward: {}", nextLocation); + } + + if (command != null && command instanceof KahaRemoveMessageCommand) { + payload = toByteSequence(command); + Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs()); + updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); + } + + nextLocation = journal.getNextLocation(nextLocation); + } + } + + LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); + + // Lock index while we update the ackMessageFileMap. + indexLock.writeLock().lock(); + + // Update the ack map with the new locations of the acks + for (Entry> entry : updatedAckLocations.entrySet()) { + Set referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); + if (referenceFileIds == null) { + referenceFileIds = new HashSet(); + referenceFileIds.addAll(entry.getValue()); + metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); + } else { + referenceFileIds.addAll(entry.getValue()); + } + } + + // remove the old location data from the ack map so that the old journal log file can + // be removed on next GC. 
+ metadata.ackMessageFileMap.remove(journalToRead); + + indexLock.writeLock().unlock(); + + LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); + } + final Runnable nullCompletionCallback = new Runnable() { @Override public void run() { @@ -1943,7 +2121,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - class StoredDestination { MessageOrderIndex orderIndex = new MessageOrderIndex(); @@ -2708,7 +2885,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe runWithIndexLock.sequenceAssignedWithIndexLocked(seq); } } - } class RemoveOperation extends Operation { @@ -2728,7 +2904,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // ///////////////////////////////////////////////////////////////// private PageFile createPageFile() throws IOException { - if( indexDirectory == null ) { + if (indexDirectory == null) { indexDirectory = directory; } IOHelper.mkdirs(indexDirectory); @@ -3456,4 +3632,43 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void setPreallocationStrategy(String preallocationStrategy) { this.preallocationStrategy = preallocationStrategy; } + + public int getCompactAcksAfterNoGC() { + return compactAcksAfterNoGC; + } + + /** + * Sets the number of GC cycles where no journal logs were removed before an attempt to + * move forward all the acks in the last log that contains them and is otherwise unreferenced. + *

+ * A value of -1 will disable this feature. + * + * @param compactAcksAfterNoGC + * Number of empty GC cycles before we rewrite old ACKS. + */ + public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { + this.compactAcksAfterNoGC = compactAcksAfterNoGC; + } + + /** + * Returns whether Ack compaction will ignore that the store is still growing + * and run more often. + * + * @return the compactAcksIgnoresStoreGrowth current value. + */ + public boolean isCompactAcksIgnoresStoreGrowth() { + return compactAcksIgnoresStoreGrowth; + } + + /** + * Configure if Ack compaction will occur regardless of continued growth of the + * journal logs meaning that the store has not run out of space yet. Because the + * compaction operation can be costly this value is defaulted to off and the Ack + * compaction is only done when it seems that the store cannot grow and larger. + * + * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set + */ + public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { + this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java index 43fc152ae4..641f176e7f 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java @@ -30,6 +30,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; +import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; @@ -84,4 +85,7 @@ public class Visitor { public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException { } + + public void visit(KahaRewrittenDataFileCommand kahaUpdateMessageCommand) throws IOException { + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index f1e078dd96..126d82b351 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -18,29 +18,23 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import org.apache.activemq.store.kahadb.disk.util.LinkedNode; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.RecoverableRandomAccessFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * DataFile - * - * */ public class DataFile extends LinkedNode implements Comparable { - private static final Logger LOG = LoggerFactory.getLogger(DataFile.class); + public final static byte STANDARD_LOG_FILE = 0x0; protected final File 
file; protected final Integer dataFileId; protected volatile int length; + protected int typeCode = STANDARD_LOG_FILE; protected final SequenceSet corruptedBlocks = new SequenceSet(); DataFile(File file, int number) { @@ -57,6 +51,14 @@ public class DataFile extends LinkedNode implements Comparable implements Comparable=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) { - location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size); + if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) { + location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size); } else { - // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We + // Perhaps it's just some corruption... scan through the + // file to find the next valid batch record. We // may have subsequent valid batch records. - int nextOffset = findNextBatchRecord(reader, location.getOffset()+1); - if( nextOffset >=0 ) { + int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1); + if (nextOffset >= 0) { Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); dataFile.corruptedBlocks.add(sequence); @@ -391,9 +396,9 @@ public class Journal { totalLength.addAndGet(dataFile.getLength() - existingLen); } - if( !dataFile.corruptedBlocks.isEmpty() ) { + if (!dataFile.corruptedBlocks.isEmpty()) { // Is the end of the data file corrupted? - if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) { + if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) { dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); } } @@ -407,19 +412,19 @@ public class Journal { ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data)); int pos = 0; - while( true ) { + while (true) { pos = bs.indexOf(header, pos); - if( pos >= 0 ) { - return offset+pos; + if (pos >= 0) { + return offset + pos; } else { // need to load the next data chunck in.. - if( bs.length != data.length ) { + if (bs.length != data.length) { // If we had a short read then we were at EOF return -1; } - offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length; + offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length; bs = new ByteSequence(data, 0, reader.read(offset, data)); - pos=0; + pos = 0; } } } @@ -431,34 +436,34 @@ public class Journal { reader.readFully(offset, controlRecord); - // Assert that it's a batch record. - for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) { - if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) { + // Assert that it's a batch record. + for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) { + if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) { return -1; } } int size = controlIs.readInt(); - if( size > MAX_BATCH_SIZE ) { + if (size > MAX_BATCH_SIZE) { return -1; } - if( isChecksum() ) { + if (isChecksum()) { long expectedChecksum = controlIs.readLong(); - if( expectedChecksum == 0 ) { + if (expectedChecksum == 0) { // Checksuming was not enabled when the record was stored. 
// we can't validate the record :( return size; } byte data[] = new byte[size]; - reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data); + reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data); Checksum checksum = new Adler32(); checksum.update(data, 0, data.length); - if( expectedChecksum!=checksum.getValue() ) { + if (expectedChecksum != checksum.getValue()) { return -1; } } @@ -474,15 +479,22 @@ public class Journal { return totalLength.get(); } - synchronized DataFile getCurrentWriteFile() throws IOException { + synchronized DataFile getOrCreateCurrentWriteFile() throws IOException { if (dataFiles.isEmpty()) { rotateWriteFile(); } - return dataFiles.getTail(); + + DataFile current = dataFiles.getTail(); + + if (current != null) { + return current; + } else { + return rotateWriteFile(); + } } synchronized DataFile rotateWriteFile() { - int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1; + int nextNum = nextDataFileId++; File file = getFile(nextNum); DataFile nextWriteFile = new DataFile(file, nextNum); fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); @@ -491,6 +503,20 @@ public class Journal { return nextWriteFile; } + public synchronized DataFile reserveDataFile() { + int nextNum = nextDataFileId++; + File file = getFile(nextNum); + DataFile reservedDataFile = new DataFile(file, nextNum); + fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile); + fileByFileMap.put(file, reservedDataFile); + if (dataFiles.isEmpty()) { + dataFiles.addLast(reservedDataFile); + } else { + dataFiles.getTail().linkBefore(reservedDataFile); + } + return reservedDataFile; + } + public File getFile(int nextNum) { String fileName = filePrefix + nextNum + fileSuffix; File file = new File(directory, fileName); @@ -517,10 +543,6 @@ public class Journal { return dataFile.getFile(); } - private DataFile getNextDataFile(DataFile dataFile) { - return dataFile.getNext(); - } - public void close() throws IOException { synchronized (this) { if (!started) { @@ -559,6 +581,7 @@ public class Journal { DataFile dataFile = i.next(); result &= dataFile.delete(); } + totalLength.set(0); fileMap.clear(); fileByFileMap.clear(); @@ -574,11 +597,11 @@ public class Journal { public synchronized void removeDataFiles(Set files) throws IOException { for (Integer key : files) { // Can't remove the data file (or subsequent files) that is currently being written to. - if( key >= lastAppendLocation.get().getDataFileId() ) { + if (key >= lastAppendLocation.get().getDataFileId()) { continue; } DataFile dataFile = fileMap.get(key); - if( dataFile!=null ) { + if (dataFile != null) { forceRemoveDataFile(dataFile); } } @@ -607,7 +630,7 @@ public class Journal { LOG.debug("Successfully moved data file"); } else { LOG.debug("Deleting data file: {}", dataFile); - if ( dataFile.delete() ) { + if (dataFile.delete()) { LOG.debug("Discarded data file: {}", dataFile); } else { LOG.warn("Failed to discard data file : {}", dataFile.getFile()); @@ -644,7 +667,7 @@ public class Journal { if (cur == null) { if (location == null) { DataFile head = dataFiles.getHead(); - if( head == null ) { + if (head == null) { return null; } cur = new Location(); @@ -667,7 +690,7 @@ public class Journal { // Did it go into the next file?? 
if (dataFile.getLength() <= cur.getOffset()) { - dataFile = getNextDataFile(dataFile); + dataFile = dataFile.getNext(); if (dataFile == null) { return null; } else { @@ -796,10 +819,35 @@ public class Journal { this.archiveDataLogs = archiveDataLogs; } - synchronized public Integer getCurrentDataFileId() { - if (dataFiles.isEmpty()) + public synchronized DataFile getDataFileById(int dataFileId) { + if (dataFiles.isEmpty()) { return null; - return dataFiles.getTail().getDataFileId(); + } + + return fileMap.get(Integer.valueOf(dataFileId)); + } + + public synchronized DataFile getCurrentDataFile() { + if (dataFiles.isEmpty()) { + return null; + } + + DataFile current = dataFiles.getTail(); + + if (current != null) { + return current; + } else { + return null; + } + } + + public synchronized Integer getCurrentDataFileId() { + DataFile current = getCurrentDataFile(); + if (current != null) { + return current.getDataFileId(); + } else { + return null; + } } /** diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java new file mode 100644 index 0000000000..3e3e090a04 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.disk.journal; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.Adler32; +import java.util.zip.Checksum; + +import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; +import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.RecoverableRandomAccessFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * File Appender instance that performs batched writes in the thread where the write is + * queued. This appender does not honor the maxFileLength value in the journal as the + * files created here are out-of-band logs used for other purposes such as journal level + * compaction. 
+ */ +public class TargetedDataFileAppender implements FileAppender { + + private static final Logger LOG = LoggerFactory.getLogger(TargetedDataFileAppender.class); + + private final Journal journal; + private final DataFile target; + private final Map inflightWrites; + private final int maxWriteBatchSize; + + private boolean closed; + private boolean preallocate; + private WriteBatch nextWriteBatch; + private int statIdx = 0; + private int[] stats = new int[maxStat]; + + public class WriteBatch { + + protected final int offset; + + public final DataFile dataFile; + public final LinkedNodeList writes = new LinkedNodeList(); + public int size = Journal.BATCH_CONTROL_RECORD_SIZE; + public AtomicReference exception = new AtomicReference(); + + public WriteBatch(DataFile dataFile, int offset) { + this.dataFile = dataFile; + this.offset = offset; + this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE); + this.size = Journal.BATCH_CONTROL_RECORD_SIZE; + journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE); + } + + public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException { + this(dataFile, offset); + append(write); + } + + public boolean canAppend(Journal.WriteCommand write) { + int newSize = size + write.location.getSize(); + if (newSize >= maxWriteBatchSize) { + return false; + } + return true; + } + + public void append(Journal.WriteCommand write) throws IOException { + this.writes.addLast(write); + write.location.setDataFileId(dataFile.getDataFileId()); + write.location.setOffset(offset + size); + int s = write.location.getSize(); + size += s; + dataFile.incrementLength(s); + journal.addToTotalLength(s); + } + } + + /** + * Construct a Store writer + */ + public TargetedDataFileAppender(Journal journal, DataFile target) { + this.journal = journal; + this.target = target; + this.inflightWrites = this.journal.getInflightWrites(); + this.maxWriteBatchSize = this.journal.getWriteBatchSize(); + } + + @Override + public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { + checkClosed(); + + // Write the packet our internal buffer. + int size = data.getLength() + Journal.RECORD_HEAD_SPACE; + + final Location location = new Location(); + location.setSize(size); + location.setType(type); + + Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync); + + enqueueWrite(write); + + if (sync) { + writePendingBatch(); + } + + return location; + } + + @Override + public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException { + checkClosed(); + + // Write the packet our internal buffer. + int size = data.getLength() + Journal.RECORD_HEAD_SPACE; + + final Location location = new Location(); + location.setSize(size); + location.setType(type); + + Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete); + + enqueueWrite(write); + + return location; + } + + @Override + public void close() throws IOException { + if (!closed) { + if (nextWriteBatch != null) { + // force sync of current in-progress batched write. 
+ LOG.debug("Close of targeted appender flushing last batch."); + writePendingBatch(); + } + + closed = true; + } + } + + //----- Appender Configuration -------------------------------------------// + + public boolean isPreallocate() { + return preallocate; + } + + public void setPreallocate(boolean preallocate) { + this.preallocate = preallocate; + } + + //----- Internal Implementation ------------------------------------------// + + private void checkClosed() throws IOException { + if (closed) { + throw new IOException("The appender is clsoed"); + } + } + + private WriteBatch enqueueWrite(Journal.WriteCommand write) throws IOException { + while (true) { + if (nextWriteBatch == null) { + nextWriteBatch = new WriteBatch(target, target.getLength(), write); + break; + } else { + // Append to current batch if possible.. + if (nextWriteBatch.canAppend(write)) { + nextWriteBatch.append(write); + break; + } else { + // Flush current batch and start a new one. + writePendingBatch(); + nextWriteBatch = null; + } + } + } + + if (!write.sync) { + inflightWrites.put(new Journal.WriteKey(write.location), write); + } + + return nextWriteBatch; + } + + private void writePendingBatch() throws IOException { + DataFile dataFile = nextWriteBatch.dataFile; + + try (RecoverableRandomAccessFile file = dataFile.openRandomAccessFile(); + DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) { + + // preallocate on first open of new file (length == 0) if configured to do so. + // NOTE: dataFile.length cannot be used because it is updated in enqueue + if (file.length() == 0L && isPreallocate()) { + journal.preallocateEntireJournalDataFile(file); + } + + Journal.WriteCommand write = nextWriteBatch.writes.getHead(); + + // Write an empty batch control record. + buff.reset(); + buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE); + buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE); + buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC); + buff.writeInt(0); + buff.writeLong(0); + + while (write != null) { + buff.writeInt(write.location.getSize()); + buff.writeByte(write.location.getType()); + buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength()); + write = write.getNext(); + } + + // append 'unset' next batch (5 bytes) so read can always find eof + buff.writeInt(0); + buff.writeByte(0); + + ByteSequence sequence = buff.toByteSequence(); + + // Now we can fill in the batch control record properly. + buff.reset(); + buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length); + buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5); + if (journal.isChecksum()) { + Checksum checksum = new Adler32(); + checksum.update(sequence.getData(), + sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE, + sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5); + buff.writeLong(checksum.getValue()); + } + + // Now do the 1 big write. 
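+ // The single write below lays down the 5-byte record head of the batch control
+ // record, the control magic, the real batch length and (when checksums are enabled)
+ // the Adler-32 value patched in above, every queued record, and the zeroed 5-byte
+ // record head that lets readers detect the end of valid data.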
+ file.seek(nextWriteBatch.offset); + if (maxStat > 0) { + if (statIdx < maxStat) { + stats[statIdx++] = sequence.getLength(); + } else { + long all = 0; + for (; statIdx > 0;) { + all += stats[--statIdx]; + } + LOG.trace("Ave writeSize: {}", all / maxStat); + } + } + + file.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); + + ReplicationTarget replicationTarget = journal.getReplicationTarget(); + if (replicationTarget != null) { + replicationTarget.replicate(nextWriteBatch.writes.getHead().location, sequence, true); + } + + file.sync(); + + signalDone(nextWriteBatch); + } catch (IOException e) { + LOG.info("Journal failed while writing at: {}", nextWriteBatch.offset); + throw e; + } + } + + private void signalDone(WriteBatch writeBatch) { + // Now that the data is on disk, remove the writes from the in + // flight cache and signal any onComplete requests. + Journal.WriteCommand write = writeBatch.writes.getHead(); + while (write != null) { + if (!write.sync) { + inflightWrites.remove(new Journal.WriteKey(write.location)); + } + + if (write.onComplete != null) { + try { + write.onComplete.run(); + } catch (Throwable e) { + LOG.info("Add exception was raised while executing the run command for onComplete", e); + } + } + + write = write.getNext(); + } + } +} diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java index 7147fd922e..455a020672 100755 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,19 +16,18 @@ */ package org.apache.activemq.store.kahadb.disk.util; -import org.apache.activemq.util.ByteSequence; - import java.io.DataInput; import java.io.IOException; import java.io.InputStream; import java.io.UTFDataFormatException; +import org.apache.activemq.util.ByteSequence; + /** * Optimized ByteArrayInputStream that can be used more than once - * - * */ -public final class DataByteArrayInputStream extends InputStream implements DataInput { +public final class DataByteArrayInputStream extends InputStream implements DataInput, AutoCloseable { + private byte[] buf; private int pos; private int offset; @@ -137,6 +136,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI * @return the next byte of data, or -1 if the end of the * stream has been reached. */ + @Override public int read() { return (pos < length) ? (buf[pos++] & 0xff) : -1; } @@ -152,6 +152,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI * -1 if there is no more data because the end of the * stream has been reached. */ + @Override public int read(byte b[], int off, int len) { if (b == null) { throw new NullPointerException(); @@ -174,18 +175,22 @@ public final class DataByteArrayInputStream extends InputStream implements DataI * @return the number of bytes that can be read from the input stream * without blocking. 
*/ + @Override public int available() { return length - pos; } + @Override public void readFully(byte[] b) { read(b, 0, b.length); } + @Override public void readFully(byte[] b, int off, int len) { read(b, off, len); } + @Override public int skipBytes(int n) { if (pos + n > length) { n = length - pos; @@ -197,39 +202,47 @@ public final class DataByteArrayInputStream extends InputStream implements DataI return n; } + @Override public boolean readBoolean() { return read() != 0; } + @Override public byte readByte() { return (byte)read(); } + @Override public int readUnsignedByte() { return read(); } + @Override public short readShort() { this.read(work, 0, 2); return (short) (((work[0] & 0xff) << 8) | (work[1] & 0xff)); } + @Override public int readUnsignedShort() { this.read(work, 0, 2); - return (int) (((work[0] & 0xff) << 8) | (work[1] & 0xff)); + return ((work[0] & 0xff) << 8) | (work[1] & 0xff); } + @Override public char readChar() { this.read(work, 0, 2); return (char) (((work[0] & 0xff) << 8) | (work[1] & 0xff)); } + @Override public int readInt() { this.read(work, 0, 4); return ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) | ((work[2] & 0xff) << 8) | (work[3] & 0xff); } + @Override public long readLong() { this.read(work, 0, 8); @@ -241,14 +254,17 @@ public final class DataByteArrayInputStream extends InputStream implements DataI return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL); } + @Override public float readFloat() throws IOException { return Float.intBitsToFloat(readInt()); } + @Override public double readDouble() throws IOException { return Double.longBitsToDouble(readLong()); } + @Override public String readLine() { int start = pos; while (pos < length) { @@ -267,6 +283,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI return new String(buf, start, pos); } + @Override public String readUTF() throws IOException { int length = readUnsignedShort(); int endPos = pos + length; diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java index 469c85323e..d29b70f488 100755 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
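Both stream classes are marked AutoCloseable in this patch so that they can sit directly in the try-with-resources statements used by forwardAllAcks() and TargetedDataFileAppender.writePendingBatch(). A trivial sketch of the resulting usage; the class name and values are illustrative only, not part of the patch.

import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;

public class AutoCloseableStreamExample {

    public static void main(String[] args) throws Exception {
        // The stream is closed automatically when the block exits; no explicit close() needed.
        try (DataByteArrayOutputStream out = new DataByteArrayOutputStream(64)) {
            out.writeInt(42);
            out.writeByte(1);
            ByteSequence sequence = out.toByteSequence();
            System.out.println("encoded " + sequence.getLength() + " bytes");
        }
    }
}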
@@ -16,21 +16,18 @@ */ package org.apache.activemq.store.kahadb.disk.util; -import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.util.ByteSequence; - import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; import java.io.UTFDataFormatException; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.util.ByteSequence; /** * Optimized ByteArrayOutputStream - * - * */ -public class DataByteArrayOutputStream extends OutputStream implements DataOutput { +public class DataByteArrayOutputStream extends OutputStream implements DataOutput, AutoCloseable { private static final int DEFAULT_SIZE = PageFile.DEFAULT_PAGE_SIZE; protected byte buf[]; protected int pos; @@ -88,6 +85,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu * @param b the byte to be written. * @throws IOException */ + @Override public void write(int b) throws IOException { int newcount = pos + 1; ensureEnoughBuffer(newcount); @@ -105,6 +103,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu * @param len the number of bytes to write. * @throws IOException */ + @Override public void write(byte b[], int off, int len) throws IOException { if (len == 0) { return; @@ -146,18 +145,21 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu return pos; } + @Override public void writeBoolean(boolean v) throws IOException { ensureEnoughBuffer(pos + 1); buf[pos++] = (byte)(v ? 1 : 0); onWrite(); } + @Override public void writeByte(int v) throws IOException { ensureEnoughBuffer(pos + 1); buf[pos++] = (byte)(v >>> 0); onWrite(); } + @Override public void writeShort(int v) throws IOException { ensureEnoughBuffer(pos + 2); buf[pos++] = (byte)(v >>> 8); @@ -165,6 +167,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu onWrite(); } + @Override public void writeChar(int v) throws IOException { ensureEnoughBuffer(pos + 2); buf[pos++] = (byte)(v >>> 8); @@ -172,6 +175,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu onWrite(); } + @Override public void writeInt(int v) throws IOException { ensureEnoughBuffer(pos + 4); buf[pos++] = (byte)(v >>> 24); @@ -181,6 +185,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu onWrite(); } + @Override public void writeLong(long v) throws IOException { ensureEnoughBuffer(pos + 8); buf[pos++] = (byte)(v >>> 56); @@ -194,14 +199,17 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu onWrite(); } + @Override public void writeFloat(float v) throws IOException { writeInt(Float.floatToIntBits(v)); } + @Override public void writeDouble(double v) throws IOException { writeLong(Double.doubleToLongBits(v)); } + @Override public void writeBytes(String s) throws IOException { int length = s.length(); for (int i = 0; i < length; i++) { @@ -209,6 +217,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu } } + @Override public void writeChars(String s) throws IOException { int length = s.length(); for (int i = 0; i < length; i++) { @@ -218,6 +227,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu } } + @Override public void writeUTF(String str) throws IOException { int strlen = str.length(); int encodedsize = 0; diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto 
index 01607a519f..2dd97b951f 100644 --- a/activemq-kahadb-store/src/main/proto/journal-data.proto +++ b/activemq-kahadb-store/src/main/proto/journal-data.proto @@ -37,6 +37,7 @@ enum KahaEntryType { KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13; KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14; KAHA_DESTROY_SCHEDULER_COMMAND = 15; + KAHA_REWRITTEN_DATA_FILE_COMMAND = 16; } message KahaTraceCommand { @@ -240,6 +241,17 @@ message KahaDestroySchedulerCommand { required string scheduler=1; } +message KahaRewrittenDataFileCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required int32 sourceDataFileId = 1; + optional int32 rewriteType = 2; + optional bool skipIfSourceExists = 3 [default = true]; + +} + // TODO things to ponder // should we move more message fields // that are set by the sender (and rarely required by the broker diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java new file mode 100644 index 0000000000..a6cdb9e004 --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.disk.journal; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the single threaded DataFileAppender class. 
+ */ +public class TargetedDataFileAppenderTest { + + private Journal dataManager; + private TargetedDataFileAppender appender; + private DataFile dataFile; + private File dir; + + @Before + public void setUp() throws Exception { + dir = new File("target/tests/TargetedDataFileAppenderTest"); + dir.mkdirs(); + dataManager = new Journal(); + dataManager.setDirectory(dir); + dataManager.start(); + + dataFile = dataManager.reserveDataFile(); + appender = new TargetedDataFileAppender(dataManager, dataFile); + } + + @After + public void tearDown() throws Exception { + dataManager.close(); + IOHelper.delete(dir); + } + + @Test + public void testWritesAreBatched() throws Exception { + final int iterations = 10; + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i = 0; i < iterations; i++) { + appender.storeItem(data, Journal.USER_RECORD_TYPE, false); + } + + assertTrue("Data file should not be empty", dataFile.getLength() > 0); + assertTrue("Data file should be empty", dataFile.getFile().length() == 0); + + appender.close(); + + // at this point most probably dataManager.getInflightWrites().size() >= 0 + // as the Thread created in DataFileAppender.enqueue() may not have caught up. + assertTrue("Data file should not be empty", dataFile.getLength() > 0); + assertTrue("Data file should not be empty", dataFile.getFile().length() > 0); + } + + @Test + public void testBatchWritesCompleteAfterClose() throws Exception { + final int iterations = 10; + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i = 0; i < iterations; i++) { + appender.storeItem(data, Journal.USER_RECORD_TYPE, false); + } + + appender.close(); + + // at this point most probably dataManager.getInflightWrites().size() >= 0 + // as the Thread created in DataFileAppender.enqueue() may not have caught up. + assertTrue("Data file should not be empty", dataFile.getLength() > 0); + assertTrue("Data file should not be empty", dataFile.getFile().length() > 0); + } + + @Test + public void testBatchWriteCallbackCompleteAfterClose() throws Exception { + final int iterations = 10; + final CountDownLatch latch = new CountDownLatch(iterations); + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i = 0; i < iterations; i++) { + appender.storeItem(data, Journal.USER_RECORD_TYPE, new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + } + + appender.close(); + + // at this point most probably dataManager.getInflightWrites().size() >= 0 + // as the Thread created in DataFileAppender.enqueue() may not have caught up. 
+ assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS)); + assertTrue("Data file should not be empty", dataFile.getLength() > 0); + assertTrue("Data file should not be empty", dataFile.getFile().length() > 0); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java index 99a3e9eedb..f1797cb701 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java @@ -87,9 +87,7 @@ public class TransactedStoreUsageSuspendResumeTest { do { Message message = consumer.receive(5000); if (message != null) { - if ((messagesReceivedCountDown.getCount() % (MAX_MESSAGES / 5)) == 0) { - session.commit(); - } + session.commit(); messagesReceivedCountDown.countDown(); } if (messagesReceivedCountDown.getCount() % 500 == 0) {