Rewrite older acks that can be preventing GC of log files.
This commit is contained in:
Timothy Bish 2016-03-14 11:04:57 -04:00
parent b39ab7867c
commit 946e62d702
14 changed files with 898 additions and 171 deletions

View File

@ -82,7 +82,9 @@ public final class ThreadPoolUtils {
* {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to
* complete normally, before going aggressively.
* complete normally, before going aggressively. If the shutdownAwaitTermination
* value is negative the shutdown waits indefinitely for the ExecutorService
* to complete its shutdown.
*
* @param executorService the executor service to shutdown
* @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
@ -130,6 +132,19 @@ public final class ThreadPoolUtils {
Thread.currentThread().interrupt();
}
}
} else if (shutdownAwaitTermination < 0) {
try {
awaitTermination(executorService);
} catch (InterruptedException e) {
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
// we were interrupted during shutdown, so force shutdown
try {
executorService.shutdownNow();
} finally {
Thread.currentThread().interrupt();
}
}
}
// if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
@ -143,6 +158,29 @@ public final class ThreadPoolUtils {
}
}
/**
* Awaits the termination of the thread pool indefinitely (Use with Caution).
* <p/>
* This implementation will log every 2nd second at INFO level that we are waiting, so the end user
* can see we are not hanging in case it takes longer time to terminate the pool.
*
* @param executorService the thread pool
*
* @throws InterruptedException is thrown if we are interrupted during the waiting
*/
public static void awaitTermination(ExecutorService executorService) throws InterruptedException {
// log progress every 5th second so end user is aware of we are shutting down
StopWatch watch = new StopWatch();
final long interval = 2000;
while (true) {
if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
return;
} else {
LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
}
}
}
/**
* Awaits the termination of the thread pool.
* <p/>

View File

@ -48,6 +48,10 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -73,6 +77,7 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@ -84,6 +89,7 @@ import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
@ -97,9 +103,11 @@ import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -122,6 +130,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static final int VERSION = 6;
static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
protected class Metadata {
protected Page<Metadata> page;
protected int state;
@ -234,8 +244,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected boolean deleteAllMessages;
protected File directory = DEFAULT_DIRECTORY;
protected File indexDirectory = null;
protected Thread checkpointThread;
protected boolean enableJournalDiskSyncs=true;
protected ScheduledExecutorService scheduler;
private final Object schedulerLock = new Object();
protected boolean enableJournalDiskSyncs = true;
protected boolean archiveDataLogs;
protected File directoryArchive;
protected AtomicLong journalSize = new AtomicLong(0);
@ -254,7 +266,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
private boolean archiveCorruptedIndex = false;
private boolean useIndexLFRUEviction = false;
private float indexLFUEvictionFactor = 0.2f;
@ -263,6 +274,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean enableIndexPageCaching = true;
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
private int compactAcksAfterNoGC = 10;
private boolean compactAcksIgnoresStoreGrowth = false;
private int checkPointCyclesWithNoGC;
private int journalLogOnLastCompactionCheck;
@Override
public void doStart() throws Exception {
load();
@ -330,51 +346,59 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
private void startCheckpoint() {
if (checkpointInterval == 0 && cleanupInterval == 0) {
if (checkpointInterval == 0 && cleanupInterval == 0) {
LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
return;
}
synchronized (checkpointThreadLock) {
boolean start = false;
if (checkpointThread == null) {
start = true;
} else if (!checkpointThread.isAlive()) {
start = true;
LOG.info("KahaDB: Recovering checkpoint thread after death");
}
if (start) {
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
@Override
public void run() {
try {
long lastCleanup = System.currentTimeMillis();
long lastCheckpoint = System.currentTimeMillis();
// Sleep for a short time so we can periodically check
// to see if we need to exit this thread.
long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
while (opened.get()) {
Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
checkpointCleanup(true);
lastCleanup = now;
lastCheckpoint = now;
} else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
checkpointCleanup(false);
lastCheckpoint = now;
}
}
} catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread...
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
}
}
};
synchronized (schedulerLock) {
if (scheduler == null) {
scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
checkpointThread.setDaemon(true);
checkpointThread.start();
@Override
public Thread newThread(Runnable r) {
Thread schedulerThread = new Thread(r);
schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
schedulerThread.setDaemon(true);
return schedulerThread;
}
});
// Short intervals for check-point and cleanups
long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
}
}
}
private final class CheckpointRunner implements Runnable {
private long lastCheckpoint = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
@Override
public void run() {
try {
// Decide on cleanup vs full checkpoint here.
if (opened.get()) {
long now = System.currentTimeMillis();
if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
checkpointCleanup(true);
lastCleanup = now;
lastCheckpoint = now;
} else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
checkpointCleanup(false);
lastCheckpoint = now;
}
}
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
} catch (Throwable e) {
LOG.error("Checkpoint failed", e);
brokerService.handleIOException(IOExceptionSupport.create(e));
}
}
}
@ -444,12 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
checkpointLock.writeLock().unlock();
}
journal.close();
synchronized (checkpointThreadLock) {
if (checkpointThread != null) {
checkpointThread.join();
}
}
//clear the cache and journalSize on shutdown of the store
ThreadPoolUtils.shutdownGraceful(scheduler, -1);
// clear the cache and journalSize on shutdown of the store
storeCache.clear();
journalSize.set(0);
}
@ -503,11 +523,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("rawtypes")
private void trackMaxAndMin(Location[] range, List<Operation> ops) {
Location t = ops.get(0).getLocation();
if (range[0]==null || t.compareTo(range[0]) <= 0) {
if (range[0] == null || t.compareTo(range[0]) <= 0) {
range[0] = t;
}
t = ops.get(ops.size() -1).getLocation();
if (range[1]==null || t.compareTo(range[1]) >= 0) {
if (range[1] == null || t.compareTo(range[1]) >= 0) {
range[1] = t;
}
}
@ -776,7 +796,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
if( undoCounter > 0 ) {
if (undoCounter > 0) {
// The rolledback operations are basically in flight journal writes. To avoid getting
// these the end user should do sync writes to the journal.
if (LOG.isInfoEnabled()) {
@ -909,7 +929,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
if( undoCounter > 0 ) {
if (undoCounter > 0) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
if (LOG.isInfoEnabled()) {
@ -1019,31 +1039,31 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
try {
ByteSequence sequence = toByteSequence(data);
Location location;
checkpointLock.readLock().lock();
try {
long start = System.currentTimeMillis();
location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
long start2 = System.currentTimeMillis();
process(data, location, before);
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
if (LOG.isInfoEnabled()) {
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
}
}
} finally{
} finally {
checkpointLock.readLock().unlock();
}
if (after != null) {
after.run();
}
if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
if (scheduler == null && opened.get()) {
startCheckpoint();
}
return location;
@ -1167,6 +1187,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void visit(KahaUpdateMessageCommand command) throws IOException {
process(command, location);
}
@Override
public void visit(KahaRewrittenDataFileCommand command) throws IOException {
process(command, location);
}
});
}
@ -1323,6 +1348,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
// Mark the current journal file as a compacted file so that gc checks can skip
// over logs that are smaller compaction type logs.
DataFile current = journal.getDataFileById(location.getDataFileId());
current.setTypeCode(command.getRewriteType());
// Move offset so that next location read jumps to next file.
location.setOffset(journalMaxFileLength);
}
}
// /////////////////////////////////////////////////////////////////
// These methods do the actual index updates.
// /////////////////////////////////////////////////////////////////
@ -1595,7 +1633,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
tx.store(metadata.page, metadataMarshaller, true);
pageFile.flush();
if( cleanup ) {
if (cleanup) {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
@ -1743,6 +1781,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.trace("gc candidates: " + gcCandidateSet);
LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
}
boolean ackMessageFileMapMod = false;
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
@ -1768,9 +1807,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
if (!gcCandidateSet.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
}
LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
journal.removeDataFiles(gcCandidateSet);
for (Integer candidate : gcCandidateSet) {
for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
@ -1780,12 +1817,153 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (ackMessageFileMapMod) {
checkpointUpdate(tx, false);
}
} else {
if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
// First check length of journal to make sure it makes sense to even try.
//
// If there is only one journal file with Acks in it we don't need to move
// it since it won't be chained to any later logs.
//
// If the logs haven't grown since the last time then we need to compact
// otherwise there seems to still be room for growth and we don't need to incur
// the overhead. Depending on configuration this check can be avoided and
// Ack compaction will run any time the store has not GC'd a journal file in
// the configured amount of cycles.
if (metadata.ackMessageFileMap.size() > 1 &&
(journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
try {
scheduler.execute(new AckCompactionRunner());
} catch (Exception ex) {
LOG.warn("Error on queueing the Ack Compactor", ex);
}
} else {
LOG.trace("Journal activity detected, no Ack compaction scheduled.");
}
checkPointCyclesWithNoGC = 0;
} else {
LOG.trace("Not yet time to check for compaction: {} of {} cycles",
checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
}
journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
}
}
LOG.debug("Checkpoint done.");
}
private final class AckCompactionRunner implements Runnable {
@Override
public void run() {
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
// Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds);
int journalToAdvance = -1;
for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId);
if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
journalToAdvance = journalFileId;
break;
}
}
// Check if we found one, or if we only found the current file being written to.
if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
return;
}
Set<Integer> journalLogsReferenced =
new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
indexLock.writeLock().unlock();
try {
// Background rewrite of the old acks
forwardAllAcks(journalToAdvance, journalLogsReferenced);
// Checkpoint with changes from the ackMessageFileMap
checkpointUpdate(false);
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
} catch (Throwable e) {
LOG.error("Checkpoint failed", e);
brokerService.handleIOException(IOExceptionSupport.create(e));
}
}
}
private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
DataFile forwardsFile = journal.reserveDataFile();
LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
compactionMarker.setSourceDataFileId(journalToRead);
compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
ByteSequence payload = toByteSequence(compactionMarker);
appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
JournalCommand<?> command = null;
try {
command = load(nextLocation);
} catch (IOException ex) {
LOG.trace("Error loading command during ack forward: {}", nextLocation);
}
if (command != null && command instanceof KahaRemoveMessageCommand) {
payload = toByteSequence(command);
Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
}
nextLocation = journal.getNextLocation(nextLocation);
}
}
LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
// Lock index while we update the ackMessageFileMap.
indexLock.writeLock().lock();
// Update the ack map with the new locations of the acks
for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
if (referenceFileIds == null) {
referenceFileIds = new HashSet<Integer>();
referenceFileIds.addAll(entry.getValue());
metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
} else {
referenceFileIds.addAll(entry.getValue());
}
}
// remove the old location data from the ack map so that the old journal log file can
// be removed on next GC.
metadata.ackMessageFileMap.remove(journalToRead);
indexLock.writeLock().unlock();
LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
}
final Runnable nullCompletionCallback = new Runnable() {
@Override
public void run() {
@ -1943,7 +2121,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
class StoredDestination {
MessageOrderIndex orderIndex = new MessageOrderIndex();
@ -2708,7 +2885,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
}
}
}
class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
@ -2728,7 +2904,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// /////////////////////////////////////////////////////////////////
private PageFile createPageFile() throws IOException {
if( indexDirectory == null ) {
if (indexDirectory == null) {
indexDirectory = directory;
}
IOHelper.mkdirs(indexDirectory);
@ -3456,4 +3632,43 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void setPreallocationStrategy(String preallocationStrategy) {
this.preallocationStrategy = preallocationStrategy;
}
public int getCompactAcksAfterNoGC() {
return compactAcksAfterNoGC;
}
/**
* Sets the number of GC cycles where no journal logs were removed before an attempt to
* move forward all the acks in the last log that contains them and is otherwise unreferenced.
* <p>
* A value of -1 will disable this feature.
*
* @param compactAcksAfterNoGC
* Number of empty GC cycles before we rewrite old ACKS.
*/
public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
this.compactAcksAfterNoGC = compactAcksAfterNoGC;
}
/**
* Returns whether Ack compaction will ignore that the store is still growing
* and run more often.
*
* @return the compactAcksIgnoresStoreGrowth current value.
*/
public boolean isCompactAcksIgnoresStoreGrowth() {
return compactAcksIgnoresStoreGrowth;
}
/**
* Configure if Ack compaction will occur regardless of continued growth of the
* journal logs meaning that the store has not run out of space yet. Because the
* compaction operation can be costly this value is defaulted to off and the Ack
* compaction is only done when it seems that the store cannot grow and larger.
*
* @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
*/
public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@ -84,4 +85,7 @@ public class Visitor {
public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
}
public void visit(KahaRewrittenDataFileCommand kahaUpdateMessageCommand) throws IOException {
}
}

