From 103eff1fad9dc947c5c078f829044cff2da6139b Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Mon, 10 Dec 2012 18:42:53 +0000 Subject: [PATCH] HDFS-4130. BKJM: The reading for editlog at NN starting using bkjm is not efficient. Contributed by Han Xiao. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1419649 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../bkjournal/BookKeeperJournalManager.java | 70 +++++++------------ .../TestBookKeeperJournalManager.java | 9 +-- 3 files changed, 34 insertions(+), 48 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c0d6a6a4aea..e72d85dd69c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -414,6 +414,9 @@ Release 2.0.3-alpha - Unreleased HDFS-3680. Allow customized audit logging in HDFS FSNamesystem. (Marcelo Vanzin via atm) + HDFS-4130. BKJM: The reading for editlog at NN starting using bkjm is not efficient. + (Han Xiao via umamahesh) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 6d9a65d2821..5d1814233f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -500,16 +500,18 @@ public class BookKeeperJournalManager implements JournalManager { } } - EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk) - throws IOException { - for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) { - long lastTxId = l.getLastTxId(); - if (l.isInProgress()) { - lastTxId = recoverLastTxId(l, false); - } - - if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) { - try { + @Override + public void selectInputStreams(Collection streams, + long fromTxId, boolean inProgressOk) throws IOException { + List currentLedgerList = getLedgerList(inProgressOk); + try { + BookKeeperEditLogInputStream elis = null; + for (EditLogLedgerMetadata l : currentLedgerList) { + long lastTxId = l.getLastTxId(); + if (l.isInProgress()) { + lastTxId = recoverLastTxId(l, false); + } + if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) { LedgerHandle h; if (l.isInProgress()) { // we don't want to fence the current journal h = bkc.openLedgerNoRecovery(l.getLedgerId(), @@ -518,42 +520,22 @@ public class BookKeeperJournalManager implements JournalManager { h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, digestpw.getBytes()); } - BookKeeperEditLogInputStream s = new BookKeeperEditLogInputStream(h, - l); - s.skipTo(fromTxId); - return s; - } catch (BKException e) { - throw new IOException("Could not open ledger for " + fromTxId, e); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted opening ledger for " - + fromTxId, ie); + elis = new BookKeeperEditLogInputStream(h, l); + elis.skipTo(fromTxId); + } else { + return; } + streams.add(elis); + if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) { + return; + } + fromTxId = elis.getLastTxId() + 1; } - } - return null; - } - - @Override - public void selectInputStreams(Collection streams, - long fromTxId, boolean inProgressOk) { - // NOTE: could probably be rewritten more efficiently - while (true) { - EditLogInputStream elis; - try { - elis = getInputStream(fromTxId, inProgressOk); - } catch (IOException e) { - LOG.error(e); - return; - } - if (elis == null) { - return; - } - streams.add(elis); - if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) { - return; - } - fromTxId = elis.getLastTxId() + 1; + } catch (BKException e) { + throw new IOException("Could not open ledger for " + fromTxId, e); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted opening ledger for " + fromTxId, ie); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index 9da904007d3..e4c7e87f91d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -28,6 +28,7 @@ import org.mockito.Mockito; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.ArrayList; import java.util.Random; @@ -315,13 +316,13 @@ public class TestBookKeeperJournalManager { out.close(); bkjm.finalizeLogSegment(1, numTransactions); - - EditLogInputStream in = bkjm.getInputStream(1, true); + List in = new ArrayList(); + bkjm.selectInputStreams(in, 1, true); try { assertEquals(numTransactions, - FSEditLogTestUtil.countTransactionsInStream(in)); + FSEditLogTestUtil.countTransactionsInStream(in.get(0))); } finally { - in.close(); + in.get(0).close(); } }