From dd0ed17e59da3ac19f9e528c8b071322d3035404 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Wed, 27 Jul 2016 14:20:48 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6377 Introducing JournalDiskSyncStrategy to allow a peridic disk sync mode instead of always syncing after every write or never syncing. --- .../store/kahadb/MessageDatabase.java | 52 +++++- .../kahadb/disk/journal/DataFileAppender.java | 6 + .../store/kahadb/disk/journal/Journal.java | 52 +++++- .../disk/journal/JournalSyncStrategyTest.java | 152 ++++++++++++++++++ 4 files changed, 256 insertions(+), 6 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index eb3a5ee8f9..a58d6148a9 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -92,6 +92,7 @@ import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; import org.apache.activemq.store.kahadb.disk.index.ListIndex; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; import org.apache.activemq.store.kahadb.disk.page.Page; @@ -252,10 +253,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected ScheduledExecutorService scheduler; private final Object schedulerLock = new Object(); - protected boolean enableJournalDiskSyncs = true; + protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name(); protected boolean archiveDataLogs; protected File directoryArchive; protected AtomicLong journalSize = new AtomicLong(0); + long journalDiskSyncInterval = 1000; long checkpointInterval = 5*1000; long cleanupInterval = 30*1000; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; @@ -373,7 +375,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe }); // Short intervals for check-point and cleanups - long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); + long delay; + if (journal.isJournalDiskSyncPeriodic()) { + delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500); + } else { + delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); + } scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); } @@ -384,6 +391,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private long lastCheckpoint = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); + private long lastSync = System.currentTimeMillis(); @Override public void run() { @@ -391,6 +399,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Decide on cleanup vs full checkpoint here. if (opened.get()) { long now = System.currentTimeMillis(); + if (journal.isJournalDiskSyncPeriodic() && + journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { + journal.syncCurrentDataFile(); + lastSync = now; + } if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { checkpointCleanup(true); lastCleanup = now; @@ -3110,6 +3123,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); manager.setPreallocationStrategy( Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); + manager.setJournalDiskSyncStrategy( + Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase())); if (getDirectoryArchive() != null) { IOHelper.mkdirs(getDirectoryArchive()); manager.setDirectoryArchive(getDirectoryArchive()); @@ -3166,12 +3181,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return enableIndexWriteAsync; } + /** + * @deprecated use {@link #getJournalDiskSyncStrategy} instead + * @return + */ public boolean isEnableJournalDiskSyncs() { - return enableJournalDiskSyncs; + return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals( + journalDiskSyncStrategy.trim().toUpperCase()); } + /** + * @deprecated use {@link #setEnableJournalDiskSyncs} instead + * @param syncWrites + */ public void setEnableJournalDiskSyncs(boolean syncWrites) { - this.enableJournalDiskSyncs = syncWrites; + if (syncWrites) { + journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name(); + } else { + journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name(); + } + } + + public String getJournalDiskSyncStrategy() { + return journalDiskSyncStrategy; + } + + public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { + this.journalDiskSyncStrategy = journalDiskSyncStrategy; + } + + public long getJournalDiskSyncInterval() { + return journalDiskSyncInterval; + } + + public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { + this.journalDiskSyncInterval = journalDiskSyncInterval; } public long getCheckpointInterval() { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index 792431ca98..8df834cedd 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Adler32; import java.util.zip.Checksum; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; import org.apache.activemq.util.ByteSequence; @@ -53,6 +54,7 @@ class DataFileAppender implements FileAppender { protected final CountDownLatch shutdownDone = new CountDownLatch(1); protected int maxWriteBatchSize; protected final boolean syncOnComplete; + protected final boolean periodicSync; protected boolean running; private Thread thread; @@ -107,6 +109,8 @@ class DataFileAppender implements FileAppender { this.inflightWrites = this.journal.getInflightWrites(); this.maxWriteBatchSize = this.journal.getWriteBatchSize(); this.syncOnComplete = this.journal.isEnableAsyncDiskSync(); + this.periodicSync = JournalDiskSyncStrategy.PERIODIC.equals( + this.journal.getJournalDiskSyncStrategy()); } @Override @@ -338,6 +342,8 @@ class DataFileAppender implements FileAppender { if (forceToDisk) { file.sync(); + } else if (periodicSync) { + journal.currentFileNeedSync.set(true); } Journal.WriteCommand lastWrite = wb.writes.getTail(); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index 0b05c56d04..93d5f7f206 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -25,7 +25,15 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.FileChannel; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -33,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Adler32; @@ -75,6 +84,7 @@ public class Journal { public static final byte EOF_EOT = '4'; public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord(); + protected final AtomicBoolean currentFileNeedSync = new AtomicBoolean(); private ScheduledExecutorService scheduler; // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss @@ -115,6 +125,12 @@ public class Journal { NONE; } + public enum JournalDiskSyncStrategy { + ALWAYS, + PERIODIC, + NEVER; + } + private static byte[] createBatchControlRecordHeader() { try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) { os.writeInt(BATCH_CONTROL_RECORD_SIZE); @@ -195,12 +211,13 @@ public class Journal { protected boolean enableAsyncDiskSync = true; private int nextDataFileId = 1; private Object dataFileIdLock = new Object(); - private final AtomicReference currentDataFile = new AtomicReference<>(null); + private final AtomicReference currentDataFile = new AtomicReference<>(null); private volatile DataFile nextDataFile; protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; private File osKernelCopyTemplateFile = null; + protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; public interface DataFileRemovedListener { void fileRemoved(DataFile datafile); @@ -580,6 +597,7 @@ public class Journal { dataFile = newDataFile(); } synchronized (currentDataFile) { + syncCurrentDataFile(); fileMap.put(dataFile.getDataFileId(), dataFile); fileByFileMap.put(dataFile.getFile(), dataFile); dataFiles.addLast(dataFile); @@ -592,6 +610,23 @@ public class Journal { } } + public void syncCurrentDataFile() throws IOException { + synchronized (currentDataFile) { + DataFile dataFile = currentDataFile.get(); + if (dataFile != null && isJournalDiskSyncPeriodic()) { + if (currentFileNeedSync.compareAndSet(true, false)) { + LOG.trace("Syncing Journal file: {}", dataFile.getFile().getName()); + RecoverableRandomAccessFile file = dataFile.openRandomAccessFile(); + try { + file.sync(); + } finally { + file.close(); + } + } + } + } + } + private Runnable preAllocateNextDataFileTask = new Runnable() { @Override public void run() { @@ -670,6 +705,7 @@ public class Journal { // the appender can be calling back to to the journal blocking a close AMQ-5620 appender.close(); synchronized (currentDataFile) { + syncCurrentDataFile(); fileMap.clear(); fileByFileMap.clear(); dataFiles.clear(); @@ -1051,6 +1087,18 @@ public class Journal { return enableAsyncDiskSync; } + public JournalDiskSyncStrategy getJournalDiskSyncStrategy() { + return journalDiskSyncStrategy; + } + + public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) { + this.journalDiskSyncStrategy = journalDiskSyncStrategy; + } + + public boolean isJournalDiskSyncPeriodic() { + return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy); + } + public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) { this.dataFileRemovedListener = dataFileRemovedListener; } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java new file mode 100644 index 0000000000..af618d5e2d --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.disk.journal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; + +public class JournalSyncStrategyTest { + + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + @Rule + public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS); + + private KahaDBStore store; + private int defaultJournalLength = 10 * 1024; + + @After + public void after() throws Exception { + if (store != null) { + store.stop(); + } + } + + @Test + public void testPeriodicSync()throws Exception { + store = configureStore(JournalDiskSyncStrategy.PERIODIC); + store.start(); + final Journal journal = store.getJournal(); + assertTrue(journal.isJournalDiskSyncPeriodic()); + assertFalse(store.isEnableJournalDiskSyncs()); + + MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test")); + + //write a message to the store + writeMessage(messageStore, 1); + + //Make sure the flag was set to true + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + return journal.currentFileNeedSync.get(); + } + })); + + //Make sure a disk sync was done by the executor because a message was added + //which will cause the flag to be set to false + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !journal.currentFileNeedSync.get(); + } + })); + + } + + @Test + public void testSyncRotate()throws Exception { + store = configureStore(JournalDiskSyncStrategy.PERIODIC); + //Set a long interval to make sure it isn't called in this test + store.setJournalDiskSyncInterval(10 * 1000); + store.start(); + + final Journal journal = store.getJournal(); + assertTrue(journal.isJournalDiskSyncPeriodic()); + assertFalse(store.isEnableJournalDiskSyncs()); + assertEquals(10 * 1000, store.getJournalDiskSyncInterval()); + journal.currentFileNeedSync.set(true); //Make sure a disk sync was done by the executor because a message was added + + //get the current file but pass in a size greater than the + //journal length to trigger a rotation so we can verify that it was synced + journal.getCurrentDataFile(2 * defaultJournalLength); + + //verify a sync was called (which will set this flag to false) + assertFalse(journal.currentFileNeedSync.get()); + } + + @Test + public void testAlwaysSync()throws Exception { + store = configureStore(JournalDiskSyncStrategy.ALWAYS); + store.start(); + assertFalse(store.getJournal().isJournalDiskSyncPeriodic()); + assertTrue(store.isEnableJournalDiskSyncs()); + } + + @Test + public void testNeverSync() throws Exception { + store = configureStore(JournalDiskSyncStrategy.NEVER); + store.start(); + assertFalse(store.getJournal().isJournalDiskSyncPeriodic()); + assertFalse(store.isEnableJournalDiskSyncs()); + } + + private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception { + KahaDBStore store = new KahaDBStore(); + store.setJournalMaxFileLength(defaultJournalLength); + store.deleteAllMessages(); + store.setDirectory(dataFileDir.getRoot()); + if (strategy != null) { + store.setJournalDiskSyncStrategy(strategy.name()); + } + + return store; + } + + private void writeMessage(final MessageStore messageStore, int num) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("testtesttest"); + MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + num); + messageId.setBrokerSequenceId(num); + message.setMessageId(messageId); + messageStore.addMessage(new ConnectionContext(), message); + } + + +}