View File

@ -18,29 +18,23 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataFile
*
*
*/
public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(DataFile.class);
public final static byte STANDARD_LOG_FILE = 0x0;
protected final File file;
protected final Integer dataFileId;
protected volatile int length;
protected int typeCode = STANDARD_LOG_FILE;
protected final SequenceSet corruptedBlocks = new SequenceSet();
DataFile(File file, int number) {
@ -57,6 +51,14 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
return dataFileId;
}
public int getTypeCode() {
return typeCode;
}
public void setTypeCode(int typeCode) {
this.typeCode = typeCode;
}
public synchronized int getLength() {
return length;
}
@ -70,7 +72,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
}
@Override
public synchronized String toString() {
public synchronized String toString() {
return file.getName() + " number = " + dataFileId + " , length = " + length;
}
@ -95,7 +97,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
}
@Override
public int compareTo(DataFile df) {
public int compareTo(DataFile df) {
return dataFileId - df.dataFileId;
}
@ -112,5 +114,4 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
public int hashCode() {
return dataFileId;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.util.Map;
import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
@ -28,8 +27,6 @@ import org.slf4j.LoggerFactory;
/**
* Optimized Store reader and updater. Single threaded and synchronous. Use in
* conjunction with the DataFileAccessorPool of concurrent use.
*
*
*/
final class DataFileAccessor {

View File

@ -54,33 +54,6 @@ class DataFileAppender implements FileAppender {
protected boolean running;
private Thread thread;
public static class WriteKey {
private final int file;
private final long offset;
private final int hash;
public WriteKey(Location item) {
file = item.getDataFileId();
offset = item.getOffset();
// TODO: see if we can build a better hash
hash = (int)(file ^ offset);
}
@Override
public int hashCode() {
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof WriteKey) {
WriteKey di = (WriteKey)obj;
return di.file == file && di.offset == offset;
}
return false;
}
}
public class WriteBatch {
public final DataFile dataFile;
@ -206,7 +179,7 @@ class DataFileAppender implements FileAppender {
while ( true ) {
if (nextWriteBatch == null) {
DataFile file = journal.getCurrentWriteFile();
DataFile file = journal.getOrCreateCurrentWriteFile();
if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
@ -287,9 +260,8 @@ class DataFileAppender implements FileAppender {
DataFile dataFile = null;
RecoverableRandomAccessFile file = null;
WriteBatch wb = null;
try {
try (DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
while (true) {
// Block till we get a command.

View File

@ -16,11 +16,12 @@
*/
package org.apache.activemq.store.kahadb.disk.journal;
import org.apache.activemq.util.ByteSequence;
import java.io.IOException;
public interface FileAppender {
import org.apache.activemq.util.ByteSequence;
public interface FileAppender extends AutoCloseable {
public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
@ -28,5 +29,6 @@ public interface FileAppender {
Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;
@Override
void close() throws IOException;
}

View File

@ -159,6 +159,7 @@ public class Journal {
protected boolean checkForCorruptionOnStartup;
protected boolean enableAsyncDiskSync = true;
private Timer timer;
private int nextDataFileId = 1;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
@ -220,7 +221,9 @@ public class Journal {
}
}
getCurrentWriteFile();
nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
getOrCreateCurrentWriteFile();
if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
@ -345,6 +348,7 @@ public class Journal {
LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
}
}
private static byte[] bytes(String string) {
try {
return string.getBytes("UTF-8");
@ -360,16 +364,17 @@ public class Journal {
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
while( true ) {
while (true) {
int size = checkBatchRecord(reader, location.getOffset());
if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) {
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
} else {
// Perhaps it's just some corruption... scan through the file to find the next valid batch record. We
// Perhaps it's just some corruption... scan through the
// file to find the next valid batch record. We
// may have subsequent valid batch records.
int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
if( nextOffset >=0 ) {
int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
if (nextOffset >= 0) {
Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
dataFile.corruptedBlocks.add(sequence);
@ -391,9 +396,9 @@ public class Journal {
totalLength.addAndGet(dataFile.getLength() - existingLen);
}
if( !dataFile.corruptedBlocks.isEmpty() ) {
if (!dataFile.corruptedBlocks.isEmpty()) {
// Is the end of the data file corrupted?
if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
}
}
@ -407,19 +412,19 @@ public class Journal {
ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
int pos = 0;
while( true ) {
while (true) {
pos = bs.indexOf(header, pos);
if( pos >= 0 ) {
return offset+pos;
if (pos >= 0) {
return offset + pos;
} else {
// need to load the next data chunck in..
if( bs.length != data.length ) {
if (bs.length != data.length) {
// If we had a short read then we were at EOF
return -1;
}
offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
bs = new ByteSequence(data, 0, reader.read(offset, data));
pos=0;
pos = 0;
}
}
}
@ -431,34 +436,34 @@ public class Journal {
reader.readFully(offset, controlRecord);
// Assert that it's a batch record.
for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
// Assert that it's a batch record.
for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
return -1;
}
}
int size = controlIs.readInt();
if( size > MAX_BATCH_SIZE ) {
if (size > MAX_BATCH_SIZE) {
return -1;
}
if( isChecksum() ) {
if (isChecksum()) {
long expectedChecksum = controlIs.readLong();
if( expectedChecksum == 0 ) {
if (expectedChecksum == 0) {
// Checksuming was not enabled when the record was stored.
// we can't validate the record :(
return size;
}
byte data[] = new byte[size];
reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
Checksum checksum = new Adler32();
checksum.update(data, 0, data.length);
if( expectedChecksum!=checksum.getValue() ) {
if (expectedChecksum != checksum.getValue()) {
return -1;
}
}
@ -474,15 +479,22 @@ public class Journal {
return totalLength.get();
}
synchronized DataFile getCurrentWriteFile() throws IOException {
synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
if (dataFiles.isEmpty()) {
rotateWriteFile();
}
return dataFiles.getTail();
DataFile current = dataFiles.getTail();
if (current != null) {
return current;
} else {
return rotateWriteFile();
}
}
synchronized DataFile rotateWriteFile() {
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
@ -491,6 +503,20 @@ public class Journal {
return nextWriteFile;
}
public synchronized DataFile reserveDataFile() {
int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile reservedDataFile = new DataFile(file, nextNum);
fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
fileByFileMap.put(file, reservedDataFile);
if (dataFiles.isEmpty()) {
dataFiles.addLast(reservedDataFile);
} else {
dataFiles.getTail().linkBefore(reservedDataFile);
}
return reservedDataFile;
}
public File getFile(int nextNum) {
String fileName = filePrefix + nextNum + fileSuffix;
File file = new File(directory, fileName);
@ -517,10 +543,6 @@ public class Journal {
return dataFile.getFile();
}
private DataFile getNextDataFile(DataFile dataFile) {
return dataFile.getNext();
}
public void close() throws IOException {
synchronized (this) {
if (!started) {
@ -559,6 +581,7 @@ public class Journal {
DataFile dataFile = i.next();
result &= dataFile.delete();
}
totalLength.set(0);
fileMap.clear();
fileByFileMap.clear();
@ -574,11 +597,11 @@ public class Journal {
public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
for (Integer key : files) {
// Can't remove the data file (or subsequent files) that is currently being written to.
if( key >= lastAppendLocation.get().getDataFileId() ) {
if (key >= lastAppendLocation.get().getDataFileId()) {
continue;
}
DataFile dataFile = fileMap.get(key);
if( dataFile!=null ) {
if (dataFile != null) {
forceRemoveDataFile(dataFile);
}
}
@ -607,7 +630,7 @@ public class Journal {
LOG.debug("Successfully moved data file");
} else {
LOG.debug("Deleting data file: {}", dataFile);
if ( dataFile.delete() ) {
if (dataFile.delete()) {
LOG.debug("Discarded data file: {}", dataFile);
} else {
LOG.warn("Failed to discard data file : {}", dataFile.getFile());
@ -644,7 +667,7 @@ public class Journal {
if (cur == null) {
if (location == null) {
DataFile head = dataFiles.getHead();
if( head == null ) {
if (head == null) {
return null;
}
cur = new Location();
@ -667,7 +690,7 @@ public class Journal {
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
dataFile = getNextDataFile(dataFile);
dataFile = dataFile.getNext();
if (dataFile == null) {
return null;
} else {
@ -796,10 +819,35 @@ public class Journal {
this.archiveDataLogs = archiveDataLogs;
}
synchronized public Integer getCurrentDataFileId() {
if (dataFiles.isEmpty())
public synchronized DataFile getDataFileById(int dataFileId) {
if (dataFiles.isEmpty()) {
return null;
return dataFiles.getTail().getDataFileId();
}
return fileMap.get(Integer.valueOf(dataFileId));
}
public synchronized DataFile getCurrentDataFile() {
if (dataFiles.isEmpty()) {
return null;
}
DataFile current = dataFiles.getTail();
if (current != null) {
return current;
} else {
return null;
}
}
public synchronized Integer getCurrentDataFileId() {
DataFile current = getCurrentDataFile();
if (current != null) {
return current.getDataFileId();
} else {
return null;
}
}
/**

View File

@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* File Appender instance that performs batched writes in the thread where the write is
* queued. This appender does not honor the maxFileLength value in the journal as the
* files created here are out-of-band logs used for other purposes such as journal level
* compaction.
*/
public class TargetedDataFileAppender implements FileAppender {
private static final Logger LOG = LoggerFactory.getLogger(TargetedDataFileAppender.class);
private final Journal journal;
private final DataFile target;
private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
private final int maxWriteBatchSize;
private boolean closed;
private boolean preallocate;
private WriteBatch nextWriteBatch;
private int statIdx = 0;
private int[] stats = new int[maxStat];
public class WriteBatch {
protected final int offset;
public final DataFile dataFile;
public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
public AtomicReference<IOException> exception = new AtomicReference<IOException>();
public WriteBatch(DataFile dataFile, int offset) {
this.dataFile = dataFile;
this.offset = offset;
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
this.size = Journal.BATCH_CONTROL_RECORD_SIZE;
journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
}
public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
this(dataFile, offset);
append(write);
}
public boolean canAppend(Journal.WriteCommand write) {
int newSize = size + write.location.getSize();
if (newSize >= maxWriteBatchSize) {
return false;
}
return true;
}
public void append(Journal.WriteCommand write) throws IOException {
this.writes.addLast(write);
write.location.setDataFileId(dataFile.getDataFileId());
write.location.setOffset(offset + size);
int s = write.location.getSize();
size += s;
dataFile.incrementLength(s);
journal.addToTotalLength(s);
}
}
/**
* Construct a Store writer
*/
public TargetedDataFileAppender(Journal journal, DataFile target) {
this.journal = journal;
this.target = target;
this.inflightWrites = this.journal.getInflightWrites();
this.maxWriteBatchSize = this.journal.getWriteBatchSize();
}
@Override
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
checkClosed();
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
location.setType(type);
Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
enqueueWrite(write);
if (sync) {
writePendingBatch();
}
return location;
}
@Override
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
checkClosed();
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
location.setType(type);
Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
enqueueWrite(write);
return location;
}
@Override
public void close() throws IOException {
if (!closed) {
if (nextWriteBatch != null) {
// force sync of current in-progress batched write.
LOG.debug("Close of targeted appender flushing last batch.");
writePendingBatch();
}
closed = true;
}
}
//----- Appender Configuration -------------------------------------------//
public boolean isPreallocate() {
return preallocate;
}
public void setPreallocate(boolean preallocate) {
this.preallocate = preallocate;
}
//----- Internal Implementation ------------------------------------------//
private void checkClosed() throws IOException {
if (closed) {
throw new IOException("The appender is clsoed");
}
}
private WriteBatch enqueueWrite(Journal.WriteCommand write) throws IOException {
while (true) {
if (nextWriteBatch == null) {
nextWriteBatch = new WriteBatch(target, target.getLength(), write);
break;
} else {
// Append to current batch if possible..
if (nextWriteBatch.canAppend(write)) {
nextWriteBatch.append(write);
break;
} else {
// Flush current batch and start a new one.
writePendingBatch();
nextWriteBatch = null;
}
}
}
if (!write.sync) {
inflightWrites.put(new Journal.WriteKey(write.location), write);
}
return nextWriteBatch;
}
private void writePendingBatch() throws IOException {
DataFile dataFile = nextWriteBatch.dataFile;
try (RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) {
// preallocate on first open of new file (length == 0) if configured to do so.
// NOTE: dataFile.length cannot be used because it is updated in enqueue
if (file.length() == 0L && isPreallocate()) {
journal.preallocateEntireJournalDataFile(file);
}
Journal.WriteCommand write = nextWriteBatch.writes.getHead();
// Write an empty batch control record.
buff.reset();
buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
buff.writeInt(0);
buff.writeLong(0);
while (write != null) {
buff.writeInt(write.location.getSize());
buff.writeByte(write.location.getType());
buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
write = write.getNext();
}
// append 'unset' next batch (5 bytes) so read can always find eof
buff.writeInt(0);
buff.writeByte(0);
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
if (journal.isChecksum()) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(),
sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
buff.writeLong(checksum.getValue());
}
// Now do the 1 big write.
file.seek(nextWriteBatch.offset);
if (maxStat > 0) {
if (statIdx < maxStat) {
stats[statIdx++] = sequence.getLength();
} else {
long all = 0;
for (; statIdx > 0;) {
all += stats[--statIdx];
}
LOG.trace("Ave writeSize: {}", all / maxStat);
}
}
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();
if (replicationTarget != null) {
replicationTarget.replicate(nextWriteBatch.writes.getHead().location, sequence, true);
}
file.sync();
signalDone(nextWriteBatch);
} catch (IOException e) {
LOG.info("Journal failed while writing at: {}", nextWriteBatch.offset);
throw e;
}
}
private void signalDone(WriteBatch writeBatch) {
// Now that the data is on disk, remove the writes from the in
// flight cache and signal any onComplete requests.
Journal.WriteCommand write = writeBatch.writes.getHead();
while (write != null) {
if (!write.sync) {
inflightWrites.remove(new Journal.WriteKey(write.location));
}
if (write.onComplete != null) {
try {
write.onComplete.run();
} catch (Throwable e) {
LOG.info("Add exception was raised while executing the run command for onComplete", e);
}
}
write = write.getNext();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,19 +16,18 @@
*/
package org.apache.activemq.store.kahadb.disk.util;
import org.apache.activemq.util.ByteSequence;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import org.apache.activemq.util.ByteSequence;
/**
* Optimized ByteArrayInputStream that can be used more than once
*
*
*/
public final class DataByteArrayInputStream extends InputStream implements DataInput {
public final class DataByteArrayInputStream extends InputStream implements DataInput, AutoCloseable {
private byte[] buf;
private int pos;
private int offset;
@ -137,6 +136,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
* @return the next byte of data, or <code>-1</code> if the end of the
* stream has been reached.
*/
@Override
public int read() {
return (pos < length) ? (buf[pos++] & 0xff) : -1;
}
@ -152,6 +152,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
* <code>-1</code> if there is no more data because the end of the
* stream has been reached.
*/
@Override
public int read(byte b[], int off, int len) {
if (b == null) {
throw new NullPointerException();
@ -174,18 +175,22 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
* @return the number of bytes that can be read from the input stream
* without blocking.
*/
@Override
public int available() {
return length - pos;
}
@Override
public void readFully(byte[] b) {
read(b, 0, b.length);
}
@Override
public void readFully(byte[] b, int off, int len) {
read(b, off, len);
}
@Override
public int skipBytes(int n) {
if (pos + n > length) {
n = length - pos;
@ -197,39 +202,47 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return n;
}
@Override
public boolean readBoolean() {
return read() != 0;
}
@Override
public byte readByte() {
return (byte)read();
}
@Override
public int readUnsignedByte() {
return read();
}
@Override
public short readShort() {
this.read(work, 0, 2);
return (short) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
}
@Override
public int readUnsignedShort() {
this.read(work, 0, 2);
return (int) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
return ((work[0] & 0xff) << 8) | (work[1] & 0xff);
}
@Override
public char readChar() {
this.read(work, 0, 2);
return (char) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
}
@Override
public int readInt() {
this.read(work, 0, 4);
return ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
((work[2] & 0xff) << 8) | (work[3] & 0xff);
}
@Override
public long readLong() {
this.read(work, 0, 8);
@ -241,14 +254,17 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
}
@Override
public float readFloat() throws IOException {
return Float.intBitsToFloat(readInt());
}
@Override
public double readDouble() throws IOException {
return Double.longBitsToDouble(readLong());
}
@Override
public String readLine() {
int start = pos;
while (pos < length) {
@ -267,6 +283,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return new String(buf, start, pos);
}
@Override
public String readUTF() throws IOException {
int length = readUnsignedShort();
int endPos = pos + length;

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,21 +16,18 @@
*/
package org.apache.activemq.store.kahadb.disk.util;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.ByteSequence;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.ByteSequence;
/**
* Optimized ByteArrayOutputStream
*
*
*/
public class DataByteArrayOutputStream extends OutputStream implements DataOutput {
public class DataByteArrayOutputStream extends OutputStream implements DataOutput, AutoCloseable {
private static final int DEFAULT_SIZE = PageFile.DEFAULT_PAGE_SIZE;
protected byte buf[];
protected int pos;
@ -88,6 +85,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
* @param b the byte to be written.
* @throws IOException
*/
@Override
public void write(int b) throws IOException {
int newcount = pos + 1;
ensureEnoughBuffer(newcount);
@ -105,6 +103,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
* @param len the number of bytes to write.
* @throws IOException
*/
@Override
public void write(byte b[], int off, int len) throws IOException {
if (len == 0) {
return;
@ -146,18 +145,21 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
return pos;
}
@Override
public void writeBoolean(boolean v) throws IOException {
ensureEnoughBuffer(pos + 1);
buf[pos++] = (byte)(v ? 1 : 0);
onWrite();
}
@Override
public void writeByte(int v) throws IOException {
ensureEnoughBuffer(pos + 1);
buf[pos++] = (byte)(v >>> 0);
onWrite();
}
@Override
public void writeShort(int v) throws IOException {
ensureEnoughBuffer(pos + 2);
buf[pos++] = (byte)(v >>> 8);
@ -165,6 +167,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
@Override
public void writeChar(int v) throws IOException {
ensureEnoughBuffer(pos + 2);
buf[pos++] = (byte)(v >>> 8);
@ -172,6 +175,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
@Override
public void writeInt(int v) throws IOException {
ensureEnoughBuffer(pos + 4);
buf[pos++] = (byte)(v >>> 24);
@ -181,6 +185,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
@Override
public void writeLong(long v) throws IOException {
ensureEnoughBuffer(pos + 8);
buf[pos++] = (byte)(v >>> 56);
@ -194,14 +199,17 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
@Override
public void writeFloat(float v) throws IOException {
writeInt(Float.floatToIntBits(v));
}
@Override
public void writeDouble(double v) throws IOException {
writeLong(Double.doubleToLongBits(v));
}
@Override
public void writeBytes(String s) throws IOException {
int length = s.length();
for (int i = 0; i < length; i++) {
@ -209,6 +217,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
}
}
@Override
public void writeChars(String s) throws IOException {
int length = s.length();
for (int i = 0; i < length; i++) {
@ -218,6 +227,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
}
}
@Override
public void writeUTF(String str) throws IOException {
int strlen = str.length();
int encodedsize = 0;

View File

@ -37,6 +37,7 @@ enum KahaEntryType {
KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13;
KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14;
KAHA_DESTROY_SCHEDULER_COMMAND = 15;
KAHA_REWRITTEN_DATA_FILE_COMMAND = 16;
}
message KahaTraceCommand {
@ -240,6 +241,17 @@ message KahaDestroySchedulerCommand {
required string scheduler=1;
}
message KahaRewrittenDataFileCommand {
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRewrittenDataFileCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required int32 sourceDataFileId = 1;
optional int32 rewriteType = 2;
optional bool skipIfSourceExists = 3 [default = true];
}
// TODO things to ponder
// should we move more message fields
// that are set by the sender (and rarely required by the broker

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.disk.journal;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test the single threaded DataFileAppender class.
*/
public class TargetedDataFileAppenderTest {
private Journal dataManager;
private TargetedDataFileAppender appender;
private DataFile dataFile;
private File dir;
@Before
public void setUp() throws Exception {
dir = new File("target/tests/TargetedDataFileAppenderTest");
dir.mkdirs();
dataManager = new Journal();
dataManager.setDirectory(dir);
dataManager.start();
dataFile = dataManager.reserveDataFile();
appender = new TargetedDataFileAppender(dataManager, dataFile);
}
@After
public void tearDown() throws Exception {
dataManager.close();
IOHelper.delete(dir);
}
@Test
public void testWritesAreBatched() throws Exception {
final int iterations = 10;
ByteSequence data = new ByteSequence("DATA".getBytes());
for (int i = 0; i < iterations; i++) {
appender.storeItem(data, Journal.USER_RECORD_TYPE, false);
}
assertTrue("Data file should not be empty", dataFile.getLength() > 0);
assertTrue("Data file should be empty", dataFile.getFile().length() == 0);
appender.close();
// at this point most probably dataManager.getInflightWrites().size() >= 0
// as the Thread created in DataFileAppender.enqueue() may not have caught up.
assertTrue("Data file should not be empty", dataFile.getLength() > 0);
assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
}
@Test
public void testBatchWritesCompleteAfterClose() throws Exception {
final int iterations = 10;
ByteSequence data = new ByteSequence("DATA".getBytes());
for (int i = 0; i < iterations; i++) {
appender.storeItem(data, Journal.USER_RECORD_TYPE, false);
}
appender.close();
// at this point most probably dataManager.getInflightWrites().size() >= 0
// as the Thread created in DataFileAppender.enqueue() may not have caught up.
assertTrue("Data file should not be empty", dataFile.getLength() > 0);
assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
}
@Test
public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
final int iterations = 10;
final CountDownLatch latch = new CountDownLatch(iterations);
ByteSequence data = new ByteSequence("DATA".getBytes());
for (int i = 0; i < iterations; i++) {
appender.storeItem(data, Journal.USER_RECORD_TYPE, new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
}
appender.close();
// at this point most probably dataManager.getInflightWrites().size() >= 0
// as the Thread created in DataFileAppender.enqueue() may not have caught up.
assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS));
assertTrue("Data file should not be empty", dataFile.getLength() > 0);
assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
}
}

View File

@ -87,9 +87,7 @@ public class TransactedStoreUsageSuspendResumeTest {
do {
Message message = consumer.receive(5000);
if (message != null) {
if ((messagesReceivedCountDown.getCount() % (MAX_MESSAGES / 5)) == 0) {
session.commit();
}
session.commit();
messagesReceivedCountDown.countDown();
}
if (messagesReceivedCountDown.getCount() % 500 == 0) {