HDFS-309. FSEditLog should log progress during replay. Contributed by Sho Shimauchi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1303485 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f55a1c0876
commit
2f820dde17
|
@ -55,6 +55,9 @@ Trunk (unreleased changes)
|
||||||
HDFS-3091. Update the usage limitations of ReplaceDatanodeOnFailure policy in
|
HDFS-3091. Update the usage limitations of ReplaceDatanodeOnFailure policy in
|
||||||
the config description for the smaller clusters. (szetszwo via umamahesh)
|
the config description for the smaller clusters. (szetszwo via umamahesh)
|
||||||
|
|
||||||
|
HDFS-309. FSEditLog should log progress during replay. (Sho Shimauchi
|
||||||
|
via todd)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
|
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.io.InputStream;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||||
|
@ -67,6 +69,8 @@ import com.google.common.base.Joiner;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class FSEditLogLoader {
|
public class FSEditLogLoader {
|
||||||
|
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
|
||||||
|
static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
|
||||||
private final FSNamesystem fsNamesys;
|
private final FSNamesystem fsNamesys;
|
||||||
|
|
||||||
public FSEditLogLoader(FSNamesystem fsNamesys) {
|
public FSEditLogLoader(FSNamesystem fsNamesys) {
|
||||||
|
@ -108,6 +112,10 @@ public class FSEditLogLoader {
|
||||||
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
|
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
|
||||||
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
|
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
|
||||||
|
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Acquiring write lock to replay edit log");
|
||||||
|
}
|
||||||
|
|
||||||
fsNamesys.writeLock();
|
fsNamesys.writeLock();
|
||||||
fsDir.writeLock();
|
fsDir.writeLock();
|
||||||
|
|
||||||
|
@ -115,6 +123,16 @@ public class FSEditLogLoader {
|
||||||
Arrays.fill(recentOpcodeOffsets, -1);
|
Arrays.fill(recentOpcodeOffsets, -1);
|
||||||
|
|
||||||
long txId = expectedStartingTxId - 1;
|
long txId = expectedStartingTxId - 1;
|
||||||
|
long lastTxId = in.getLastTxId();
|
||||||
|
long numTxns = (lastTxId - expectedStartingTxId) + 1;
|
||||||
|
|
||||||
|
long lastLogTime = now();
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("edit log length: " + in.length() + ", start txid: "
|
||||||
|
+ expectedStartingTxId + ", last txid: " + lastTxId);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -151,6 +169,15 @@ public class FSEditLogLoader {
|
||||||
FSImage.LOG.error(errorMessage);
|
FSImage.LOG.error(errorMessage);
|
||||||
throw new IOException(errorMessage, t);
|
throw new IOException(errorMessage, t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// log progress
|
||||||
|
if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
|
||||||
|
int percent = Math.round((float) txId / numTxns * 100);
|
||||||
|
LOG.info("replaying edit log: " + txId + "/" + numTxns
|
||||||
|
+ " transactions completed. (" + percent + "%)");
|
||||||
|
lastLogTime = now();
|
||||||
|
}
|
||||||
|
|
||||||
numEdits++;
|
numEdits++;
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
|
@ -162,6 +189,11 @@ public class FSEditLogLoader {
|
||||||
} finally {
|
} finally {
|
||||||
fsDir.writeUnlock();
|
fsDir.writeUnlock();
|
||||||
fsNamesys.writeUnlock();
|
fsNamesys.writeUnlock();
|
||||||
|
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("replaying edit log finished");
|
||||||
|
}
|
||||||
|
|
||||||
if (FSImage.LOG.isDebugEnabled()) {
|
if (FSImage.LOG.isDebugEnabled()) {
|
||||||
dumpOpCounts(opCounts);
|
dumpOpCounts(opCounts);
|
||||||
}
|
}
|
||||||
|
@ -172,6 +204,11 @@ public class FSEditLogLoader {
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
||||||
int logVersion) throws IOException {
|
int logVersion) throws IOException {
|
||||||
|
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("replaying edit log: " + op);
|
||||||
|
}
|
||||||
|
|
||||||
switch (op.opCode) {
|
switch (op.opCode) {
|
||||||
case OP_ADD: {
|
case OP_ADD: {
|
||||||
AddCloseOp addCloseOp = (AddCloseOp)op;
|
AddCloseOp addCloseOp = (AddCloseOp)op;
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class TestFSEditLogLoader {
|
||||||
|
|
||||||
static {
|
static {
|
||||||
((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
|
((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)FSEditLogLoader.LOG).getLogger().setLevel(Level.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final File TEST_DIR = new File(
|
private static final File TEST_DIR = new File(
|
||||||
|
|
Loading…
Reference in New Issue