Reworking patch so that a periodic disk sync uses a journal trace
command to trigger the sync so that everything is done in the same
thread for the writes
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-28 09:36:28 -04:00
parent 1bdcb4f96f
commit 1a598277cf
6 changed files with 164 additions and 69 deletions

View File

@ -56,6 +56,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.ActiveMQMessageAuditNoSync;
@ -288,6 +289,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private int journalLogOnLastCompactionCheck; private int journalLogOnLastCompactionCheck;
private boolean enableSubscriptionStatistics = false; private boolean enableSubscriptionStatistics = false;
//only set when using JournalDiskSyncStrategy.PERIODIC
protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
@Override @Override
public void doStart() throws Exception { public void doStart() throws Exception {
load(); load();
@ -392,6 +396,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private long lastCheckpoint = System.currentTimeMillis(); private long lastCheckpoint = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis();
private long lastSync = System.currentTimeMillis(); private long lastSync = System.currentTimeMillis();
private Location lastAsyncUpdate = null;
@Override @Override
public void run() { public void run() {
@ -401,7 +406,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (journal.isJournalDiskSyncPeriodic() && if (journal.isJournalDiskSyncPeriodic() &&
journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
journal.syncCurrentDataFile(); Location currentUpdate = lastAsyncJournalUpdate.get();
if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
lastAsyncUpdate = currentUpdate;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing trace command to trigger journal sync");
}
store(new KahaTraceCommand(), true, null, null);
}
lastSync = now; lastSync = now;
} }
if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
@ -1095,6 +1107,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
long start2 = System.currentTimeMillis(); long start2 = System.currentTimeMillis();
//Track the last async update so we know if we need to sync at the next checkpoint
if (!sync && journal.isJournalDiskSyncPeriodic()) {
lastAsyncJournalUpdate.set(location);
}
process(data, location, before); process(data, location, before);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();

View File

@ -23,6 +23,8 @@ import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile; import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* An optimized writer to do batch appends to a data file. This object is thread * An optimized writer to do batch appends to a data file. This object is thread
@ -34,6 +36,8 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
*/ */
class CallerBufferingDataFileAppender extends DataFileAppender { class CallerBufferingDataFileAppender extends DataFileAppender {
private static final Logger logger = LoggerFactory.getLogger(CallerBufferingDataFileAppender.class);
final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] { final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
new DataByteArrayOutputStream(maxWriteBatchSize), new DataByteArrayOutputStream(maxWriteBatchSize),
new DataByteArrayOutputStream(maxWriteBatchSize) new DataByteArrayOutputStream(maxWriteBatchSize)
@ -111,6 +115,12 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
wb = (WriteBatch)o; wb = (WriteBatch)o;
if (dataFile != wb.dataFile) { if (dataFile != wb.dataFile) {
if (file != null) { if (file != null) {
if (periodicSync) {
if (logger.isTraceEnabled()) {
logger.trace("Syning file {} on rotate", dataFile.getFile().getName());
}
file.sync();
}
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
} }
dataFile = wb.dataFile; dataFile = wb.dataFile;
@ -177,6 +187,12 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
} finally { } finally {
try { try {
if (file != null) { if (file != null) {
if (periodicSync) {
if (logger.isTraceEnabled()) {
logger.trace("Syning file {} on close", dataFile.getFile().getName());
}
file.sync();
}
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
} }
} catch (Throwable ignore) { } catch (Throwable ignore) {

View File

@ -284,6 +284,12 @@ class DataFileAppender implements FileAppender {
if (dataFile != wb.dataFile) { if (dataFile != wb.dataFile) {
if (file != null) { if (file != null) {
if (periodicSync) {
if (logger.isTraceEnabled()) {
logger.trace("Syning file {} on rotate", dataFile.getFile().getName());
}
file.sync();
}
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
} }
dataFile = wb.dataFile; dataFile = wb.dataFile;
@ -342,8 +348,6 @@ class DataFileAppender implements FileAppender {
if (forceToDisk) { if (forceToDisk) {
file.sync(); file.sync();
} else if (periodicSync) {
journal.currentFileNeedSync.set(true);
} }
Journal.WriteCommand lastWrite = wb.writes.getTail(); Journal.WriteCommand lastWrite = wb.writes.getTail();
@ -368,6 +372,12 @@ class DataFileAppender implements FileAppender {
} finally { } finally {
try { try {
if (file != null) { if (file != null) {
if (periodicSync) {
if (logger.isTraceEnabled()) {
logger.trace("Syning file {} on close", dataFile.getFile().getName());
}
file.sync();
}
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
} }
} catch (Throwable ignore) { } catch (Throwable ignore) {

View File

@ -41,7 +41,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32; import java.util.zip.Adler32;
@ -84,7 +83,6 @@ public class Journal {
public static final byte EOF_EOT = '4'; public static final byte EOF_EOT = '4';
public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord(); public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
protected final AtomicBoolean currentFileNeedSync = new AtomicBoolean();
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
// tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
@ -597,7 +595,6 @@ public class Journal {
dataFile = newDataFile(); dataFile = newDataFile();
} }
synchronized (currentDataFile) { synchronized (currentDataFile) {
syncCurrentDataFile();
fileMap.put(dataFile.getDataFileId(), dataFile); fileMap.put(dataFile.getDataFileId(), dataFile);
fileByFileMap.put(dataFile.getFile(), dataFile); fileByFileMap.put(dataFile.getFile(), dataFile);
dataFiles.addLast(dataFile); dataFiles.addLast(dataFile);
@ -610,23 +607,6 @@ public class Journal {
} }
} }
public void syncCurrentDataFile() throws IOException {
synchronized (currentDataFile) {
DataFile dataFile = currentDataFile.get();
if (dataFile != null && isJournalDiskSyncPeriodic()) {
if (currentFileNeedSync.compareAndSet(true, false)) {
LOG.trace("Syncing Journal file: {}", dataFile.getFile().getName());
RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
try {
file.sync();
} finally {
file.close();
}
}
}
}
}
private Runnable preAllocateNextDataFileTask = new Runnable() { private Runnable preAllocateNextDataFileTask = new Runnable() {
@Override @Override
public void run() { public void run() {
@ -705,7 +685,6 @@ public class Journal {
// the appender can be calling back to to the journal blocking a close AMQ-5620 // the appender can be calling back to to the journal blocking a close AMQ-5620
appender.close(); appender.close();
synchronized (currentDataFile) { synchronized (currentDataFile) {
syncCurrentDataFile();
fileMap.clear(); fileMap.clear();
fileByFileMap.clear(); fileByFileMap.clear();
dataFiles.clear(); dataFiles.clear();

View File

@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.store.kahadb.disk.journal; package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -28,10 +30,10 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.store.kahadb.disk.journal.FileAppender;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait; import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After; import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -59,56 +61,23 @@ public class JournalSyncStrategyTest {
@Test @Test
public void testPeriodicSync()throws Exception { public void testPeriodicSync()throws Exception {
store = configureStore(JournalDiskSyncStrategy.PERIODIC); store = configureStore(JournalDiskSyncStrategy.PERIODIC);
store.setJournalDiskSyncInterval(800);
store.start(); store.start();
final Journal journal = store.getJournal(); final Journal journal = store.getJournal();
assertTrue(journal.isJournalDiskSyncPeriodic()); assertTrue(journal.isJournalDiskSyncPeriodic());
assertFalse(store.isEnableJournalDiskSyncs()); assertFalse(store.isEnableJournalDiskSyncs());
assertEquals(store.getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.PERIODIC.name());
assertEquals(store.getJournal().getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.PERIODIC);
assertEquals(store.getJournalDiskSyncInterval(), 800);
MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test")); Location l = store.lastAsyncJournalUpdate.get();
//write a message to the store //write a message to the store
MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
writeMessage(messageStore, 1); writeMessage(messageStore, 1);
//Make sure the flag was set to true //make sure message write causes the lastAsyncJournalUpdate to be set with a new value
assertTrue(Wait.waitFor(new Condition() { assertFalse(store.lastAsyncJournalUpdate.get().equals(l));
@Override
public boolean isSatisified() throws Exception {
return journal.currentFileNeedSync.get();
}
}));
//Make sure a disk sync was done by the executor because a message was added
//which will cause the flag to be set to false
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return !journal.currentFileNeedSync.get();
}
}));
}
@Test
public void testSyncRotate()throws Exception {
store = configureStore(JournalDiskSyncStrategy.PERIODIC);
//Set a long interval to make sure it isn't called in this test
store.setJournalDiskSyncInterval(10 * 1000);
store.start();
final Journal journal = store.getJournal();
assertTrue(journal.isJournalDiskSyncPeriodic());
assertFalse(store.isEnableJournalDiskSyncs());
assertEquals(10 * 1000, store.getJournalDiskSyncInterval());
journal.currentFileNeedSync.set(true); //Make sure a disk sync was done by the executor because a message was added
//get the current file but pass in a size greater than the
//journal length to trigger a rotation so we can verify that it was synced
journal.getCurrentDataFile(2 * defaultJournalLength);
//verify a sync was called (which will set this flag to false)
assertFalse(journal.currentFileNeedSync.get());
} }
@Test @Test
@ -117,6 +86,12 @@ public class JournalSyncStrategyTest {
store.start(); store.start();
assertFalse(store.getJournal().isJournalDiskSyncPeriodic()); assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
assertTrue(store.isEnableJournalDiskSyncs()); assertTrue(store.isEnableJournalDiskSyncs());
assertEquals(store.getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.ALWAYS.name());
assertEquals(store.getJournal().getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.ALWAYS);
MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
writeMessage(messageStore, 1);
assertNull(store.lastAsyncJournalUpdate.get());
} }
@Test @Test
@ -125,6 +100,12 @@ public class JournalSyncStrategyTest {
store.start(); store.start();
assertFalse(store.getJournal().isJournalDiskSyncPeriodic()); assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
assertFalse(store.isEnableJournalDiskSyncs()); assertFalse(store.isEnableJournalDiskSyncs());
assertEquals(store.getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.NEVER.name());
assertEquals(store.getJournal().getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.NEVER);
MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
writeMessage(messageStore, 1);
assertNull(store.lastAsyncJournalUpdate.get());
} }
private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception { private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.disk.journal;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
public class DataFileAppenderSyncStrategyTest {
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@Rule
public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
private KahaDBStore store;
private int defaultJournalLength = 10 * 1024;
@After
public void after() throws Exception {
if (store != null) {
store.stop();
}
}
@Test
public void testPeriodicSync()throws Exception {
store = configureStore(JournalDiskSyncStrategy.PERIODIC);
store.start();
final Journal journal = store.getJournal();
DataFileAppender appender = (DataFileAppender) journal.appender;
assertTrue(appender.periodicSync);
}
@Test
public void testAlwaysSync()throws Exception {
store = configureStore(JournalDiskSyncStrategy.ALWAYS);
store.start();
final Journal journal = store.getJournal();
DataFileAppender appender = (DataFileAppender) journal.appender;
assertFalse(appender.periodicSync);
}
@Test
public void testNeverSync() throws Exception {
store = configureStore(JournalDiskSyncStrategy.NEVER);
store.start();
final Journal journal = store.getJournal();
DataFileAppender appender = (DataFileAppender) journal.appender;
assertFalse(appender.periodicSync);
}
private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
KahaDBStore store = new KahaDBStore();
store.setJournalMaxFileLength(defaultJournalLength);
store.deleteAllMessages();
store.setDirectory(dataFileDir.getRoot());
if (strategy != null) {
store.setJournalDiskSyncStrategy(strategy.name());
}
return store;
}
}