Introducing JournalDiskSyncStrategy to allow a peridic disk sync mode
instead of always syncing after every write or never syncing.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-27 14:20:48 -04:00
parent 822e2be90e
commit dd0ed17e59
4 changed files with 256 additions and 6 deletions

View File

@ -92,6 +92,7 @@ import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
import org.apache.activemq.store.kahadb.disk.page.Page;
@ -252,10 +253,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected ScheduledExecutorService scheduler;
private final Object schedulerLock = new Object();
protected boolean enableJournalDiskSyncs = true;
protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
protected boolean archiveDataLogs;
protected File directoryArchive;
protected AtomicLong journalSize = new AtomicLong(0);
long journalDiskSyncInterval = 1000;
long checkpointInterval = 5*1000;
long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@ -373,7 +375,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
// Short intervals for check-point and cleanups
long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
long delay;
if (journal.isJournalDiskSyncPeriodic()) {
delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
} else {
delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
}
scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
}
@ -384,6 +391,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private long lastCheckpoint = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private long lastSync = System.currentTimeMillis();
@Override
public void run() {
@ -391,6 +399,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Decide on cleanup vs full checkpoint here.
if (opened.get()) {
long now = System.currentTimeMillis();
if (journal.isJournalDiskSyncPeriodic() &&
journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
journal.syncCurrentDataFile();
lastSync = now;
}
if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
checkpointCleanup(true);
lastCleanup = now;
@ -3110,6 +3123,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
manager.setPreallocationStrategy(
Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
manager.setJournalDiskSyncStrategy(
Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()));
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive());
@ -3166,12 +3181,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return enableIndexWriteAsync;
}
/**
* @deprecated use {@link #getJournalDiskSyncStrategy} instead
* @return
*/
public boolean isEnableJournalDiskSyncs() {
return enableJournalDiskSyncs;
return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
journalDiskSyncStrategy.trim().toUpperCase());
}
/**
* @deprecated use {@link #setEnableJournalDiskSyncs} instead
* @param syncWrites
*/
public void setEnableJournalDiskSyncs(boolean syncWrites) {
this.enableJournalDiskSyncs = syncWrites;
if (syncWrites) {
journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
} else {
journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name();
}
}
public String getJournalDiskSyncStrategy() {
return journalDiskSyncStrategy;
}
public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
this.journalDiskSyncStrategy = journalDiskSyncStrategy;
}
public long getJournalDiskSyncInterval() {
return journalDiskSyncInterval;
}
public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
this.journalDiskSyncInterval = journalDiskSyncInterval;
}
public long getCheckpointInterval() {

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.util.ByteSequence;
@ -53,6 +54,7 @@ class DataFileAppender implements FileAppender {
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
protected int maxWriteBatchSize;
protected final boolean syncOnComplete;
protected final boolean periodicSync;
protected boolean running;
private Thread thread;
@ -107,6 +109,8 @@ class DataFileAppender implements FileAppender {
this.inflightWrites = this.journal.getInflightWrites();
this.maxWriteBatchSize = this.journal.getWriteBatchSize();
this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
this.periodicSync = JournalDiskSyncStrategy.PERIODIC.equals(
this.journal.getJournalDiskSyncStrategy());
}
@Override
@ -338,6 +342,8 @@ class DataFileAppender implements FileAppender {
if (forceToDisk) {
file.sync();
} else if (periodicSync) {
journal.currentFileNeedSync.set(true);
}
Journal.WriteCommand lastWrite = wb.writes.getTail();

View File

@ -25,7 +25,15 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -33,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
@ -75,6 +84,7 @@ public class Journal {
public static final byte EOF_EOT = '4';
public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
protected final AtomicBoolean currentFileNeedSync = new AtomicBoolean();
private ScheduledExecutorService scheduler;
// tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
@ -115,6 +125,12 @@ public class Journal {
NONE;
}
public enum JournalDiskSyncStrategy {
ALWAYS,
PERIODIC,
NEVER;
}
private static byte[] createBatchControlRecordHeader() {
try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
os.writeInt(BATCH_CONTROL_RECORD_SIZE);
@ -195,12 +211,13 @@ public class Journal {
protected boolean enableAsyncDiskSync = true;
private int nextDataFileId = 1;
private Object dataFileIdLock = new Object();
private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
private volatile DataFile nextDataFile;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
private File osKernelCopyTemplateFile = null;
protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
@ -580,6 +597,7 @@ public class Journal {
dataFile = newDataFile();
}
synchronized (currentDataFile) {
syncCurrentDataFile();
fileMap.put(dataFile.getDataFileId(), dataFile);
fileByFileMap.put(dataFile.getFile(), dataFile);
dataFiles.addLast(dataFile);
@ -592,6 +610,23 @@ public class Journal {
}
}
public void syncCurrentDataFile() throws IOException {
synchronized (currentDataFile) {
DataFile dataFile = currentDataFile.get();
if (dataFile != null && isJournalDiskSyncPeriodic()) {
if (currentFileNeedSync.compareAndSet(true, false)) {
LOG.trace("Syncing Journal file: {}", dataFile.getFile().getName());
RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
try {
file.sync();
} finally {
file.close();
}
}
}
}
}
private Runnable preAllocateNextDataFileTask = new Runnable() {
@Override
public void run() {
@ -670,6 +705,7 @@ public class Journal {
// the appender can be calling back to to the journal blocking a close AMQ-5620
appender.close();
synchronized (currentDataFile) {
syncCurrentDataFile();
fileMap.clear();
fileByFileMap.clear();
dataFiles.clear();
@ -1051,6 +1087,18 @@ public class Journal {
return enableAsyncDiskSync;
}
public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
return journalDiskSyncStrategy;
}
public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
this.journalDiskSyncStrategy = journalDiskSyncStrategy;
}
public boolean isJournalDiskSyncPeriodic() {
return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
}
public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
this.dataFileRemovedListener = dataFileRemovedListener;
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.disk.journal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
public class JournalSyncStrategyTest {
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@Rule
public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
private KahaDBStore store;
private int defaultJournalLength = 10 * 1024;
@After
public void after() throws Exception {
if (store != null) {
store.stop();
}
}
@Test
public void testPeriodicSync()throws Exception {
store = configureStore(JournalDiskSyncStrategy.PERIODIC);
store.start();
final Journal journal = store.getJournal();
assertTrue(journal.isJournalDiskSyncPeriodic());
assertFalse(store.isEnableJournalDiskSyncs());
MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
//write a message to the store
writeMessage(messageStore, 1);
//Make sure the flag was set to true
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return journal.currentFileNeedSync.get();
}
}));
//Make sure a disk sync was done by the executor because a message was added
//which will cause the flag to be set to false
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return !journal.currentFileNeedSync.get();
}
}));
}
@Test
public void testSyncRotate()throws Exception {
store = configureStore(JournalDiskSyncStrategy.PERIODIC);
//Set a long interval to make sure it isn't called in this test
store.setJournalDiskSyncInterval(10 * 1000);
store.start();
final Journal journal = store.getJournal();
assertTrue(journal.isJournalDiskSyncPeriodic());
assertFalse(store.isEnableJournalDiskSyncs());
assertEquals(10 * 1000, store.getJournalDiskSyncInterval());
journal.currentFileNeedSync.set(true); //Make sure a disk sync was done by the executor because a message was added
//get the current file but pass in a size greater than the
//journal length to trigger a rotation so we can verify that it was synced
journal.getCurrentDataFile(2 * defaultJournalLength);
//verify a sync was called (which will set this flag to false)
assertFalse(journal.currentFileNeedSync.get());
}
@Test
public void testAlwaysSync()throws Exception {
store = configureStore(JournalDiskSyncStrategy.ALWAYS);
store.start();
assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
assertTrue(store.isEnableJournalDiskSyncs());
}
@Test
public void testNeverSync() throws Exception {
store = configureStore(JournalDiskSyncStrategy.NEVER);
store.start();
assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
assertFalse(store.isEnableJournalDiskSyncs());
}
private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
KahaDBStore store = new KahaDBStore();
store.setJournalMaxFileLength(defaultJournalLength);
store.deleteAllMessages();
store.setDirectory(dataFileDir.getRoot());
if (strategy != null) {
store.setJournalDiskSyncStrategy(strategy.name());
}
return store;
}
private void writeMessage(final MessageStore messageStore, int num) throws Exception {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("testtesttest");
MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + num);
messageId.setBrokerSequenceId(num);
message.setMessageId(messageId);
messageStore.addMessage(new ConnectionContext(), message);
}
}