HDFS-3372. offlineEditsViewer should be able to read a binary edits file with recovery mode. Contributed by Colin Patrick McCabe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1349628 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
07b8584431
commit
6702d5dbd4
|
@ -226,6 +226,9 @@ Branch-2 ( Unreleased changes )
|
|||
connections and RPC calls, and add MultipleLinearRandomRetry, a new retry
|
||||
policy. (szetszwo)
|
||||
|
||||
HDFS-3372. offlineEditsViewer should be able to read a binary
|
||||
edits file with recovery mode. (Colin Patrick McCabe via eli)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2982. Startup performance suffers when there are many edit log
|
||||
|
|
|
@ -18,9 +18,12 @@
|
|||
package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||
|
@ -33,17 +36,21 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|||
class OfflineEditsBinaryLoader implements OfflineEditsLoader {
|
||||
private OfflineEditsVisitor visitor;
|
||||
private EditLogInputStream inputStream;
|
||||
private boolean fixTxIds;
|
||||
private final boolean fixTxIds;
|
||||
private final boolean recoveryMode;
|
||||
private long nextTxId;
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(OfflineEditsBinaryLoader.class.getName());
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public OfflineEditsBinaryLoader(OfflineEditsVisitor visitor,
|
||||
EditLogInputStream inputStream) {
|
||||
EditLogInputStream inputStream, OfflineEditsViewer.Flags flags) {
|
||||
this.visitor = visitor;
|
||||
this.inputStream = inputStream;
|
||||
this.fixTxIds = false;
|
||||
this.fixTxIds = flags.getFixTxIds();
|
||||
this.recoveryMode = flags.getRecoveryMode();
|
||||
this.nextTxId = -1;
|
||||
}
|
||||
|
||||
|
@ -51,9 +58,9 @@ class OfflineEditsBinaryLoader implements OfflineEditsLoader {
|
|||
* Loads edits file, uses visitor to process all elements
|
||||
*/
|
||||
public void loadEdits() throws IOException {
|
||||
try {
|
||||
visitor.start(inputStream.getVersion());
|
||||
while (true) {
|
||||
visitor.start(inputStream.getVersion());
|
||||
while (true) {
|
||||
try {
|
||||
FSEditLogOp op = inputStream.readOp();
|
||||
if (op == null)
|
||||
break;
|
||||
|
@ -68,16 +75,24 @@ class OfflineEditsBinaryLoader implements OfflineEditsLoader {
|
|||
nextTxId++;
|
||||
}
|
||||
visitor.visitOp(op);
|
||||
} catch (IOException e) {
|
||||
if (!recoveryMode) {
|
||||
// Tell the visitor to clean up, then re-throw the exception
|
||||
visitor.close(e);
|
||||
throw e;
|
||||
}
|
||||
LOG.error("Got IOException while reading stream! Resyncing.", e);
|
||||
inputStream.resync();
|
||||
} catch (RuntimeException e) {
|
||||
if (!recoveryMode) {
|
||||
// Tell the visitor to clean up, then re-throw the exception
|
||||
visitor.close(e);
|
||||
throw e;
|
||||
}
|
||||
LOG.error("Got RuntimeException while reading stream! Resyncing.", e);
|
||||
inputStream.resync();
|
||||
}
|
||||
visitor.close(null);
|
||||
} catch(IOException e) {
|
||||
// Tell the visitor to clean up, then re-throw the exception
|
||||
visitor.close(e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFixTxIds() {
|
||||
fixTxIds = true;
|
||||
visitor.close(null);
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
|
||||
|
||||
|
@ -36,13 +37,12 @@ interface OfflineEditsLoader {
|
|||
|
||||
abstract public void loadEdits() throws IOException;
|
||||
|
||||
public abstract void setFixTxIds();
|
||||
|
||||
static class OfflineEditsLoaderFactory {
|
||||
static OfflineEditsLoader createLoader(OfflineEditsVisitor visitor,
|
||||
String inputFileName, boolean xmlInput) throws IOException {
|
||||
String inputFileName, boolean xmlInput,
|
||||
OfflineEditsViewer.Flags flags) throws IOException {
|
||||
if (xmlInput) {
|
||||
return new OfflineEditsXmlLoader(visitor, new File(inputFileName));
|
||||
return new OfflineEditsXmlLoader(visitor, new File(inputFileName), flags);
|
||||
} else {
|
||||
File file = null;
|
||||
EditLogInputStream elis = null;
|
||||
|
@ -51,7 +51,7 @@ interface OfflineEditsLoader {
|
|||
file = new File(inputFileName);
|
||||
elis = new EditLogFileInputStream(file, HdfsConstants.INVALID_TXID,
|
||||
HdfsConstants.INVALID_TXID, false);
|
||||
loader = new OfflineEditsBinaryLoader(visitor, elis);
|
||||
loader = new OfflineEditsBinaryLoader(visitor, elis, flags);
|
||||
} finally {
|
||||
if ((loader == null) && (elis != null)) {
|
||||
elis.close();
|
||||
|
@ -61,4 +61,4 @@ interface OfflineEditsLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,16 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsLoader.OfflineEditsLoaderFactory;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -37,7 +31,6 @@ import org.apache.commons.cli.OptionBuilder;
|
|||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.cli.PosixParser;
|
||||
import org.xml.sax.SAXParseException;
|
||||
|
||||
/**
|
||||
* This class implements an offline edits viewer, tool that
|
||||
|
@ -78,6 +71,9 @@ public class OfflineEditsViewer extends Configured implements Tool {
|
|||
"-f,--fix-txids Renumber the transaction IDs in the input,\n" +
|
||||
" so that there are no gaps or invalid " +
|
||||
" transaction IDs.\n" +
|
||||
"-r,--recover When reading binary edit logs, use recovery \n" +
|
||||
" mode. This will give you the chance to skip \n" +
|
||||
" corrupt parts of the edit log.\n" +
|
||||
"-v,--verbose More verbose output, prints the input and\n" +
|
||||
" output filenames, for processors that write\n" +
|
||||
" to a file, also output to screen. On large\n" +
|
||||
|
@ -113,6 +109,7 @@ public class OfflineEditsViewer extends Configured implements Tool {
|
|||
options.addOption("p", "processor", true, "");
|
||||
options.addOption("v", "verbose", false, "");
|
||||
options.addOption("f", "fix-txids", false, "");
|
||||
options.addOption("r", "recover", false, "");
|
||||
options.addOption("h", "help", false, "");
|
||||
|
||||
return options;
|
||||
|
@ -128,23 +125,20 @@ public class OfflineEditsViewer extends Configured implements Tool {
|
|||
* @return 0 on success; error code otherwise
|
||||
*/
|
||||
public int go(String inputFileName, String outputFileName, String processor,
|
||||
boolean printToScreen, boolean fixTxIds, OfflineEditsVisitor visitor)
|
||||
Flags flags, OfflineEditsVisitor visitor)
|
||||
{
|
||||
if (printToScreen) {
|
||||
if (flags.getPrintToScreen()) {
|
||||
System.out.println("input [" + inputFileName + "]");
|
||||
System.out.println("output [" + outputFileName + "]");
|
||||
}
|
||||
try {
|
||||
if (visitor == null) {
|
||||
visitor = OfflineEditsVisitorFactory.getEditsVisitor(
|
||||
outputFileName, processor, printToScreen);
|
||||
outputFileName, processor, flags.getPrintToScreen());
|
||||
}
|
||||
boolean xmlInput = inputFileName.endsWith(".xml");
|
||||
OfflineEditsLoader loader = OfflineEditsLoaderFactory.
|
||||
createLoader(visitor, inputFileName, xmlInput);
|
||||
if (fixTxIds) {
|
||||
loader.setFixTxIds();
|
||||
}
|
||||
createLoader(visitor, inputFileName, xmlInput, flags);
|
||||
loader.loadEdits();
|
||||
} catch(Exception e) {
|
||||
System.err.println("Encountered exception. Exiting: " + e.getMessage());
|
||||
|
@ -154,6 +148,39 @@ public class OfflineEditsViewer extends Configured implements Tool {
|
|||
return 0;
|
||||
}
|
||||
|
||||
public static class Flags {
|
||||
private boolean printToScreen = false;
|
||||
private boolean fixTxIds = false;
|
||||
private boolean recoveryMode = false;
|
||||
|
||||
public Flags() {
|
||||
}
|
||||
|
||||
public boolean getPrintToScreen() {
|
||||
return printToScreen;
|
||||
}
|
||||
|
||||
public void setPrintToScreen() {
|
||||
printToScreen = true;
|
||||
}
|
||||
|
||||
public boolean getFixTxIds() {
|
||||
return fixTxIds;
|
||||
}
|
||||
|
||||
public void setFixTxIds() {
|
||||
fixTxIds = true;
|
||||
}
|
||||
|
||||
public boolean getRecoveryMode() {
|
||||
return recoveryMode;
|
||||
}
|
||||
|
||||
public void setRecoveryMode() {
|
||||
recoveryMode = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point for ToolRunner (see ToolRunner docs)
|
||||
*
|
||||
|
@ -177,6 +204,7 @@ public class OfflineEditsViewer extends Configured implements Tool {
|
|||
printHelp();
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(cmd.hasOption("h")) { // print help and exit
|
||||
printHelp();
|
||||
return -1;
|
||||
|
@ -187,10 +215,17 @@ public class OfflineEditsViewer extends Configured implements Tool {
|
|||
if(processor == null) {
|
||||
processor = defaultProcessor;
|
||||
}
|
||||
boolean printToScreen = cmd.hasOption("v");
|
||||
boolean fixTxIds = cmd.hasOption("f");
|
||||
return go(inputFileName, outputFileName, processor,
|
||||
printToScreen, fixTxIds, null);
|
||||
Flags flags = new Flags();
|
||||
if (cmd.hasOption("r")) {
|
||||
flags.setRecoveryMode();
|
||||
}
|
||||
if (cmd.hasOption("f")) {
|
||||
flags.setFixTxIds();
|
||||
}
|
||||
if (cmd.hasOption("v")) {
|
||||
flags.setPrintToScreen();
|
||||
}
|
||||
return go(inputFileName, outputFileName, processor, flags, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
|||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
|
||||
|
||||
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer;
|
||||
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
||||
import org.xml.sax.Attributes;
|
||||
import org.xml.sax.InputSource;
|
||||
|
@ -46,9 +46,9 @@ import org.xml.sax.helpers.XMLReaderFactory;
|
|||
@InterfaceStability.Unstable
|
||||
class OfflineEditsXmlLoader
|
||||
extends DefaultHandler implements OfflineEditsLoader {
|
||||
private boolean fixTxIds;
|
||||
private OfflineEditsVisitor visitor;
|
||||
private FileReader fileReader;
|
||||
private final boolean fixTxIds;
|
||||
private final OfflineEditsVisitor visitor;
|
||||
private final FileReader fileReader;
|
||||
private ParseState state;
|
||||
private Stanza stanza;
|
||||
private Stack<Stanza> stanzaStack;
|
||||
|
@ -68,9 +68,10 @@ class OfflineEditsXmlLoader
|
|||
}
|
||||
|
||||
public OfflineEditsXmlLoader(OfflineEditsVisitor visitor,
|
||||
File inputFile) throws FileNotFoundException {
|
||||
File inputFile, OfflineEditsViewer.Flags flags) throws FileNotFoundException {
|
||||
this.visitor = visitor;
|
||||
this.fileReader = new FileReader(inputFile);
|
||||
this.fixTxIds = flags.getFixTxIds();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -250,9 +251,4 @@ class OfflineEditsXmlLoader
|
|||
public void characters (char ch[], int start, int length) {
|
||||
cbuf.append(ch, start, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFixTxIds() {
|
||||
fixTxIds = true;
|
||||
}
|
||||
}
|
|
@ -22,11 +22,14 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.File;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.Before;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -34,12 +37,12 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
|
||||
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer;
|
||||
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer.Flags;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.namenode.OfflineEditsViewerHelper;
|
||||
|
||||
public class TestOfflineEditsViewer {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestOfflineEditsViewer.class);
|
||||
|
||||
private static final Map<FSEditLogOpCodes, Boolean> obsoleteOpCodes =
|
||||
|
@ -97,8 +100,8 @@ public class TestOfflineEditsViewer {
|
|||
String editsReparsed = cacheDir + "/editsReparsed";
|
||||
|
||||
// parse to XML then back to binary
|
||||
runOev(edits, editsParsedXml, "xml");
|
||||
runOev(editsParsedXml, editsReparsed, "binary");
|
||||
assertEquals(0, runOev(edits, editsParsedXml, "xml", false));
|
||||
assertEquals(0, runOev(editsParsedXml, editsReparsed, "binary", false));
|
||||
|
||||
// judgment time
|
||||
assertTrue(
|
||||
|
@ -114,6 +117,42 @@ public class TestOfflineEditsViewer {
|
|||
LOG.info("END");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryMode() throws IOException {
|
||||
LOG.info("START - testing with generated edits");
|
||||
|
||||
nnHelper.startCluster(buildDir + "/dfs/");
|
||||
|
||||
// edits generated by nnHelper (MiniDFSCluster), should have all op codes
|
||||
// binary, XML, reparsed binary
|
||||
String edits = nnHelper.generateEdits();
|
||||
|
||||
// Corrupt the file by truncating the end
|
||||
FileChannel editsFile = new FileOutputStream(edits, true).getChannel();
|
||||
editsFile.truncate(editsFile.size() - 5);
|
||||
|
||||
String editsParsedXml = cacheDir + "/editsRecoveredParsed.xml";
|
||||
String editsReparsed = cacheDir + "/editsRecoveredReparsed";
|
||||
String editsParsedXml2 = cacheDir + "/editsRecoveredParsed2.xml";
|
||||
|
||||
// Can't read the corrupted file without recovery mode
|
||||
assertEquals(-1, runOev(edits, editsParsedXml, "xml", false));
|
||||
|
||||
// parse to XML then back to binary
|
||||
assertEquals(0, runOev(edits, editsParsedXml, "xml", true));
|
||||
assertEquals(0, runOev(editsParsedXml, editsReparsed, "binary", false));
|
||||
assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false));
|
||||
|
||||
// judgment time
|
||||
assertTrue("Test round trip",
|
||||
filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
|
||||
|
||||
// removes edits so do this at the end
|
||||
nnHelper.shutdownCluster();
|
||||
|
||||
LOG.info("END");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStored() throws IOException {
|
||||
|
||||
|
@ -128,8 +167,9 @@ public class TestOfflineEditsViewer {
|
|||
String editsStoredXml = cacheDir + "/editsStored.xml";
|
||||
|
||||
// parse to XML then back to binary
|
||||
runOev(editsStored, editsStoredParsedXml, "xml");
|
||||
runOev(editsStoredParsedXml, editsStoredReparsed, "binary");
|
||||
assertEquals(0, runOev(editsStored, editsStoredParsedXml, "xml", false));
|
||||
assertEquals(0, runOev(editsStoredParsedXml, editsStoredReparsed,
|
||||
"binary", false));
|
||||
|
||||
// judgement time
|
||||
assertTrue(
|
||||
|
@ -151,14 +191,18 @@ public class TestOfflineEditsViewer {
|
|||
* @param inFilename input edits filename
|
||||
* @param outFilename oputput edits filename
|
||||
*/
|
||||
private void runOev(String inFilename, String outFilename, String processor)
|
||||
throws IOException {
|
||||
private int runOev(String inFilename, String outFilename, String processor,
|
||||
boolean recovery) throws IOException {
|
||||
|
||||
LOG.info("Running oev [" + inFilename + "] [" + outFilename + "]");
|
||||
|
||||
OfflineEditsViewer oev = new OfflineEditsViewer();
|
||||
if (oev.go(inFilename, outFilename, processor, true, false, null) != 0)
|
||||
throw new RuntimeException("oev failed");
|
||||
Flags flags = new Flags();
|
||||
flags.setPrintToScreen();
|
||||
if (recovery) {
|
||||
flags.setRecoveryMode();
|
||||
}
|
||||
return oev.go(inFilename, outFilename, processor, flags, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -172,7 +216,7 @@ public class TestOfflineEditsViewer {
|
|||
FileOutputStream fout = new FileOutputStream(outFilename);
|
||||
StatisticsEditsVisitor visitor = new StatisticsEditsVisitor(fout);
|
||||
OfflineEditsViewer oev = new OfflineEditsViewer();
|
||||
if (oev.go(inFilename, outFilename, "stats", false, false, visitor) != 0)
|
||||
if (oev.go(inFilename, outFilename, "stats", new Flags(), visitor) != 0)
|
||||
return false;
|
||||
LOG.info("Statistics for " + inFilename + "\n" +
|
||||
visitor.getStatisticsString());
|
||||
|
|
Loading…
Reference in New Issue