diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index 4544210a7a2..33d3f2cceae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -22,3 +22,5 @@ HDFS-3795. QJM: validate journal dir at startup (todd) HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segment (todd) HDFS-3799. QJM: handle empty log segments during recovery (todd) + +HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index 2c9d5c05b88..62bac82de57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -53,12 +53,14 @@ interface AsyncLogger { /** * Send a batch of edits to the logger. + * @param segmentTxId the first txid in the current segment * @param firstTxnId the first txid of the edits. * @param numTxns the number of transactions in the batch * @param data the actual data to be sent */ public ListenableFuture sendEdits( - final long firstTxnId, final int numTxns, final byte[] data); + final long segmentTxId, final long firstTxnId, + final int numTxns, final byte[] data); /** * Begin writing a new log segment. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java index d0ca5ab096e..76414f9bb7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java @@ -263,11 +263,11 @@ class AsyncLoggerSet { } public QuorumCall sendEdits( - long firstTxnId, int numTxns, byte[] data) { + long segmentTxId, long firstTxnId, int numTxns, byte[] data) { Map> calls = Maps.newHashMap(); for (AsyncLogger logger : loggers) { ListenableFuture future = - logger.sendEdits(firstTxnId, numTxns, data); + logger.sendEdits(segmentTxId, firstTxnId, numTxns, data); calls.put(logger, future); } return QuorumCall.create(calls); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 89d2858c1ce..56f494f186a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -235,7 +235,8 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture sendEdits( - final long firstTxnId, final int numTxns, final byte[] data) { + final long segmentTxId, final long firstTxnId, + final int numTxns, final byte[] data) { try { reserveQueueSpace(data.length); } catch (LoggerTooFarBehindException e) { @@ -246,7 +247,8 @@ public class IPCLoggerChannel implements AsyncLogger { ret = executor.submit(new Callable() { @Override public Void call() throws IOException { - getProxy().journal(createReqInfo(), firstTxnId, numTxns, data); + getProxy().journal(createReqInfo(), + segmentTxId, firstTxnId, numTxns, data); return null; } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index c5ae384d94c..a55a5580141 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -353,7 +353,7 @@ public class QuorumJournalManager implements JournalManager { "must recover segments before starting a new one"); QuorumCall q = loggers.startLogSegment(txId); loggers.waitForWriteQuorum(q, startSegmentTimeoutMs); - return new QuorumOutputStream(loggers); + return new QuorumOutputStream(loggers, txId); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java index bb18f34572a..2725e72587c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java @@ -31,11 +31,14 @@ import org.apache.hadoop.io.DataOutputBuffer; class QuorumOutputStream extends EditLogOutputStream { private final AsyncLoggerSet loggers; private EditsDoubleBuffer buf; + private final long segmentTxId; - public QuorumOutputStream(AsyncLoggerSet loggers) throws IOException { + public QuorumOutputStream(AsyncLoggerSet loggers, + long txId) throws IOException { super(); this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf this.loggers = loggers; + this.segmentTxId = txId; } @Override @@ -96,7 +99,8 @@ class QuorumOutputStream extends EditLogOutputStream { assert data.length == bufToSend.getLength(); QuorumCall qcall = loggers.sendEdits( - firstTxToFlush, numReadyTxns, data); + segmentTxId, firstTxToFlush, + numReadyTxns, data); loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java index ccea160e1e4..7e0b4b9f2b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java @@ -72,6 +72,7 @@ public interface QJournalProtocol { * to write edits to their local logs. */ public void journal(RequestInfo reqInfo, + long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java index 854cfd906bb..c34f6482c2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java @@ -109,8 +109,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP JournalRequestProto req) throws ServiceException { try { impl.journal(convert(req.getReqInfo()), - req.getFirstTxnId(), req.getNumTxns(), req.getRecords() - .toByteArray()); + req.getSegmentTxnId(), req.getFirstTxnId(), + req.getNumTxns(), req.getRecords().toByteArray()); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java index eab835eacf2..0ac70702c9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java @@ -124,10 +124,12 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, } @Override - public void journal(RequestInfo reqInfo, long firstTxnId, int numTxns, + public void journal(RequestInfo reqInfo, + long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { JournalRequestProto req = JournalRequestProto.newBuilder() .setReqInfo(convert(reqInfo)) + .setSegmentTxnId(segmentTxId) .setFirstTxnId(firstTxnId) .setNumTxns(numTxns) .setRecords(PBHelper.getByteString(records)) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 2014267d812..fda8d415c93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -196,9 +196,10 @@ class Journal implements Closeable { /** * Write a batch of edits to the journal. - * {@see QJournalProtocol#journal(RequestInfo, long, int, byte[])} + * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])} */ - synchronized void journal(RequestInfo reqInfo, long firstTxnId, + synchronized void journal(RequestInfo reqInfo, + long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { checkRequest(reqInfo); checkFormatted(); @@ -211,6 +212,21 @@ class Journal implements Closeable { // That way the node can catch back up and rejoin Preconditions.checkState(curSegment != null, "Can't write, no segment open"); + + if (curSegmentTxId != segmentTxId) { + // Sanity check: it is possible that the writer will fail IPCs + // on both the finalize() and then the start() of the next segment. + // This could cause us to continue writing to an old segment + // instead of rolling to a new one, which breaks one of the + // invariants in the design. If it happens, abort the segment + // and throw an exception. + curSegment.abort(); + curSegment = null; + throw new IllegalStateException( + "Writer out of sync: it thinks it is writing segment " + segmentTxId + + " but current segment is " + curSegmentTxId); + } + Preconditions.checkState(nextTxId == firstTxnId, "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index f03ceadb7aa..36af5b1a78d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -115,10 +115,11 @@ class JournalNodeRpcServer implements QJournalProtocol { } @Override - public void journal(RequestInfo reqInfo, long firstTxnId, + public void journal(RequestInfo reqInfo, + long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { jn.getOrCreateJournal(reqInfo.getJournalId()) - .journal(reqInfo, firstTxnId, numTxns, records); + .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto index c6a28e82f39..14429633e8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto @@ -58,6 +58,7 @@ message JournalRequestProto { required uint64 firstTxnId = 2; required uint32 numTxns = 3; required bytes records = 4; + required uint64 segmentTxnId = 5; } message JournalResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java index 54e081e9316..6fb1d897b53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -37,6 +38,9 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; + +import com.google.common.collect.Lists; public abstract class QJMTestUtil { public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( @@ -132,6 +136,23 @@ public abstract class QJMTestUtil { assertTrue("File " + fname + " should exist in a quorum of dirs", count >= cluster.getQuorumSize()); } - + public static long recoverAndReturnLastTxn(QuorumJournalManager qjm) + throws IOException { + qjm.recoverUnfinalizedSegments(); + long lastRecoveredTxn = 0; + + List streams = Lists.newArrayList(); + try { + qjm.selectInputStreams(streams, 0, false); + + for (EditLogInputStream elis : streams) { + assertTrue(elis.getFirstTxId() > lastRecoveredTxn); + lastRecoveredTxn = elis.getLastTxId(); + } + } finally { + IOUtils.cleanup(null, streams.toArray(new Closeable[0])); + } + return lastRecoveredTxn; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java index ba893a1b4cb..54e469adc31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java @@ -78,9 +78,10 @@ public class TestIPCLoggerChannel { @Test public void testSimpleCall() throws Exception { - ch.sendEdits(1, 3, FAKE_DATA).get(); + ch.sendEdits(1, 1, 3, FAKE_DATA).get(); Mockito.verify(mockProxy).journal(Mockito.any(), - Mockito.eq(1L), Mockito.eq(3), Mockito.same(FAKE_DATA)); + Mockito.eq(1L), Mockito.eq(1L), + Mockito.eq(3), Mockito.same(FAKE_DATA)); } @@ -95,12 +96,13 @@ public class TestIPCLoggerChannel { DelayAnswer delayer = new DelayAnswer(LOG); Mockito.doAnswer(delayer).when(mockProxy).journal( Mockito.any(), - Mockito.eq(1L), Mockito.eq(1), Mockito.same(FAKE_DATA)); + Mockito.eq(1L), Mockito.eq(1L), + Mockito.eq(1), Mockito.same(FAKE_DATA)); // Queue up the maximum number of calls. int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length; for (int i = 1; i <= numToQueue; i++) { - ch.sendEdits((long)i, 1, FAKE_DATA); + ch.sendEdits(1L, (long)i, 1, FAKE_DATA); } // The accounting should show the correct total number queued. @@ -108,7 +110,7 @@ public class TestIPCLoggerChannel { // Trying to queue any more should fail. try { - ch.sendEdits(numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS); + ch.sendEdits(1L, numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS); fail("Did not fail to queue more calls after queue was full"); } catch (ExecutionException ee) { if (!(ee.getCause() instanceof LoggerTooFarBehindException)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index afe851388a1..e9f81d28ec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits; +import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows; import java.io.Closeable; import java.io.File; @@ -414,10 +415,59 @@ public class TestQuorumJournalManager { private void failLoggerAtTxn(AsyncLogger spy, long txid) { TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure")) - .when(spy).sendEdits( + .when(spy).sendEdits(Mockito.anyLong(), Mockito.eq(txid), Mockito.eq(1), Mockito.any()); } + /** + * Test the case where one of the loggers misses a finalizeLogSegment() + * call, and then misses the next startLogSegment() call before coming + * back to life. + * + * Previously, this caused it to keep on writing to the old log segment, + * such that one logger had eg edits_1-10 while the others had edits_1-5 and + * edits_6-10. This caused recovery to fail in certain cases. + */ + @Test + public void testMissFinalizeAndNextStart() throws Exception { + + // Logger 0: miss finalize(1-3) and start(4) + futureThrows(new IOException("injected")).when(spies.get(0)) + .finalizeLogSegment(Mockito.eq(1L), Mockito.eq(3L)); + futureThrows(new IOException("injected")).when(spies.get(0)) + .startLogSegment(Mockito.eq(4L)); + + // Logger 1: fail at txn id 4 + failLoggerAtTxn(spies.get(1), 4L); + + writeSegment(cluster, qjm, 1, 3, true); + EditLogOutputStream stm = qjm.startLogSegment(4); + try { + writeTxns(stm, 4, 1); + fail("Did not fail to write"); + } catch (QuorumException qe) { + // Should fail, because logger 1 had an injected fault and + // logger 0 should detect writer out of sync + GenericTestUtils.assertExceptionContains("Writer out of sync", + qe); + } finally { + stm.abort(); + qjm.close(); + } + + // State: + // Logger 0: 1-3 in-progress (since it missed finalize) + // Logger 1: 1-3 finalized + // Logger 2: 1-3 finalized, 4 in-progress with one txn + + // Shut down logger 2 so it doesn't participate in recovery + cluster.getJournalNode(2).stopAndJoin(0); + + qjm = createSpyingQJM(); + long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm); + assertEquals(3L, recovered); + } + /** * edit lengths [3,4,5] * first recovery: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java index 13e4d645d47..e0dfbf6c355 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.qjournal.client; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import java.io.IOException; @@ -150,21 +151,21 @@ public class TestQuorumJournalManagerUnit { // The flush should log txn 1-2 futureReturns(null).when(spyLoggers.get(0)).sendEdits( - eq(1L), eq(2), Mockito.any()); + anyLong(), eq(1L), eq(2), Mockito.any()); futureReturns(null).when(spyLoggers.get(1)).sendEdits( - eq(1L), eq(2), Mockito.any()); + anyLong(), eq(1L), eq(2), Mockito.any()); futureReturns(null).when(spyLoggers.get(2)).sendEdits( - eq(1L), eq(2), Mockito.any()); + anyLong(), eq(1L), eq(2), Mockito.any()); stm.flush(); // Another flush should now log txn #3 stm.setReadyToFlush(); futureReturns(null).when(spyLoggers.get(0)).sendEdits( - eq(3L), eq(1), Mockito.any()); + anyLong(), eq(3L), eq(1), Mockito.any()); futureReturns(null).when(spyLoggers.get(1)).sendEdits( - eq(3L), eq(1), Mockito.any()); + anyLong(), eq(3L), eq(1), Mockito.any()); futureReturns(null).when(spyLoggers.get(2)).sendEdits( - eq(3L), eq(1), Mockito.any()); + anyLong(), eq(3L), eq(1), Mockito.any()); stm.flush(); } @@ -176,14 +177,14 @@ public class TestQuorumJournalManagerUnit { // Make the first two logs respond immediately futureReturns(null).when(spyLoggers.get(0)).sendEdits( - eq(1L), eq(1), Mockito.any()); + anyLong(), eq(1L), eq(1), Mockito.any()); futureReturns(null).when(spyLoggers.get(1)).sendEdits( - eq(1L), eq(1), Mockito.any()); + anyLong(), eq(1L), eq(1), Mockito.any()); // And the third log not respond SettableFuture slowLog = SettableFuture.create(); Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits( - eq(1L), eq(1), Mockito.any()); + anyLong(), eq(1L), eq(1), Mockito.any()); stm.flush(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java index 1d52f987f9d..dac70ce11c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java @@ -97,7 +97,7 @@ public class TestJournal { } try { journal.journal(new RequestInfo(JID, 1L, 1L), - 100L, 0, new byte[0]); + 12345L, 100L, 0, new byte[0]); fail("Should have rejected call from prior epoch"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( @@ -109,7 +109,7 @@ public class TestJournal { public void testRestartJournal() throws Exception { journal.newEpoch(FAKE_NSINFO, 1); journal.startLogSegment(new RequestInfo("j", 1, 1), 1); - journal.journal(new RequestInfo("j", 1, 2), 1, 2, + journal.journal(new RequestInfo("j", 1, 2), 1, 1, 2, QJMTestUtil.createTxnData(1, 2)); // Don't finalize. @@ -163,7 +163,7 @@ public class TestJournal { public void testFinalizeWhenEditsAreMissed() throws Exception { journal.newEpoch(FAKE_NSINFO, 1); journal.startLogSegment(makeRI(1), 1); - journal.journal(makeRI(2), 1, 3, + journal.journal(makeRI(2), 1, 1, 3, QJMTestUtil.createTxnData(1, 3)); // Try to finalize up to txn 6, even though we only wrote up to txn 3. @@ -220,7 +220,7 @@ public class TestJournal { // Start a segment at txid 1, and write a batch of 3 txns. journal.startLogSegment(makeRI(1), 1); - journal.journal(makeRI(2), 1, 3, + journal.journal(makeRI(2), 1, 1, 3, QJMTestUtil.createTxnData(1, 3)); GenericTestUtils.assertExists( @@ -229,7 +229,7 @@ public class TestJournal { // Try to start new segment at txid 6, this should abort old segment and // then succeed, allowing us to write txid 6-9. journal.startLogSegment(makeRI(3), 6); - journal.journal(makeRI(4), 6, 3, + journal.journal(makeRI(4), 6, 6, 3, QJMTestUtil.createTxnData(6, 3)); // The old segment should *not* be finalized. @@ -250,19 +250,19 @@ public class TestJournal { // Start a segment at txid 1, and write just 1 transaction. This // would normally be the START_LOG_SEGMENT transaction. journal.startLogSegment(makeRI(1), 1); - journal.journal(makeRI(2), 1, 1, + journal.journal(makeRI(2), 1, 1, 1, QJMTestUtil.createTxnData(1, 1)); // Try to start new segment at txid 1, this should succeed, because // we are allowed to re-start a segment if we only ever had the // START_LOG_SEGMENT transaction logged. journal.startLogSegment(makeRI(3), 1); - journal.journal(makeRI(4), 1, 1, + journal.journal(makeRI(4), 1, 1, 1, QJMTestUtil.createTxnData(1, 1)); // This time through, write more transactions afterwards, simulating // real user transactions. - journal.journal(makeRI(5), 2, 3, + journal.journal(makeRI(5), 1, 2, 3, QJMTestUtil.createTxnData(2, 3)); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index 5a07f02d7fe..a1a1556efd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -91,7 +91,7 @@ public class TestJournalNode { ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); - ch.sendEdits(1, 1, "hello".getBytes(Charsets.UTF_8)).get(); + ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); } @@ -100,7 +100,7 @@ public class TestJournalNode { ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); - ch.sendEdits(1, 2, QJMTestUtil.createTxnData(1, 2)).get(); + ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get(); // Switch to a new epoch without closing earlier segment NewEpochResponseProto response = ch.newEpoch(2).get(); @@ -148,7 +148,7 @@ public class TestJournalNode { ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); - ch.sendEdits(1, 3, EDITS_DATA).get(); + ch.sendEdits(1L, 1, 3, EDITS_DATA).get(); ch.finalizeLogSegment(1, 3).get(); // Attempt to retrieve via HTTP, ensure we get the data back @@ -199,7 +199,7 @@ public class TestJournalNode { // Make a log segment, and prepare again -- this time should see the // segment existing. ch.startLogSegment(1L).get(); - ch.sendEdits(1L, 1, QJMTestUtil.createTxnData(1, 1)).get(); + ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get(); prep = ch.prepareRecovery(1L).get(); System.err.println("Prep: " + prep);