HDFS-3797. QJM: add segment txid as a parameter to journal() RPC. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1373571 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-08-15 18:58:51 +00:00
parent c95a1674b6
commit 42cdc1b083
18 changed files with 148 additions and 43 deletions

View File

@ -22,3 +22,5 @@ HDFS-3795. QJM: validate journal dir at startup (todd)
HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segment (todd)
HDFS-3799. QJM: handle empty log segments during recovery (todd)
HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd)

View File

@ -53,12 +53,14 @@ interface AsyncLogger {
/**
* Send a batch of edits to the logger.
* @param segmentTxId the first txid in the current segment
* @param firstTxnId the first txid of the edits.
* @param numTxns the number of transactions in the batch
* @param data the actual data to be sent
*/
public ListenableFuture<Void> sendEdits(
final long firstTxnId, final int numTxns, final byte[] data);
final long segmentTxId, final long firstTxnId,
final int numTxns, final byte[] data);
/**
* Begin writing a new log segment.

View File

@ -263,11 +263,11 @@ class AsyncLoggerSet {
}
public QuorumCall<AsyncLogger, Void> sendEdits(
long firstTxnId, int numTxns, byte[] data) {
long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.sendEdits(firstTxnId, numTxns, data);
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
calls.put(logger, future);
}
return QuorumCall.create(calls);

View File

@ -235,7 +235,8 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override
public ListenableFuture<Void> sendEdits(
final long firstTxnId, final int numTxns, final byte[] data) {
final long segmentTxId, final long firstTxnId,
final int numTxns, final byte[] data) {
try {
reserveQueueSpace(data.length);
} catch (LoggerTooFarBehindException e) {
@ -246,7 +247,8 @@ public class IPCLoggerChannel implements AsyncLogger {
ret = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().journal(createReqInfo(), firstTxnId, numTxns, data);
getProxy().journal(createReqInfo(),
segmentTxId, firstTxnId, numTxns, data);
return null;
}
});

View File

@ -353,7 +353,7 @@ public class QuorumJournalManager implements JournalManager {
"must recover segments before starting a new one");
QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
return new QuorumOutputStream(loggers);
return new QuorumOutputStream(loggers, txId);
}
@Override

View File

@ -31,11 +31,14 @@ import org.apache.hadoop.io.DataOutputBuffer;
class QuorumOutputStream extends EditLogOutputStream {
private final AsyncLoggerSet loggers;
private EditsDoubleBuffer buf;
private final long segmentTxId;
public QuorumOutputStream(AsyncLoggerSet loggers) throws IOException {
public QuorumOutputStream(AsyncLoggerSet loggers,
long txId) throws IOException {
super();
this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf
this.loggers = loggers;
this.segmentTxId = txId;
}
@Override
@ -96,7 +99,8 @@ class QuorumOutputStream extends EditLogOutputStream {
assert data.length == bufToSend.getLength();
QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
firstTxToFlush, numReadyTxns, data);
segmentTxId, firstTxToFlush,
numReadyTxns, data);
loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
}
}

View File

@ -72,6 +72,7 @@ public interface QJournalProtocol {
* to write edits to their local logs.
*/
public void journal(RequestInfo reqInfo,
long segmentTxId,
long firstTxnId,
int numTxns,
byte[] records) throws IOException;

View File

