HDFS-3885. QJM: optimize log sync when JN is lagging behind. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1383039 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-09-10 18:51:15 +00:00
parent aa65777ef0
commit ca4582222e
12 changed files with 31 additions and 17 deletions

View File

@ -54,3 +54,5 @@ HDFS-3893. QJM: Make QJM work with security enabled. (atm)
HDFS-3897. QJM: TestBlockToken fails after HDFS-3893. (atm)
HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd)
HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)

View File

@ -84,7 +84,7 @@ public void create() throws IOException {
@Override
public void close() throws IOException {
setReadyToFlush();
flushAndSync();
flushAndSync(true);
try {
lh.close();
} catch (InterruptedException ie) {
@ -130,7 +130,7 @@ public void setReadyToFlush() throws IOException {
}
@Override
public void flushAndSync() throws IOException {
public void flushAndSync(boolean durable) throws IOException {
assert(syncLatch != null);
try {
syncLatch.await();

View File

@ -77,7 +77,7 @@ public void setReadyToFlush() throws IOException {
}
@Override
protected void flushAndSync() throws IOException {
protected void flushAndSync(boolean durable) throws IOException {
int numReadyBytes = buf.countReadyBytes();
if (numReadyBytes > 0) {
int numReadyTxns = buf.countReadyTxns();

View File

@ -302,16 +302,22 @@ synchronized void journal(RequestInfo reqInfo,
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
}
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
Stopwatch sw = new Stopwatch();
sw.start();
curSegment.flush();
curSegment.flush(shouldFsync);
sw.stop();
metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS));
if (committedTxnId.get() > lastTxnId) {
if (isLagging) {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);

View File

@ -114,7 +114,7 @@ public void setReadyToFlush() throws IOException {
}
@Override // EditLogOutputStream
protected void flushAndSync() throws IOException {
protected void flushAndSync(boolean durable) throws IOException {
assert out.getLength() == 0 : "Output buffer is not empty";
int numReadyTxns = doubleBuf.countReadyTxns();

View File

@ -176,7 +176,7 @@ public void setReadyToFlush() throws IOException {
* accumulates new log records while readyBuffer will be flushed and synced.
*/
@Override
public void flushAndSync() throws IOException {
public void flushAndSync(boolean durable) throws IOException {
if (fp == null) {
throw new IOException("Trying to use aborted output stream");
}
@ -186,7 +186,7 @@ public void flushAndSync() throws IOException {
}
preallocate(); // preallocate file if necessary
doubleBuf.flushTo(fp);
if (!shouldSkipFsyncForTests) {
if (durable && !shouldSkipFsyncForTests) {
fc.force(false); // metadata updates not needed
}
}

View File

@ -93,18 +93,24 @@ abstract public void writeRaw(byte[] bytes, int offset, int length)
/**
* Flush and sync all data that is ready to be flushed (see
* {@link #setReadyToFlush()}) into the underlying persistent store.
* @param durable if true, the edits should be made truly durable before
* returning
* @throws IOException
*/
abstract protected void flushAndSync() throws IOException;
abstract protected void flushAndSync(boolean durable) throws IOException;
/**
* Flush data to persistent store.
* Collect sync metrics.
*/
public void flush() throws IOException {
flush(true);
}
public void flush(boolean durable) throws IOException {
numSync++;
long start = now();
flushAndSync();
flushAndSync(durable);
long end = now();
totalTimeSync += (end - start);
}

View File

@ -471,12 +471,12 @@ public void apply(JournalAndStream jas) throws IOException {
}
@Override
protected void flushAndSync() throws IOException {
protected void flushAndSync(final boolean durable) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().flushAndSync();
jas.getCurrentStream().flushAndSync(durable);
}
}
}, "flushAndSync");

View File

@ -56,7 +56,7 @@ public void start(int version) throws IOException {
@Override
public void close(Throwable error) throws IOException {
elfos.setReadyToFlush();
elfos.flushAndSync();
elfos.flushAndSync(true);
elfos.close();
}

View File

@ -1222,7 +1222,7 @@ static void validateNoCrash(byte garbage[]) throws IOException {
elfos.create();
elfos.writeRaw(garbage, 0, garbage.length);
elfos.setReadyToFlush();
elfos.flushAndSync();
elfos.flushAndSync(true);
elfos.close();
elfos = null;
file = new File(TEST_LOG_NAME);

View File

@ -55,7 +55,7 @@ public void deleteEditsFile() {
static void flushAndCheckLength(EditLogFileOutputStream elos,
long expectedLength) throws IOException {
elos.setReadyToFlush();
elos.flushAndSync();
elos.flushAndSync(true);
assertEquals(expectedLength, elos.getFile().length());
}

View File

@ -74,7 +74,7 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException {
elts.addTransactionsToLog(elfos, cache);
elfos.setReadyToFlush();
elfos.flushAndSync();
elfos.flushAndSync(true);
elfos.close();
elfos = null;
file = new File(TEST_LOG_NAME);