HDFS-4130. BKJM: The reading for editlog at NN starting using bkjm is not efficient. Contributed by Han Xiao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1419661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
89b9fc3034
commit
b5a959d39d
|
@ -148,6 +148,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
HDFS-3680. Allow customized audit logging in HDFS FSNamesystem. (Marcelo
|
HDFS-3680. Allow customized audit logging in HDFS FSNamesystem. (Marcelo
|
||||||
Vanzin via atm)
|
Vanzin via atm)
|
||||||
|
|
||||||
|
HDFS-4130. BKJM: The reading for editlog at NN starting using bkjm is not efficient.
|
||||||
|
(Han Xiao via umamahesh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -500,16 +500,18 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
|
@Override
|
||||||
throws IOException {
|
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||||
for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
|
long fromTxId, boolean inProgressOk) throws IOException {
|
||||||
long lastTxId = l.getLastTxId();
|
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(inProgressOk);
|
||||||
if (l.isInProgress()) {
|
try {
|
||||||
lastTxId = recoverLastTxId(l, false);
|
BookKeeperEditLogInputStream elis = null;
|
||||||
}
|
for (EditLogLedgerMetadata l : currentLedgerList) {
|
||||||
|
long lastTxId = l.getLastTxId();
|
||||||
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
if (l.isInProgress()) {
|
||||||
try {
|
lastTxId = recoverLastTxId(l, false);
|
||||||
|
}
|
||||||
|
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
||||||
LedgerHandle h;
|
LedgerHandle h;
|
||||||
if (l.isInProgress()) { // we don't want to fence the current journal
|
if (l.isInProgress()) { // we don't want to fence the current journal
|
||||||
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
||||||
|
@ -518,42 +520,22 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
|
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
|
||||||
digestpw.getBytes());
|
digestpw.getBytes());
|
||||||
}
|
}
|
||||||
BookKeeperEditLogInputStream s = new BookKeeperEditLogInputStream(h,
|
elis = new BookKeeperEditLogInputStream(h, l);
|
||||||
l);
|
elis.skipTo(fromTxId);
|
||||||
s.skipTo(fromTxId);
|
} else {
|
||||||
return s;
|
return;
|
||||||
} catch (BKException e) {
|
|
||||||
throw new IOException("Could not open ledger for " + fromTxId, e);
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException("Interrupted opening ledger for "
|
|
||||||
+ fromTxId, ie);
|
|
||||||
}
|
}
|
||||||
|
streams.add(elis);
|
||||||
|
if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
fromTxId = elis.getLastTxId() + 1;
|
||||||
}
|
}
|
||||||
}
|
} catch (BKException e) {
|
||||||
return null;
|
throw new IOException("Could not open ledger for " + fromTxId, e);
|
||||||
}
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
@Override
|
throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
|
||||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
|
||||||
long fromTxId, boolean inProgressOk) {
|
|
||||||
// NOTE: could probably be rewritten more efficiently
|
|
||||||
while (true) {
|
|
||||||
EditLogInputStream elis;
|
|
||||||
try {
|
|
||||||
elis = getInputStream(fromTxId, inProgressOk);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error(e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (elis == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
streams.add(elis);
|
|
||||||
if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
fromTxId = elis.getLastTxId() + 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -315,13 +316,13 @@ public class TestBookKeeperJournalManager {
|
||||||
out.close();
|
out.close();
|
||||||
bkjm.finalizeLogSegment(1, numTransactions);
|
bkjm.finalizeLogSegment(1, numTransactions);
|
||||||
|
|
||||||
|
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
|
||||||
EditLogInputStream in = bkjm.getInputStream(1, true);
|
bkjm.selectInputStreams(in, 1, true);
|
||||||
try {
|
try {
|
||||||
assertEquals(numTransactions,
|
assertEquals(numTransactions,
|
||||||
FSEditLogTestUtil.countTransactionsInStream(in));
|
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
|
||||||
} finally {
|
} finally {
|
||||||
in.close();
|
in.get(0).close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue