diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 9441e52958d..74bdec7d88a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -101,3 +101,5 @@ HDFS-2773. Reading edit logs from an earlier version should not leave blocks in HDFS-2775. Fix TestStandbyCheckpoints.testBothNodesInStandbyState failing intermittently. (todd) HDFS-2766. Test for case where standby partially reads log and then performs checkpoint. (atm) + +HDFS-2738. FSEditLog.selectinputStreams is reading through in-progress streams even when non-in-progress are requested. (atm) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 7fa90269ecd..047efd51f4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -312,8 +312,10 @@ public void finalizeLogSegment(long firstTxId, long lastTxId) } } + // TODO(HA): Handle inProgressOk @Override - public EditLogInputStream getInputStream(long fromTxnId) throws IOException { + public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) + throws IOException { for (EditLogLedgerMetadata l : getLedgerList()) { if (l.getFirstTxId() == fromTxnId) { try { @@ -329,8 +331,10 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException { throw new IOException("No ledger for fromTxnId " + fromTxnId + " found."); } + // TODO(HA): Handle inProgressOk @Override - public long getNumberOfTransactions(long fromTxnId) throws IOException { + public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) + throws IOException { long count = 0; long expectedStart = 0; for (EditLogLedgerMetadata l : getLedgerList()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index b949bc200ea..5937fa82958 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -195,7 +195,7 @@ public void testNumberOfTransactions() throws Exception { out.close(); bkjm.finalizeLogSegment(1, 100); - long numTrans = bkjm.getNumberOfTransactions(1); + long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(100, numTrans); } @@ -218,17 +218,17 @@ public void testNumberOfTransactionsWithGaps() throws Exception { } zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); - long numTrans = bkjm.getNumberOfTransactions(1); + long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); try { - numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1); + numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true); fail("Should have thrown corruption exception by this point"); } catch (JournalManager.CorruptionException ce) { // if we get here, everything is going good } - numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1); + numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); } @@ -262,7 +262,7 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { out.abort(); out.close(); - long numTrans = bkjm.getNumberOfTransactions(1); + long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals((txid-1), numTrans); } @@ -357,7 +357,7 @@ public void testSimpleRead() throws Exception { bkjm.finalizeLogSegment(1, numTransactions); - EditLogInputStream in = bkjm.getInputStream(1); + EditLogInputStream in = bkjm.getInputStream(1, true); try { assertEquals(numTransactions, FSEditLogTestUtil.countTransactionsInStream(in)); @@ -392,4 +392,4 @@ public void testSimpleRecovery() throws Exception { assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); assertNull(zkc.exists(bkjm.inprogressZNode(), false)); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java index c655ee75bbf..de75b769345 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java @@ -58,7 +58,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep) } @Override - public long getNumberOfTransactions(long fromTxnId) + public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) throws IOException, CorruptionException { // This JournalManager is never used for input. Therefore it cannot // return any transactions @@ -66,7 +66,8 @@ public long getNumberOfTransactions(long fromTxnId) } @Override - public EditLogInputStream getInputStream(long fromTxnId) throws IOException { + public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) + throws IOException { // This JournalManager is never used for input. Therefore it cannot // return any transactions throw new IOException("Unsupported operation"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 19f9f5117aa..cd7ff5b0c8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -252,7 +252,7 @@ synchronized void openForWrite() throws IOException { long segmentTxId = getLastWrittenTxId() + 1; // Safety check: we should never start a segment if there are // newer txids readable. - EditLogInputStream s = journalSet.getInputStream(segmentTxId); + EditLogInputStream s = journalSet.getInputStream(segmentTxId, true); try { Preconditions.checkState(s == null, "Cannot start writing at txid %s when there is a stream " + @@ -1071,19 +1071,19 @@ Collection selectInputStreams(long fromTxId, public Collection selectInputStreams(long fromTxId, long toAtLeastTxId, boolean inProgressOk) throws IOException { List streams = new ArrayList(); - EditLogInputStream stream = journalSet.getInputStream(fromTxId); + EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk); while (stream != null) { - if (inProgressOk || !stream.isInProgress()) { - streams.add(stream); - } + streams.add(stream); // We're now looking for a higher range, so reset the fromTxId fromTxId = stream.getLastTxId() + 1; - stream = journalSet.getInputStream(fromTxId); + stream = journalSet.getInputStream(fromTxId, inProgressOk); } + if (fromTxId <= toAtLeastTxId) { closeAllStreams(streams); - throw new IOException("No non-corrupt logs for txid " - + fromTxId); + throw new IOException(String.format("Gap in transactions. Expected to " + + "be able to read up until at least txid %d but unable to find any " + + "edit logs containing txid %d", toAtLeastTxId, fromTxId)); } return streams; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index ce1abe82bbb..d72523d29f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -585,9 +585,12 @@ boolean loadFSImage(FSNamesystem target) throws IOException { if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { + // If we're open for write, we're either non-HA or we're the active NN, so + // we better be able to load all the edits. If we're the standby NN, it's + // OK to not be able to read all of edits right now. + long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0; editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1, - inspector.getMaxSeenTxId(), - false); + toAtLeastTxId, false); } else { editStreams = FSImagePreTransactionalStorageInspector .getEditLogStreams(storage); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 3c6bec6cd5a..2380e93f0f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; @@ -192,10 +193,13 @@ static List matchEditLogs(File[] filesInStorage) { } @Override - synchronized public EditLogInputStream getInputStream(long fromTxId) - throws IOException { + synchronized public EditLogInputStream getInputStream(long fromTxId, + boolean inProgressOk) throws IOException { for (EditLogFile elf : getLogFiles(fromTxId)) { if (elf.containsTxId(fromTxId)) { + if (!inProgressOk && elf.isInProgress()) { + continue; + } if (elf.isInProgress()) { elf.validateLog(); } @@ -219,7 +223,7 @@ synchronized public EditLogInputStream getInputStream(long fromTxId) } @Override - public long getNumberOfTransactions(long fromTxId) + public long getNumberOfTransactions(long fromTxId, boolean inProgressOk) throws IOException, CorruptionException { long numTxns = 0L; @@ -232,6 +236,10 @@ public long getNumberOfTransactions(long fromTxId) + fromTxId + " - " + (elf.getFirstTxId() - 1)); break; } else if (elf.containsTxId(fromTxId)) { + if (!inProgressOk && elf.isInProgress()) { + break; + } + if (elf.isInProgress()) { elf.validateLog(); } @@ -253,7 +261,7 @@ public long getNumberOfTransactions(long fromTxId) + " txns from " + fromTxId); } - long max = findMaxTransaction(); + long max = findMaxTransaction(inProgressOk); // fromTxId should be greater than max, as it points to the next // transaction we should expect to find. If it is less than or equal @@ -276,7 +284,7 @@ synchronized public void recoverUnfinalizedSegments() throws IOException { // make sure journal is aware of max seen transaction before moving corrupt // files aside - findMaxTransaction(); + findMaxTransaction(true); for (EditLogFile elf : allLogFiles) { if (elf.getFile().equals(currentInProgress)) { @@ -318,9 +326,13 @@ private List getLogFiles(long fromTxId) throws IOException { * tranaction id in the case that it was the maximum transaction in * the journal. */ - private long findMaxTransaction() + private long findMaxTransaction(boolean inProgressOk) throws IOException { for (EditLogFile elf : getLogFiles(0)) { + if (elf.isInProgress() && !inProgressOk) { + continue; + } + if (elf.isInProgress()) { maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction); elf.validateLog(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java index d45de18e92d..f9c622dc387 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java @@ -48,20 +48,23 @@ public interface JournalManager extends Closeable { /** * Get the input stream starting with fromTxnId from this journal manager * @param fromTxnId the first transaction id we want to read + * @param inProgressOk whether or not in-progress streams should be returned * @return the stream starting with transaction fromTxnId * @throws IOException if a stream cannot be found. */ - EditLogInputStream getInputStream(long fromTxnId) throws IOException; + EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) + throws IOException; /** * Get the number of transaction contiguously available from fromTxnId. * * @param fromTxnId Transaction id to count from + * @param inProgressOk whether or not in-progress streams should be counted * @return The number of transactions available from fromTxnId * @throws IOException if the journal cannot be read. * @throws CorruptionException if there is a gap in the journal at fromTxnId. */ - long getNumberOfTransactions(long fromTxnId) + long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) throws IOException, CorruptionException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index 7af0b51b909..c00236fd094 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -198,7 +198,8 @@ public void apply(JournalAndStream jas) throws IOException { * or null if no more exist */ @Override - public EditLogInputStream getInputStream(long fromTxnId) throws IOException { + public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) + throws IOException { JournalManager bestjm = null; long bestjmNumTxns = 0; CorruptionException corruption = null; @@ -209,7 +210,8 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException { JournalManager candidate = jas.getManager(); long candidateNumTxns = 0; try { - candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId); + candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId, + inProgressOk); } catch (CorruptionException ce) { corruption = ce; } catch (IOException ioe) { @@ -232,18 +234,20 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException { return null; } } - return bestjm.getInputStream(fromTxnId); + return bestjm.getInputStream(fromTxnId, inProgressOk); } @Override - public long getNumberOfTransactions(long fromTxnId) throws IOException { + public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) + throws IOException { long num = 0; for (JournalAndStream jas: journals) { if (jas.isDisabled()) { LOG.info("Skipping jas " + jas + " since it's disabled"); continue; } else { - long newNum = jas.getManager().getNumberOfTransactions(fromTxnId); + long newNum = jas.getManager().getNumberOfTransactions(fromTxnId, + inProgressOk); if (newNum > num) { num = newNum; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index f36b5d20516..f95a876eed9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -936,11 +936,11 @@ static class AbortSpec { * * @param editUris directories to create edit logs in * @param numrolls number of times to roll the edit log during setup + * @param closeOnFinish whether to close the edit log after setup * @param abortAtRolls Specifications for when to fail, see AbortSpec */ - public static NNStorage setupEdits(List editUris, int numrolls, - AbortSpec... abortAtRolls) - throws IOException { + public static NNStorage setupEdits(List editUris, int numrolls, + boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException { List aborts = new ArrayList(Arrays.asList(abortAtRolls)); NNStorage storage = new NNStorage(new Configuration(), Collections.emptyList(), @@ -979,16 +979,34 @@ public static NNStorage setupEdits(List editUris, int numrolls, } editlog.logSync(); } - editlog.close(); + + if (closeOnFinish) { + editlog.close(); + } FSImageTestUtil.logStorageContents(LOG, storage); return storage; } + + /** + * Set up directories for tests. + * + * Each rolled file is 10 txns long. + * A failed file is 2 txns long. + * + * @param editUris directories to create edit logs in + * @param numrolls number of times to roll the edit log during setup + * @param abortAtRolls Specifications for when to fail, see AbortSpec + */ + public static NNStorage setupEdits(List editUris, int numrolls, + AbortSpec... abortAtRolls) throws IOException { + return setupEdits(editUris, numrolls, true, abortAtRolls); + } /** * Test loading an editlog which has had both its storage fail * on alternating rolls. Two edit log directories are created. - * The first on fails on odd rolls, the second on even. Test + * The first one fails on odd rolls, the second on even. Test * that we are able to load the entire editlog regardless. */ @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java index 275c3fa38ae..300080a5c96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java @@ -60,7 +60,7 @@ public void testNormalOperation() throws IOException { long numJournals = 0; for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) { FileJournalManager jm = new FileJournalManager(sd); - assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); + assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); numJournals++; } assertEquals(3, numJournals); @@ -81,7 +81,7 @@ public void testInprogressRecovery() throws IOException { FileJournalManager jm = new FileJournalManager(sd); assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, - jm.getNumberOfTransactions(1)); + jm.getNumberOfTransactions(1, true)); } /** @@ -103,15 +103,16 @@ public void testInprogressRecoveryMixed() throws IOException { Iterator dirs = storage.dirIterator(NameNodeDirType.EDITS); StorageDirectory sd = dirs.next(); FileJournalManager jm = new FileJournalManager(sd); - assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); + assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); sd = dirs.next(); jm = new FileJournalManager(sd); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, + true)); sd = dirs.next(); jm = new FileJournalManager(sd); - assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); + assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); } /** @@ -135,15 +136,18 @@ public void testInprogressRecoveryAll() throws IOException { Iterator dirs = storage.dirIterator(NameNodeDirType.EDITS); StorageDirectory sd = dirs.next(); FileJournalManager jm = new FileJournalManager(sd); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, + true)); sd = dirs.next(); jm = new FileJournalManager(sd); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, + true)); sd = dirs.next(); jm = new FileJournalManager(sd); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, + true)); } /** @@ -174,15 +178,15 @@ public void testReadFromStream() throws IOException { FileJournalManager jm = new FileJournalManager(sd); long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL; - assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1)); + assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true)); long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files long startingTxId = skippedTxns + 1; - long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId); + long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId, true); long numLoaded = 0; while (numLoaded < numTransactionsToLoad) { - EditLogInputStream editIn = jm.getInputStream(startingTxId); + EditLogInputStream editIn = jm.getInputStream(startingTxId, true); FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn); long count = val.getNumTransactions(); @@ -212,7 +216,8 @@ public void testAskForTransactionsMidfile() throws IOException { // 10 rolls, so 11 rolled files, 110 txids total. final int TOTAL_TXIDS = 10 * 11; for (int txid = 1; txid <= TOTAL_TXIDS; txid++) { - assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid)); + assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid, + true)); } } @@ -244,10 +249,10 @@ public boolean accept(File dir, String name) { assertTrue(files[0].delete()); FileJournalManager jm = new FileJournalManager(sd); - assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1)); + assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true)); try { - jm.getNumberOfTransactions(startGapTxId); + jm.getNumberOfTransactions(startGapTxId, true); fail("Should have thrown an exception by now"); } catch (IOException ioe) { assertTrue(true); @@ -255,7 +260,7 @@ public boolean accept(File dir, String name) { // rolled 10 times so there should be 11 files. assertEquals(11*TXNS_PER_ROLL - endGapTxId, - jm.getNumberOfTransactions(endGapTxId+1)); + jm.getNumberOfTransactions(endGapTxId + 1, true)); } /** @@ -282,7 +287,7 @@ public boolean accept(File dir, String name) { FileJournalManager jm = new FileJournalManager(sd); assertEquals(10*TXNS_PER_ROLL+1, - jm.getNumberOfTransactions(1)); + jm.getNumberOfTransactions(1, true)); } @Test @@ -323,11 +328,37 @@ public void testReadFromMiddleOfEditLog() throws CorruptionException, FileJournalManager jm = new FileJournalManager(sd); - EditLogInputStream elis = jm.getInputStream(5); + EditLogInputStream elis = jm.getInputStream(5, true); FSEditLogOp op = elis.readOp(); assertEquals("read unexpected op", op.getTransactionId(), 5); } + /** + * Make sure that in-progress streams aren't counted if we don't ask for + * them. + */ + @Test + public void testExcludeInProgressStreams() throws CorruptionException, + IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2"); + + // Don't close the edit log once the files have been set up. + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), + 10, false); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + FileJournalManager jm = new FileJournalManager(sd); + + // If we exclude the in-progess stream, we should only have 100 tx. + assertEquals(100, jm.getNumberOfTransactions(1, false)); + + EditLogInputStream elis = jm.getInputStream(90, false); + FSEditLogOp lastReadOp = null; + while ((lastReadOp = elis.readOp()) != null) { + assertTrue(lastReadOp.getTransactionId() <= 100); + } + } + private static String getLogsAsString( FileJournalManager fjm, long firstTxId) throws IOException { return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java index 00fe43f404c..51e49a92375 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java @@ -144,13 +144,13 @@ public void finalizeLogSegment(long firstTxId, long lastTxId) } @Override - public EditLogInputStream getInputStream(long fromTxnId) + public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) throws IOException { return null; } @Override - public long getNumberOfTransactions(long fromTxnId) + public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) throws IOException { return 0; }