HDFS-3726. If a logger misses an RPC, don't retry that logger until next segment. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1381482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-09-06 07:03:57 +00:00
parent 437948ea1c
commit cae8116a14
5 changed files with 131 additions and 22 deletions

View File

@ -46,3 +46,5 @@ HDFS-3884. Journal format() should reset cached values (todd)
HDFS-3870. Add metrics to JournalNode (todd) HDFS-3870. Add metrics to JournalNode (todd)
HDFS-3891. Make selectInputStreams throw IOE instead of RTE (todd) HDFS-3891. Make selectInputStreams throw IOE instead of RTE (todd)
HDFS-3726. If a logger misses an RPC, don't retry that logger until next segment (todd)

View File

@ -25,13 +25,13 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@ -98,6 +98,13 @@ public class IPCLoggerChannel implements AsyncLogger {
*/ */
private final int queueSizeLimitBytes; private final int queueSizeLimitBytes;
/**
* If this logger misses some edits, or restarts in the middle of
* a segment, the writer won't be able to write any more edits until
* the beginning of the next segment. Upon detecting this situation,
* the writer sets this flag to true to avoid sending useless RPCs.
*/
private boolean outOfSync = false;
static final Factory FACTORY = new AsyncLogger.Factory() { static final Factory FACTORY = new AsyncLogger.Factory() {
@Override @Override
@ -212,6 +219,15 @@ public class IPCLoggerChannel implements AsyncLogger {
public synchronized int getQueuedEditsSize() { public synchronized int getQueuedEditsSize() {
return queuedEditsSizeBytes; return queuedEditsSizeBytes;
} }
/**
* @return true if the server has gotten out of sync from the client,
* and thus a log roll is required for this logger to successfully start
* logging more edits.
*/
public synchronized boolean isOutOfSync() {
return outOfSync;
}
@VisibleForTesting @VisibleForTesting
void waitForAllPendingCalls() throws InterruptedException { void waitForAllPendingCalls() throws InterruptedException {
@ -265,8 +281,22 @@ public class IPCLoggerChannel implements AsyncLogger {
ret = executor.submit(new Callable<Void>() { ret = executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
getProxy().journal(createReqInfo(), throwIfOutOfSync();
segmentTxId, firstTxnId, numTxns, data);
try {
getProxy().journal(createReqInfo(),
segmentTxId, firstTxnId, numTxns, data);
} catch (IOException e) {
QuorumJournalManager.LOG.warn(
"Remote journal " + IPCLoggerChannel.this + " failed to " +
"write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
". Will try to write to this JN again after the next " +
"log roll.", e);
synchronized (IPCLoggerChannel.this) {
outOfSync = true;
}
throw e;
}
synchronized (IPCLoggerChannel.this) { synchronized (IPCLoggerChannel.this) {
highestAckedTxId = firstTxnId + numTxns - 1; highestAckedTxId = firstTxnId + numTxns - 1;
} }
@ -298,6 +328,15 @@ public class IPCLoggerChannel implements AsyncLogger {
return ret; return ret;
} }
private synchronized void throwIfOutOfSync() throws JournalOutOfSyncException {
if (outOfSync) {
// TODO: send a "heartbeat" here so that the remote node knows the newest
// committed txid, for metrics purposes
throw new JournalOutOfSyncException(
"Journal disabled until next roll");
}
}
private synchronized void reserveQueueSpace(int size) private synchronized void reserveQueueSpace(int size)
throws LoggerTooFarBehindException { throws LoggerTooFarBehindException {
Preconditions.checkArgument(size >= 0); Preconditions.checkArgument(size >= 0);
@ -330,6 +369,15 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
getProxy().startLogSegment(createReqInfo(), txid); getProxy().startLogSegment(createReqInfo(), txid);
synchronized (IPCLoggerChannel.this) {
if (outOfSync) {
outOfSync = false;
QuorumJournalManager.LOG.info(
"Restarting previously-stopped writes to " +
IPCLoggerChannel.this + " in segment starting at txid " +
txid);
}
}
return null; return null;
} }
}); });
@ -341,6 +389,8 @@ public class IPCLoggerChannel implements AsyncLogger {
return executor.submit(new Callable<Void>() { return executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
throwIfOutOfSync();
getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId); getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
return null; return null;
} }
@ -415,5 +465,8 @@ public class IPCLoggerChannel implements AsyncLogger {
if (behind > 0) { if (behind > 0) {
sb.append(" (" + behind + " behind)"); sb.append(" (" + behind + " behind)");
} }
if (outOfSync) {
sb.append(" (will re-join on next segment)");
}
} }
} }

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException; import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@ -273,14 +274,8 @@ class Journal implements Closeable {
int numTxns, byte[] records) throws IOException { int numTxns, byte[] records) throws IOException {
checkFormatted(); checkFormatted();
checkWriteRequest(reqInfo); checkWriteRequest(reqInfo);
// TODO: if a JN goes down and comes back up, then it will throw checkSync(curSegment != null,
// this exception on every edit. We should instead send back
// a response indicating the log needs to be rolled, which would
// mark the logger on the client side as "pending" -- and have the
// NN code look for this condition and trigger a roll when it happens.
// That way the node can catch back up and rejoin
Preconditions.checkState(curSegment != null,
"Can't write, no segment open"); "Can't write, no segment open");
if (curSegmentTxId != segmentTxId) { if (curSegmentTxId != segmentTxId) {
@ -297,7 +292,7 @@ class Journal implements Closeable {
+ " but current segment is " + curSegmentTxId); + " but current segment is " + curSegmentTxId);
} }
Preconditions.checkState(nextTxId == firstTxnId, checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
long lastTxnId = firstTxnId + numTxns - 1; long lastTxnId = firstTxnId + numTxns - 1;
@ -378,6 +373,18 @@ class Journal implements Closeable {
} }
} }
/**
* @throws JournalOutOfSyncException if the given expression is not true.
* The message of the exception is formatted using the 'msg' and
* 'formatArgs' parameters.
*/
private void checkSync(boolean expression, String msg,
Object... formatArgs) throws JournalOutOfSyncException {
if (!expression) {
throw new JournalOutOfSyncException(String.format(msg, formatArgs));
}
}
/** /**
* Start a new segment at the given txid. The previous segment * Start a new segment at the given txid. The previous segment
* must have already been finalized. * must have already been finalized.
@ -453,7 +460,7 @@ class Journal implements Closeable {
FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
if (elf == null) { if (elf == null) {
throw new IllegalStateException("No log file to finalize at " + throw new JournalOutOfSyncException("No log file to finalize at " +
"transaction ID " + startTxId); "transaction ID " + startTxId);
} }
@ -463,8 +470,8 @@ class Journal implements Closeable {
LOG.info("Validating log about to be finalized: " + elf); LOG.info("Validating log about to be finalized: " + elf);
elf.validateLog(); elf.validateLog();
Preconditions.checkState(elf.getLastTxId() == endTxId, checkSync(elf.getLastTxId() == endTxId,
"Trying to finalize log %s-%s, but current state of log " + "Trying to finalize log %s-%s, but current state of log " +
"is %s", startTxId, endTxId, elf); "is %s", startTxId, endTxId, elf);
fjm.finalizeLogSegment(startTxId, endTxId); fjm.finalizeLogSegment(startTxId, endTxId);

View File

@ -128,5 +128,51 @@ public class TestIPCLoggerChannel {
} }
}, 10, 1000); }, 10, 1000);
} }
/**
* Test that, if the remote node gets unsynchronized (eg some edits were
* missed or the node rebooted), the client stops sending edits until
* the next roll. Test for HDFS-3726.
*/
@Test
public void testStopSendingEditsWhenOutOfSync() throws Exception {
Mockito.doThrow(new IOException("injected error"))
.when(mockProxy).journal(
Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(1L),
Mockito.eq(1), Mockito.same(FAKE_DATA));
try {
ch.sendEdits(1L, 1L, 1, FAKE_DATA).get();
fail("Injected JOOSE did not cause sendEdits() to throw");
} catch (ExecutionException ee) {
GenericTestUtils.assertExceptionContains("injected", ee);
}
Mockito.verify(mockProxy).journal(
Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(1L),
Mockito.eq(1), Mockito.same(FAKE_DATA));
assertTrue(ch.isOutOfSync());
try {
ch.sendEdits(1L, 2L, 1, FAKE_DATA).get();
fail("sendEdits() should throw until next roll");
} catch (ExecutionException ee) {
GenericTestUtils.assertExceptionContains("disabled until next roll",
ee.getCause());
}
// It should have failed without even sending an RPC, since it was not sync.
Mockito.verify(mockProxy, Mockito.never()).journal(
Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(2L),
Mockito.eq(1), Mockito.same(FAKE_DATA));
// After a roll, sending new edits should not fail.
ch.startLogSegment(3L).get();
assertFalse(ch.isOutOfSync());
ch.sendEdits(3L, 3L, 1, FAKE_DATA).get();
}
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.qjournal.server.Journal; import org.apache.hadoop.hdfs.qjournal.server.Journal;
@ -219,9 +220,9 @@ public class TestJournal {
try { try {
journal.finalizeLogSegment(makeRI(3), 1, 6); journal.finalizeLogSegment(makeRI(3), 1, 6);
fail("did not fail to finalize"); fail("did not fail to finalize");
} catch (IllegalStateException ise) { } catch (JournalOutOfSyncException e) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"but current state of log is", ise); "but current state of log is", e);
} }
// Check that, even if we re-construct the journal by scanning the // Check that, even if we re-construct the journal by scanning the
@ -232,9 +233,9 @@ public class TestJournal {
try { try {
journal.finalizeLogSegment(makeRI(4), 1, 6); journal.finalizeLogSegment(makeRI(4), 1, 6);
fail("did not fail to finalize"); fail("did not fail to finalize");
} catch (IllegalStateException ise) { } catch (JournalOutOfSyncException e) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"but current state of log is", ise); "but current state of log is", e);
} }
} }
@ -248,9 +249,9 @@ public class TestJournal {
try { try {
journal.finalizeLogSegment(makeRI(1), 1000, 1001); journal.finalizeLogSegment(makeRI(1), 1000, 1001);
fail("did not fail to finalize"); fail("did not fail to finalize");
} catch (IllegalStateException ise) { } catch (JournalOutOfSyncException e) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"No log file to finalize at transaction ID 1000", ise); "No log file to finalize at transaction ID 1000", e);
} }
} }