@ -109,8 +109,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
JournalRequestProto req) throws ServiceException {
try {
impl.journal(convert(req.getReqInfo()),
req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
.toByteArray());
req.getSegmentTxnId(), req.getFirstTxnId(),
req.getNumTxns(), req.getRecords().toByteArray());
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -124,10 +124,12 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
public void journal(RequestInfo reqInfo, long firstTxnId, int numTxns,
public void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId, int numTxns,
byte[] records) throws IOException {
JournalRequestProto req = JournalRequestProto.newBuilder()
.setReqInfo(convert(reqInfo))
.setSegmentTxnId(segmentTxId)
.setFirstTxnId(firstTxnId)
.setNumTxns(numTxns)
.setRecords(PBHelper.getByteString(records))

View File

@ -196,9 +196,10 @@ class Journal implements Closeable {
/**
* Write a batch of edits to the journal.
* {@see QJournalProtocol#journal(RequestInfo, long, int, byte[])}
* {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
*/
synchronized void journal(RequestInfo reqInfo, long firstTxnId,
synchronized void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkRequest(reqInfo);
checkFormatted();
@ -211,6 +212,21 @@ class Journal implements Closeable {
// That way the node can catch back up and rejoin
Preconditions.checkState(curSegment != null,
"Can't write, no segment open");
if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs
// on both the finalize() and then the start() of the next segment.
// This could cause us to continue writing to an old segment
// instead of rolling to a new one, which breaks one of the
// invariants in the design. If it happens, abort the segment
// and throw an exception.
curSegment.abort();
curSegment = null;
throw new IllegalStateException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId);
}
Preconditions.checkState(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

View File

@ -115,10 +115,11 @@ class JournalNodeRpcServer implements QJournalProtocol {
}
@Override
public void journal(RequestInfo reqInfo, long firstTxnId,
public void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
.journal(reqInfo, firstTxnId, numTxns, records);
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
@Override

View File

@ -58,6 +58,7 @@ message JournalRequestProto {
required uint64 firstTxnId = 2;
required uint32 numTxns = 3;
required bytes records = 4;
required uint64 segmentTxnId = 5;
}
message JournalResponseProto {

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@ -37,6 +38,9 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import com.google.common.collect.Lists;
public abstract class QJMTestUtil {
public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
@ -132,6 +136,23 @@ public abstract class QJMTestUtil {
assertTrue("File " + fname + " should exist in a quorum of dirs",
count >= cluster.getQuorumSize());
}
public static long recoverAndReturnLastTxn(QuorumJournalManager qjm)
throws IOException {
qjm.recoverUnfinalizedSegments();
long lastRecoveredTxn = 0;
List<EditLogInputStream> streams = Lists.newArrayList();
try {
qjm.selectInputStreams(streams, 0, false);
for (EditLogInputStream elis : streams) {
assertTrue(elis.getFirstTxId() > lastRecoveredTxn);
lastRecoveredTxn = elis.getLastTxId();
}
} finally {
IOUtils.cleanup(null, streams.toArray(new Closeable[0]));
}
return lastRecoveredTxn;
}
}

View File

@ -78,9 +78,10 @@ public class TestIPCLoggerChannel {
@Test
public void testSimpleCall() throws Exception {
ch.sendEdits(1, 3, FAKE_DATA).get();
ch.sendEdits(1, 1, 3, FAKE_DATA).get();
Mockito.verify(mockProxy).journal(Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(3), Mockito.same(FAKE_DATA));
Mockito.eq(1L), Mockito.eq(1L),
Mockito.eq(3), Mockito.same(FAKE_DATA));
}
@ -95,12 +96,13 @@ public class TestIPCLoggerChannel {
DelayAnswer delayer = new DelayAnswer(LOG);
Mockito.doAnswer(delayer).when(mockProxy).journal(
Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(1), Mockito.same(FAKE_DATA));
Mockito.eq(1L), Mockito.eq(1L),
Mockito.eq(1), Mockito.same(FAKE_DATA));
// Queue up the maximum number of calls.
int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length;
for (int i = 1; i <= numToQueue; i++) {
ch.sendEdits((long)i, 1, FAKE_DATA);
ch.sendEdits(1L, (long)i, 1, FAKE_DATA);
}
// The accounting should show the correct total number queued.
@ -108,7 +110,7 @@ public class TestIPCLoggerChannel {
// Trying to queue any more should fail.
try {
ch.sendEdits(numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS);
ch.sendEdits(1L, numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS);
fail("Did not fail to queue more calls after queue was full");
} catch (ExecutionException ee) {
if (!(ee.getCause() instanceof LoggerTooFarBehindException)) {

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
import java.io.Closeable;
import java.io.File;
@ -414,10 +415,59 @@ public class TestQuorumJournalManager {
private void failLoggerAtTxn(AsyncLogger spy, long txid) {
TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure"))
.when(spy).sendEdits(
.when(spy).sendEdits(Mockito.anyLong(),
Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
}
/**
* Test the case where one of the loggers misses a finalizeLogSegment()
* call, and then misses the next startLogSegment() call before coming
* back to life.
*
* Previously, this caused it to keep on writing to the old log segment,
* such that one logger had eg edits_1-10 while the others had edits_1-5 and
* edits_6-10. This caused recovery to fail in certain cases.
*/
@Test
public void testMissFinalizeAndNextStart() throws Exception {
// Logger 0: miss finalize(1-3) and start(4)
futureThrows(new IOException("injected")).when(spies.get(0))
.finalizeLogSegment(Mockito.eq(1L), Mockito.eq(3L));
futureThrows(new IOException("injected")).when(spies.get(0))
.startLogSegment(Mockito.eq(4L));
// Logger 1: fail at txn id 4
failLoggerAtTxn(spies.get(1), 4L);
writeSegment(cluster, qjm, 1, 3, true);
EditLogOutputStream stm = qjm.startLogSegment(4);
try {
writeTxns(stm, 4, 1);
fail("Did not fail to write");
} catch (QuorumException qe) {
// Should fail, because logger 1 had an injected fault and
// logger 0 should detect writer out of sync
GenericTestUtils.assertExceptionContains("Writer out of sync",
qe);
} finally {
stm.abort();
qjm.close();
}
// State:
// Logger 0: 1-3 in-progress (since it missed finalize)
// Logger 1: 1-3 finalized
// Logger 2: 1-3 finalized, 4 in-progress with one txn
// Shut down logger 2 so it doesn't participate in recovery
cluster.getJournalNode(2).stopAndJoin(0);
qjm = createSpyingQJM();
long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
assertEquals(3L, recovered);
}
/**
* edit lengths [3,4,5]
* first recovery:

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import java.io.IOException;
@ -150,21 +151,21 @@ public class TestQuorumJournalManagerUnit {
// The flush should log txn 1-2
futureReturns(null).when(spyLoggers.get(0)).sendEdits(
eq(1L), eq(2), Mockito.<byte[]>any());
anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
futureReturns(null).when(spyLoggers.get(1)).sendEdits(
eq(1L), eq(2), Mockito.<byte[]>any());
anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
futureReturns(null).when(spyLoggers.get(2)).sendEdits(
eq(1L), eq(2), Mockito.<byte[]>any());
anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
stm.flush();
// Another flush should now log txn #3
stm.setReadyToFlush();
futureReturns(null).when(spyLoggers.get(0)).sendEdits(
eq(3L), eq(1), Mockito.<byte[]>any());
anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
futureReturns(null).when(spyLoggers.get(1)).sendEdits(
eq(3L), eq(1), Mockito.<byte[]>any());
anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
futureReturns(null).when(spyLoggers.get(2)).sendEdits(
eq(3L), eq(1), Mockito.<byte[]>any());
anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
stm.flush();
}
@ -176,14 +177,14 @@ public class TestQuorumJournalManagerUnit {
// Make the first two logs respond immediately
futureReturns(null).when(spyLoggers.get(0)).sendEdits(
eq(1L), eq(1), Mockito.<byte[]>any());
anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
futureReturns(null).when(spyLoggers.get(1)).sendEdits(
eq(1L), eq(1), Mockito.<byte[]>any());
anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
// And the third log not respond
SettableFuture<Void> slowLog = SettableFuture.<Void>create();
Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
eq(1L), eq(1), Mockito.<byte[]>any());
anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
stm.flush();
}

View File

@ -97,7 +97,7 @@ public class TestJournal {
}
try {
journal.journal(new RequestInfo(JID, 1L, 1L),
100L, 0, new byte[0]);
12345L, 100L, 0, new byte[0]);
fail("Should have rejected call from prior epoch");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
@ -109,7 +109,7 @@ public class TestJournal {
public void testRestartJournal() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(new RequestInfo("j", 1, 1), 1);
journal.journal(new RequestInfo("j", 1, 2), 1, 2,
journal.journal(new RequestInfo("j", 1, 2), 1, 1, 2,
QJMTestUtil.createTxnData(1, 2));
// Don't finalize.
@ -163,7 +163,7 @@ public class TestJournal {
public void testFinalizeWhenEditsAreMissed() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(makeRI(1), 1);
journal.journal(makeRI(2), 1, 3,
journal.journal(makeRI(2), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3));
// Try to finalize up to txn 6, even though we only wrote up to txn 3.
@ -220,7 +220,7 @@ public class TestJournal {
// Start a segment at txid 1, and write a batch of 3 txns.
journal.startLogSegment(makeRI(1), 1);
journal.journal(makeRI(2), 1, 3,
journal.journal(makeRI(2), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3));
GenericTestUtils.assertExists(
@ -229,7 +229,7 @@ public class TestJournal {
// Try to start new segment at txid 6, this should abort old segment and
// then succeed, allowing us to write txid 6-9.
journal.startLogSegment(makeRI(3), 6);
journal.journal(makeRI(4), 6, 3,
journal.journal(makeRI(4), 6, 6, 3,
QJMTestUtil.createTxnData(6, 3));
// The old segment should *not* be finalized.
@ -250,19 +250,19 @@ public class TestJournal {
// Start a segment at txid 1, and write just 1 transaction. This
// would normally be the START_LOG_SEGMENT transaction.
journal.startLogSegment(makeRI(1), 1);
journal.journal(makeRI(2), 1, 1,
journal.journal(makeRI(2), 1, 1, 1,
QJMTestUtil.createTxnData(1, 1));
// Try to start new segment at txid 1, this should succeed, because
// we are allowed to re-start a segment if we only ever had the
// START_LOG_SEGMENT transaction logged.
journal.startLogSegment(makeRI(3), 1);
journal.journal(makeRI(4), 1, 1,
journal.journal(makeRI(4), 1, 1, 1,
QJMTestUtil.createTxnData(1, 1));
// This time through, write more transactions afterwards, simulating
// real user transactions.
journal.journal(makeRI(5), 2, 3,
journal.journal(makeRI(5), 1, 2, 3,
QJMTestUtil.createTxnData(2, 3));
try {

View File

@ -91,7 +91,7 @@ public class TestJournalNode {
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.sendEdits(1, 1, "hello".getBytes(Charsets.UTF_8)).get();
ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
}
@ -100,7 +100,7 @@ public class TestJournalNode {
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.sendEdits(1, 2, QJMTestUtil.createTxnData(1, 2)).get();
ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get();
// Switch to a new epoch without closing earlier segment
NewEpochResponseProto response = ch.newEpoch(2).get();
@ -148,7 +148,7 @@ public class TestJournalNode {
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.sendEdits(1, 3, EDITS_DATA).get();
ch.sendEdits(1L, 1, 3, EDITS_DATA).get();
ch.finalizeLogSegment(1, 3).get();
// Attempt to retrieve via HTTP, ensure we get the data back
@ -199,7 +199,7 @@ public class TestJournalNode {
// Make a log segment, and prepare again -- this time should see the
// segment existing.
ch.startLogSegment(1L).get();
ch.sendEdits(1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
prep = ch.prepareRecovery(1L).get();
System.err.println("Prep: " + prep);