From 62bdbb0db5dc4354f0e00fd5259b3db53eb1432d Mon Sep 17 00:00:00 2001
From: gtully
Date: Fri, 29 Apr 2016 16:57:03 +0100
Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5603 - add
 preallocationScope=entire_journal_async, which preallocates the next journal
 file in advance of use to avoid latency jitter on journal rotation. Added a
 none option to disable preallocation

---
 .../store/kahadb/MessageDatabase.java         |  49 ++-
 .../store/kahadb/disk/journal/DataFile.java   |  11 +
 .../kahadb/disk/journal/DataFileAppender.java |  38 +-
 .../store/kahadb/disk/journal/Journal.java    | 397 ++++++++++++------
 .../journal/TargetedDataFileAppender.java     |   8 +-
 ...JournalCorruptionEofIndexRecoveryTest.java |   5 +-
 .../JournalCorruptionIndexRecoveryTest.java   |   2 +
 .../kahadb/disk/journal/JournalTest.java      |   1 +
 .../PreallocationJournalLatencyTest.java      |  15 +-
 .../journal/TargetedDataFileAppenderTest.java |   1 +
 .../bugs/AMQ2584ConcurrentDlqTest.java        |   2 +
 .../org/apache/activemq/bugs/AMQ3120Test.java |   4 +-
 .../org/apache/activemq/bugs/AMQ4323Test.java |   4 +-
 .../store/kahadb/KahaDBIndexLocationTest.java |   2 +-
 14 files changed, 344 insertions(+), 195 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8bb9491828..3e754f78e8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -257,7 +257,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
-    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
+    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name();
     private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();

     protected AtomicBoolean opened = new AtomicBoolean();
@@ -1860,33 +1860,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             @Override
             public void run() {
+
+                int journalToAdvance = -1;
+                Set<Integer> journalLogsReferenced = new HashSet<Integer>();
+
                 // Lock index to capture the ackMessageFileMap data
                 indexLock.writeLock().lock();
-                // Map keys might not be sorted, find the earliest log file to forward acks
-                // from and move only those, future cycles can chip away at more as needed.
-                // We won't move files that are themselves rewritten on a previous compaction.
-                List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
-                Collections.sort(journalFileIds);
-                int journalToAdvance = -1;
-                for (Integer journalFileId : journalFileIds) {
-                    DataFile current = journal.getDataFileById(journalFileId);
-                    if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
-                        journalToAdvance = journalFileId;
-                        break;
+                try {
+                    // Map keys might not be sorted, find the earliest log file to forward acks
+                    // from and move only those, future cycles can chip away at more as needed.
+                    // We won't move files that are themselves rewritten on a previous compaction.
+ List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); + Collections.sort(journalFileIds); + for (Integer journalFileId : journalFileIds) { + DataFile current = journal.getDataFileById(journalFileId); + if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { + journalToAdvance = journalFileId; + break; + } } + + // Check if we found one, or if we only found the current file being written to. + if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { + return; + } + + journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); + + } finally { + indexLock.writeLock().unlock(); } - // Check if we found one, or if we only found the current file being written to. - if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { - return; - } - - Set journalLogsReferenced = - new HashSet(metadata.ackMessageFileMap.get(journalToAdvance)); - - indexLock.writeLock().unlock(); - try { // Background rewrite of the old acks forwardAllAcks(journalToAdvance, journalLogsReferenced); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index 126d82b351..5b96adfc43 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -36,6 +36,7 @@ public class DataFile extends LinkedNode implements Comparable implements Comparable= journal.getMaxFileLength() ) { - file = journal.rotateWriteFile(); - } - - + DataFile file = journal.getCurrentDataFile(write.location.getSize()); nextWriteBatch = newWriteBatch(write, file); enqueueMutex.notifyAll(); break; @@ -285,23 +283,14 @@ class DataFileAppender implements FileAppender { dataFile.closeRandomAccessFile(file); } dataFile = wb.dataFile; - file = dataFile.openRandomAccessFile(); - // pre allocate on first open of new file (length==0) - // note dataFile.length cannot be used because it is updated in enqueue - if (file.length() == 0l) { - journal.preallocateEntireJournalDataFile(file); - } + file = dataFile.appendRandomAccessFile(); } Journal.WriteCommand write = wb.writes.getHead(); // Write an empty batch control record. buff.reset(); - buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE); - buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE); - buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC); - buff.writeInt(0); - buff.writeLong(0); + buff.write(EMPTY_BATCH_CONTROL_RECORD); boolean forceToDisk = false; while (write != null) { @@ -312,19 +301,18 @@ class DataFileAppender implements FileAppender { write = write.getNext(); } - // append 'unset' next batch (5 bytes) so read can always find eof - buff.writeInt(0); - buff.writeByte(0); + // append 'unset', zero length next batch so read can always find eof + buff.write(Journal.EOF_RECORD); ByteSequence sequence = buff.toByteSequence(); // Now we can fill in the batch control record properly. 
buff.reset(); - buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length); - buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5); + buff.skip(RECORD_HEAD_SPACE + Journal.BATCH_CONTROL_RECORD_MAGIC.length); + buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length); if( journal.isChecksum() ) { Checksum checksum = new Adler32(); - checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5); + checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE-Journal.EOF_RECORD.length); buff.writeLong(checksum.getValue()); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index da0d5b432d..182a3d7757 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -23,19 +23,16 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Adler32; @@ -43,13 +40,13 @@ import java.util.zip.Checksum; import org.apache.activemq.store.kahadb.disk.util.LinkedNode; import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; -import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask; import org.apache.activemq.store.kahadb.disk.util.Sequence; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.DataByteArrayInputStream; import org.apache.activemq.util.DataByteArrayOutputStream; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.RecoverableRandomAccessFile; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +70,12 @@ public class Journal { public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH"); public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8; public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader(); + public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader(); + public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt(); + public static final byte EOF_EOT = '4'; + public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord(); + + private 
ScheduledExecutorService scheduler; // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss public void corruptRecoveryLocation(Location recoveryPosition) throws IOException { @@ -103,7 +106,9 @@ public class Journal { } public enum PreallocationScope { - ENTIRE_JOURNAL; + ENTIRE_JOURNAL, + ENTIRE_JOURNAL_ASYNC, + NONE; } private static byte[] createBatchControlRecordHeader() { @@ -119,13 +124,39 @@ public class Journal { } } + private static byte[] createEmptyBatchControlRecordHeader() { + try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) { + os.writeInt(BATCH_CONTROL_RECORD_SIZE); + os.writeByte(BATCH_CONTROL_RECORD_TYPE); + os.write(BATCH_CONTROL_RECORD_MAGIC); + os.writeInt(0); + os.writeLong(0l); + ByteSequence sequence = os.toByteSequence(); + sequence.compact(); + return sequence.getData(); + } catch (IOException e) { + throw new RuntimeException("Could not create empty batch control record header.", e); + } + } + + private static byte[] createEofBatchAndLocationRecord() { + try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) { + os.writeInt(EOF_INT); + os.writeByte(EOF_EOT); + ByteSequence sequence = os.toByteSequence(); + sequence.compact(); + return sequence.getData(); + } catch (IOException e) { + throw new RuntimeException("Could not create eof header.", e); + } + } + public static final String DEFAULT_DIRECTORY = "."; public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; public static final String DEFAULT_FILE_PREFIX = "db-"; public static final String DEFAULT_FILE_SUFFIX = ".log"; public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; - public static final int PREFERED_DIFF = 1024 * 512; public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4; private static final Logger LOG = LoggerFactory.getLogger(Journal.class); @@ -151,18 +182,21 @@ public class Journal { protected LinkedNodeList dataFiles = new LinkedNodeList(); protected final AtomicReference lastAppendLocation = new AtomicReference(); - protected Runnable cleanupTask; + protected ScheduledFuture cleanupTask; protected AtomicLong totalLength = new AtomicLong(); protected boolean archiveDataLogs; private ReplicationTarget replicationTarget; protected boolean checksum; protected boolean checkForCorruptionOnStartup; protected boolean enableAsyncDiskSync = true; - private Timer timer; private int nextDataFileId = 1; + private Object dataFileIdLock = new Object(); + private final AtomicReference currentDataFile = new AtomicReference<>(null); + private volatile DataFile nextDataFile; - protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; + protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL_ASYNC; protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; + private File osKernelCopyTemplateFile = null; public interface DataFileRemovedListener { void fileRemoved(DataFile datafile); @@ -204,13 +238,15 @@ public class Journal { // Sort the list so that we can link the DataFiles together in the // right order. 
- List l = new ArrayList(fileMap.values()); + LinkedList l = new LinkedList<>(fileMap.values()); Collections.sort(l); for (DataFile df : l) { if (df.getLength() == 0) { // possibly the result of a previous failed write LOG.info("ignoring zero length, partially initialised journal data file: " + df); continue; + } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) { + continue; } dataFiles.addLast(df); fileByFileMap.put(df.getFile(), df); @@ -221,9 +257,31 @@ public class Journal { } } - nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1; + if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) { + // create a template file that will be used to pre-allocate the journal files + if (osKernelCopyTemplateFile == null) { + osKernelCopyTemplateFile = createJournalTemplateFile(); + } + } - getOrCreateCurrentWriteFile(); + scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread schedulerThread = new Thread(r); + schedulerThread.setName("ActiveMQ Journal Scheduled executor"); + schedulerThread.setDaemon(true); + return schedulerThread; + } + }); + + // init current write file + if (dataFiles.isEmpty()) { + nextDataFileId = 1; + rotateWriteFile(); + } else { + currentDataFile.set(dataFiles.getTail()); + nextDataFileId = currentDataFile.get().dataFileId + 1; + } if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) { LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies."); @@ -239,23 +297,20 @@ public class Journal { totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength); } - cleanupTask = new Runnable() { + cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanup(); } - }; + }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS); - this.timer = new Timer("KahaDB Scheduler", true); - TimerTask task = new SchedulerTimerTask(cleanupTask); - this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL); long end = System.currentTimeMillis(); LOG.trace("Startup took: "+(end-start)+" ms"); } public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) { - if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) { + if (PreallocationScope.NONE != preallocationScope) { if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { doPreallocationKernelCopy(file); @@ -266,58 +321,68 @@ public class Journal { } else { doPreallocationSparseFile(file); } - } else { - LOG.info("Using journal preallocation scope of batch allocation"); } } private void doPreallocationSparseFile(RecoverableRandomAccessFile file) { + final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD); try { - file.seek(maxFileLength - 1); - file.write((byte)0x00); + FileChannel channel = file.getChannel(); + channel.position(0); + channel.write(journalEof); + channel.position(maxFileLength - 5); + journalEof.rewind(); + channel.write(journalEof); + channel.force(false); + channel.position(0); + } catch (ClosedByInterruptException ignored) { + LOG.trace("Could not preallocate journal file with sparse file", ignored); } catch (IOException e) { - LOG.error("Could not preallocate journal file with sparse file! 
Will continue without preallocation", e); + LOG.error("Could not preallocate journal file with sparse file", e); } } private void doPreallocationZeros(RecoverableRandomAccessFile file) { ByteBuffer buffer = ByteBuffer.allocate(maxFileLength); - + buffer.put(EOF_RECORD); + buffer.rewind(); try { FileChannel channel = file.getChannel(); channel.write(buffer); channel.force(false); channel.position(0); + } catch (ClosedByInterruptException ignored) { + LOG.trace("Could not preallocate journal file with zeros", ignored); } catch (IOException e) { - LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e); + LOG.error("Could not preallocate journal file with zeros", e); } } private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) { - // create a template file that will be used to pre-allocate the journal files - File templateFile = createJournalTemplateFile(); - - RandomAccessFile templateRaf = null; try { - templateRaf = new RandomAccessFile(templateFile, "rw"); - templateRaf.setLength(maxFileLength); - templateRaf.getChannel().force(true); + RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw"); templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel()); templateRaf.close(); - templateFile.delete(); + } catch (ClosedByInterruptException ignored) { + LOG.trace("Could not preallocate journal file with kernel copy", ignored); } catch (FileNotFoundException e) { - LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e); + LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e); } catch (IOException e) { - LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e); + LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e); } } private File createJournalTemplateFile() { String fileName = "db-log.template"; File rc = new File(directory, fileName); - if (rc.exists()) { - LOG.trace("deleting journal template file because it already exists..."); - rc.delete(); + try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw");) { + templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD)); + templateRaf.setLength(maxFileLength); + templateRaf.getChannel().force(true); + } catch (FileNotFoundException e) { + LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e); + } catch (IOException e) { + LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e); } return rc; } @@ -325,6 +390,8 @@ public class Journal { private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) { ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE); + buffer.put(EOF_RECORD); + buffer.rewind(); try { FileChannel channel = file.getChannel(); @@ -354,6 +421,24 @@ public class Journal { } } + public boolean isUnusedPreallocated(DataFile dataFile) throws IOException { + int firstBatchRecordSize = -1; + if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) { + Location location = new Location(); + location.setDataFileId(dataFile.getDataFileId()); + location.setOffset(0); + + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + try { + firstBatchRecordSize = checkBatchRecord(reader, location.getOffset()); + } catch (Exception ignored) { + } finally { + 
accessorPool.closeDataFileAccessor(reader); + } + } + return firstBatchRecordSize == 0; + } + protected Location recoveryCheck(DataFile dataFile) throws IOException { Location location = new Location(); location.setDataFileId(dataFile.getDataFileId()); @@ -364,6 +449,10 @@ public class Journal { while (true) { int size = checkBatchRecord(reader, location.getOffset()); if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) { + if (size == 0) { + // eof batch record + break; + } location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size); } else { @@ -433,6 +522,12 @@ public class Journal { reader.readFully(offset, controlRecord); + // check for journal eof + if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) { + // eof batch + return 0; + } + // Assert that it's a batch record. for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) { if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) { @@ -476,42 +571,67 @@ public class Journal { return totalLength.get(); } - synchronized DataFile getOrCreateCurrentWriteFile() throws IOException { - if (dataFiles.isEmpty()) { - rotateWriteFile(); + private void rotateWriteFile() throws IOException { + synchronized (dataFileIdLock) { + DataFile dataFile = nextDataFile; + if (dataFile == null) { + dataFile = newDataFile(); + } + synchronized (currentDataFile) { + fileMap.put(dataFile.getDataFileId(), dataFile); + fileByFileMap.put(dataFile.getFile(), dataFile); + dataFiles.addLast(dataFile); + currentDataFile.set(dataFile); + } + nextDataFile = null; } - - DataFile current = dataFiles.getTail(); - - if (current != null) { - return current; - } else { - return rotateWriteFile(); + if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) { + preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask); } } - synchronized DataFile rotateWriteFile() { + private Runnable preAllocateNextDataFileTask = new Runnable() { + @Override + public void run() { + if (nextDataFile == null) { + synchronized (dataFileIdLock){ + try { + nextDataFile = newDataFile(); + } catch (IOException e) { + LOG.warn("Failed to proactively allocate data file", e); + } + } + } + } + }; + + private volatile Future preAllocateNextDataFileFuture; + + private DataFile newDataFile() throws IOException { int nextNum = nextDataFileId++; File file = getFile(nextNum); DataFile nextWriteFile = new DataFile(file, nextNum); - fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); - fileByFileMap.put(file, nextWriteFile); - dataFiles.addLast(nextWriteFile); + preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile()); return nextWriteFile; } - public synchronized DataFile reserveDataFile() { - int nextNum = nextDataFileId++; - File file = getFile(nextNum); - DataFile reservedDataFile = new DataFile(file, nextNum); - fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile); - fileByFileMap.put(file, reservedDataFile); - if (dataFiles.isEmpty()) { - dataFiles.addLast(reservedDataFile); - } else { - dataFiles.getTail().linkBefore(reservedDataFile); + + public DataFile reserveDataFile() { + synchronized (dataFileIdLock) { + int nextNum = nextDataFileId++; + File file = getFile(nextNum); + DataFile reservedDataFile = new DataFile(file, nextNum); + synchronized (currentDataFile) { + fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile); + fileByFileMap.put(file, reservedDataFile); + if (dataFiles.isEmpty()) { + 
dataFiles.addLast(reservedDataFile); + } else { + dataFiles.getTail().linkBefore(reservedDataFile); + } + } + return reservedDataFile; } - return reservedDataFile; } public File getFile(int nextNum) { @@ -520,9 +640,12 @@ public class Journal { return file; } - synchronized DataFile getDataFile(Location item) throws IOException { + DataFile getDataFile(Location item) throws IOException { Integer key = Integer.valueOf(item.getDataFileId()); - DataFile dataFile = fileMap.get(key); + DataFile dataFile = null; + synchronized (currentDataFile) { + dataFile = fileMap.get(key); + } if (dataFile == null) { LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); @@ -530,29 +653,21 @@ public class Journal { return dataFile; } - synchronized File getFile(Location item) throws IOException { - Integer key = Integer.valueOf(item.getDataFileId()); - DataFile dataFile = fileMap.get(key); - if (dataFile == null) { - LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); - throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); - } - return dataFile.getFile(); - } - public void close() throws IOException { synchronized (this) { if (!started) { return; } - if (this.timer != null) { - this.timer.cancel(); + cleanupTask.cancel(true); + if (preAllocateNextDataFileFuture != null) { + preAllocateNextDataFileFuture.cancel(true); } + ThreadPoolUtils.shutdownGraceful(scheduler, 4000); accessorPool.close(); } // the appender can be calling back to to the journal blocking a close AMQ-5620 appender.close(); - synchronized (this) { + synchronized (currentDataFile) { fileMap.clear(); fileByFileMap.clear(); dataFiles.clear(); @@ -579,37 +694,52 @@ public class Journal { result &= dataFile.delete(); } - totalLength.set(0); - fileMap.clear(); - fileByFileMap.clear(); - lastAppendLocation.set(null); - dataFiles = new LinkedNodeList(); + if (preAllocateNextDataFileFuture != null) { + preAllocateNextDataFileFuture.cancel(true); + } + synchronized (dataFileIdLock) { + if (nextDataFile != null) { + nextDataFile.delete(); + nextDataFile = null; + } + } + totalLength.set(0); + synchronized (currentDataFile) { + fileMap.clear(); + fileByFileMap.clear(); + lastAppendLocation.set(null); + dataFiles = new LinkedNodeList(); + } // reopen open file handles... accessorPool = new DataFileAccessorPool(this); appender = new DataFileAppender(this); return result; } - public synchronized void removeDataFiles(Set files) throws IOException { + public void removeDataFiles(Set files) throws IOException { for (Integer key : files) { // Can't remove the data file (or subsequent files) that is currently being written to. 
if (key >= lastAppendLocation.get().getDataFileId()) { continue; } - DataFile dataFile = fileMap.get(key); + DataFile dataFile = null; + synchronized (currentDataFile) { + dataFile = fileMap.remove(key); + if (dataFile != null) { + fileByFileMap.remove(dataFile.getFile()); + dataFile.unlink(); + } + } if (dataFile != null) { forceRemoveDataFile(dataFile); } } } - private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException { + private void forceRemoveDataFile(DataFile dataFile) throws IOException { accessorPool.disposeDataFileAccessors(dataFile); - fileByFileMap.remove(dataFile.getFile()); - fileMap.remove(dataFile.getDataFileId()); totalLength.addAndGet(-dataFile.getLength()); - dataFile.unlink(); if (archiveDataLogs) { File directoryArchive = getDirectoryArchive(); if (directoryArchive.exists()) { @@ -657,13 +787,15 @@ public class Journal { return directory.toString(); } - public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { - + public Location getNextLocation(Location location) throws IOException, IllegalStateException { Location cur = null; while (true) { if (cur == null) { if (location == null) { - DataFile head = dataFiles.getHead(); + DataFile head = null; + synchronized (currentDataFile) { + head = dataFiles.getHead(); + } if (head == null) { return null; } @@ -687,7 +819,9 @@ public class Journal { // Did it go into the next file?? if (dataFile.getLength() <= cur.getOffset()) { - dataFile = dataFile.getNext(); + synchronized (currentDataFile) { + dataFile = dataFile.getNext(); + } if (dataFile == null) { return null; } else { @@ -708,9 +842,14 @@ public class Journal { if (corruptedRange != null) { // skip corruption cur.setSize((int) corruptedRange.range()); - } else if (cur.getType() == 0) { + } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT || + (cur.getType() == 0 && cur.getSize() == 0)) { // eof - jump to next datafile - cur.setOffset(maxFileLength); + // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for + // replay of existing journals + // possibly journal is larger than maxFileLength after config change + cur.setSize(EOF_RECORD.length); + cur.setOffset(Math.max(maxFileLength, dataFile.getLength())); } else if (cur.getType() == USER_RECORD_TYPE) { // Only return user records. 
return cur; @@ -718,7 +857,7 @@ public class Journal { } } - public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException { + public ByteSequence read(Location location) throws IOException, IllegalStateException { DataFile dataFile = getDataFile(location); DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); ByteSequence rc = null; @@ -816,34 +955,24 @@ public class Journal { this.archiveDataLogs = archiveDataLogs; } - public synchronized DataFile getDataFileById(int dataFileId) { - if (dataFiles.isEmpty()) { - return null; - } - - return fileMap.get(Integer.valueOf(dataFileId)); - } - - public synchronized DataFile getCurrentDataFile() { - if (dataFiles.isEmpty()) { - return null; - } - - DataFile current = dataFiles.getTail(); - - if (current != null) { - return current; - } else { - return null; + public DataFile getDataFileById(int dataFileId) { + synchronized (currentDataFile) { + return fileMap.get(Integer.valueOf(dataFileId)); } } - public synchronized Integer getCurrentDataFileId() { - DataFile current = getCurrentDataFile(); - if (current != null) { - return current.getDataFileId(); - } else { - return null; + public DataFile getCurrentDataFile(int capacity) throws IOException { + synchronized (currentDataFile) { + if (currentDataFile.get().getLength() + capacity >= maxFileLength) { + rotateWriteFile(); + } + return currentDataFile.get(); + } + } + + public Integer getCurrentDataFileId() { + synchronized (currentDataFile) { + return currentDataFile.get().getDataFileId(); } } @@ -853,11 +982,15 @@ public class Journal { * @return files currently being used */ public Set getFiles() { - return fileByFileMap.keySet(); + synchronized (currentDataFile) { + return fileByFileMap.keySet(); + } } - public synchronized Map getFileMap() { - return new TreeMap(fileMap); + public Map getFileMap() { + synchronized (currentDataFile) { + return new TreeMap(fileMap); + } } public long getDiskSize() { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java index 3e3e090a04..a80328f060 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java @@ -227,20 +227,18 @@ public class TargetedDataFileAppender implements FileAppender { } // append 'unset' next batch (5 bytes) so read can always find eof - buff.writeInt(0); - buff.writeByte(0); - + buff.write(Journal.EOF_RECORD); ByteSequence sequence = buff.toByteSequence(); // Now we can fill in the batch control record properly. 
buff.reset(); buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length); - buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5); + buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length); if (journal.isChecksum()) { Checksum checksum = new Adler32(); checksum.update(sequence.getData(), sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE, - sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5); + sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length); buff.writeLong(checksum.getValue()); } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index cf60a08a9a..faf00229f8 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -128,8 +128,8 @@ public class JournalCorruptionEofIndexRecoveryTest { adapter.setCheckForCorruptJournalFiles(true); adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles); - adapter.setPreallocationStrategy("zeros"); - adapter.setPreallocationScope("entire_journal"); + adapter.setPreallocationStrategy(Journal.PreallocationStrategy.ZEROS.name()); + adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name()); } @After @@ -259,6 +259,7 @@ public class JournalCorruptionEofIndexRecoveryTest { corruptOrderIndex(id, size); randomAccessFile.getChannel().force(true); + dataFile.closeRandomAccessFile(randomAccessFile); } private void corruptBatchEndEof(int id) throws Exception{ diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java index 84c2ab5c0e..2e34686060 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java @@ -114,6 +114,8 @@ public class JournalCorruptionIndexRecoveryTest { adapter.setCheckForCorruptJournalFiles(true); adapter.setIgnoreMissingJournalfiles(true); + + adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name()); } @After diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java index 3c65814468..987c2d31e3 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.File; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import junit.framework.TestCase; diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java 
index d6c64f4748..32e2f0c549 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java @@ -39,10 +39,13 @@ public class PreallocationJournalLatencyTest { TimeStatisticImpl sparse = executeTest(Journal.PreallocationStrategy.SPARSE_FILE.name()); TimeStatisticImpl chunked_zeros = executeTest(Journal.PreallocationStrategy.CHUNKED_ZEROS.name()); - TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name()); + //TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name()); + TimeStatisticImpl kernel = executeTest(Journal.PreallocationStrategy.OS_KERNEL_COPY.name()); + LOG.info(" sparse: " + sparse); LOG.info(" chunked: " + chunked_zeros); - LOG.info(" zeros: " + zeros); + //LOG.info(" zeros: " + zeros); + LOG.info(" kernel: " + kernel); } @@ -50,11 +53,13 @@ public class PreallocationJournalLatencyTest { int randInt = rand.nextInt(100); File dataDirectory = new File("./target/activemq-data/kahadb" + randInt); - KahaDBStore store = new KahaDBStore(); - store.setJournalMaxFileLength(16*1204*1024); + final KahaDBStore store = new KahaDBStore(); + store.setCheckpointInterval(5000); + store.setJournalMaxFileLength(32*1204*1024); store.deleteAllMessages(); store.setDirectory(dataDirectory); store.setPreallocationStrategy(preallocationStrategy); + store.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name()); store.start(); final File journalLog = new File(dataDirectory, "db-1.log"); @@ -66,7 +71,7 @@ public class PreallocationJournalLatencyTest { })); final Journal journal = store.getJournal(); - ByteSequence byteSequence = new ByteSequence(new byte[8*1024]); + ByteSequence byteSequence = new ByteSequence(new byte[16*1024]); TimeStatisticImpl timeStatistic = new TimeStatisticImpl("append", "duration"); for (int i=0;i<5000; i++) { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java index a6cdb9e004..bbdcde7600 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.activemq.util.ByteSequence; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java index 3e41dc9a54..087b9be466 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal; import 
org.apache.activemq.util.IntrospectionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,6 +225,7 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport { properties.put("maxFileLength", maxFileLengthVal); properties.put("cleanupInterval", "2000"); properties.put("checkpointInterval", "2000"); + properties.put("preallocationScope", Journal.PreallocationScope.ENTIRE_JOURNAL.name()); // there are problems with duplicate dispatch in the cursor, which maintain // a map of messages. A dup dispatch can be dropped. // see: org.apache.activemq.broker.region.cursors.OrderedPendingList diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java index 6494efeba4..58e27f8a24 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java @@ -31,6 +31,7 @@ import org.junit.Test; import javax.jms.*; import java.io.File; +import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -101,6 +102,7 @@ public class AMQ3120Test { private int getFileCount(File dir){ if (dir.isDirectory()) { String[] children = dir.list(); + LOG.info("Children: " + Arrays.asList(children)); return children.length; } @@ -112,7 +114,7 @@ public class AMQ3120Test { final int messageCount = 500; startBroker(true); int fileCount = getFileCount(kahaDbDir); - assertEquals(4, fileCount); + assertEquals(5, fileCount); Connection connection = new ActiveMQConnectionFactory( broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java index e965731d26..5db7579c6d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java @@ -116,7 +116,7 @@ public class AMQ4323Test { final int messageCount = 500; startBroker(true); int fileCount = getFileCount(kahaDbDir); - assertEquals(4, fileCount); + assertEquals(5, fileCount); Connection connection = new ActiveMQConnectionFactory( broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); @@ -149,7 +149,7 @@ public class AMQ4323Test { public boolean isSatisified() throws Exception { int fileCount = getFileCount(kahaDbDir); LOG.info("current filecount:" + fileCount); - return 4 == fileCount; + return 5 == fileCount; } })); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java index 4a2333191d..cf9522f59d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java @@ -135,7 +135,7 @@ public class KahaDBIndexLocationTest { // Should contain the initial log for the journal and the lock. assertNotNull(journal); - assertEquals(2, journal.length); + assertEquals(3, journal.length); } @Test
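
Usage sketch: the new scope is configured through the same KahaDB setters the tests above exercise (setPreallocationScope, setPreallocationStrategy). Below is a minimal, illustrative Java example, assuming a standard embedded BrokerService around the KahaDBPersistenceAdapter from this module; the broker wiring itself is not part of this patch.

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
    import org.apache.activemq.store.kahadb.disk.journal.Journal;

    public class PreallocationScopeExample {
        public static void main(String[] args) throws Exception {
            KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();

            // ENTIRE_JOURNAL_ASYNC (the new default) preallocates the *next*
            // journal file on a scheduler thread so appenders do not stall on
            // preallocation when the current file rotates; NONE disables
            // preallocation altogether.
            adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());

            // Any of the existing strategies (SPARSE_FILE, ZEROS, CHUNKED_ZEROS,
            // OS_KERNEL_COPY) combine with the async scope.
            adapter.setPreallocationStrategy(Journal.PreallocationStrategy.SPARSE_FILE.name());

            BrokerService broker = new BrokerService();
            broker.setPersistenceAdapter(adapter);
            broker.start();
            broker.waitUntilStopped();
        }
    }

Design note: with ENTIRE_JOURNAL_ASYNC the journal's scheduler preallocates the next data file ahead of time, so an appender that fills the current file swaps to an already-sized file instead of paying the preallocation cost inline. Preallocated files are now seeded with the 5-byte EOF_RECORD (the int spelled by '-', 'q', 'M', 'a' followed by the type byte '4') rather than zeros, so recovery can positively identify an unused preallocated file (isUnusedPreallocated) and readers can stop at a definite end of log; getNextLocation() still honours the legacy zero size/zero type marker so journals written before this change replay correctly.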