HBASE-2935 Refactor 'Corrupt Data' Tests in TestHLogSplit
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1031351 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
da1aa8b76a
commit
0440bb0463
|
@ -648,6 +648,8 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2471 Splitting logs, we'll make an output file though the
|
HBASE-2471 Splitting logs, we'll make an output file though the
|
||||||
region no longer exists
|
region no longer exists
|
||||||
HBASE-3095 Client needs to reconnect if it expires its zk session
|
HBASE-3095 Client needs to reconnect if it expires its zk session
|
||||||
|
HBASE-2935 Refactor "Corrupt Data" Tests in TestHLogSplit
|
||||||
|
(Alex Newman via Stack)
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -135,6 +135,10 @@ public class HLog implements Syncable {
|
||||||
private static Class<? extends Writer> logWriterClass;
|
private static Class<? extends Writer> logWriterClass;
|
||||||
private static Class<? extends Reader> logReaderClass;
|
private static Class<? extends Reader> logReaderClass;
|
||||||
|
|
||||||
|
static void resetLogReaderClass() {
|
||||||
|
HLog.logReaderClass = null;
|
||||||
|
}
|
||||||
|
|
||||||
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
|
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
|
||||||
private int initialReplication; // initial replication factor of SequenceFile.writer
|
private int initialReplication; // initial replication factor of SequenceFile.writer
|
||||||
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
|
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
|
||||||
|
@ -557,11 +561,14 @@ public class HLog implements Syncable {
|
||||||
final Path path, Configuration conf)
|
final Path path, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
if (logReaderClass == null) {
|
if (logReaderClass == null) {
|
||||||
|
|
||||||
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
|
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
|
||||||
SequenceFileLogReader.class, Reader.class);
|
SequenceFileLogReader.class, Reader.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
HLog.Reader reader = logReaderClass.newInstance();
|
HLog.Reader reader = logReaderClass.newInstance();
|
||||||
reader.init(fs, path, conf);
|
reader.init(fs, path, conf);
|
||||||
return reader;
|
return reader;
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class SequenceFileLogReader implements HLog.Reader {
|
||||||
* this.end = in.getPos() + length;
|
* this.end = in.getPos() + length;
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private static class WALReader extends SequenceFile.Reader {
|
static class WALReader extends SequenceFile.Reader {
|
||||||
|
|
||||||
WALReader(final FileSystem fs, final Path p, final Configuration c)
|
WALReader(final FileSystem fs, final Path p, final Configuration c)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -131,7 +131,7 @@ public class SequenceFileLogReader implements HLog.Reader {
|
||||||
int edit = 0;
|
int edit = 0;
|
||||||
long entryStart = 0;
|
long entryStart = 0;
|
||||||
|
|
||||||
private Class<? extends HLogKey> keyClass;
|
protected Class<? extends HLogKey> keyClass;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor.
|
* Default constructor.
|
||||||
|
@ -217,7 +217,7 @@ public class SequenceFileLogReader implements HLog.Reader {
|
||||||
return reader.getPosition();
|
return reader.getPosition();
|
||||||
}
|
}
|
||||||
|
|
||||||
private IOException addFileInfoToException(final IOException ioe)
|
protected IOException addFileInfoToException(final IOException ioe)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long pos = -1;
|
long pos = -1;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
@ -310,29 +311,91 @@ public class TestHLogSplit {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: fix this test (HBASE-2935)
|
@Test
|
||||||
//@Test
|
|
||||||
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
|
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
|
||||||
conf.setBoolean(HBASE_SKIP_ERRORS, true);
|
conf.setBoolean(HBASE_SKIP_ERRORS, true);
|
||||||
|
Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
|
||||||
|
Reader.class);
|
||||||
|
InstrumentedSequenceFileLogWriter.activateFailure = false;
|
||||||
|
HLog.resetLogReaderClass();
|
||||||
|
|
||||||
|
try {
|
||||||
Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
|
Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
|
||||||
Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5");
|
conf.setClass("hbase.regionserver.hlog.reader.impl",
|
||||||
Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1));
|
FaultySequenceFileLogReader.class, HLog.Reader.class);
|
||||||
generateHLogs(-1);
|
String[] failureTypes = { "begin", "middle", "end" };
|
||||||
corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
|
for (FaultySequenceFileLogReader.FailureType failureType : FaultySequenceFileLogReader.FailureType.values()) {
|
||||||
corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs);
|
conf.set("faultysequencefilelogreader.failuretype", failureType.name());
|
||||||
corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
|
generateHLogs(1, ENTRIES, -1);
|
||||||
|
|
||||||
fs.initialize(fs.getUri(), conf);
|
fs.initialize(fs.getUri(), conf);
|
||||||
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
|
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
|
||||||
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
|
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
|
||||||
|
|
||||||
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
|
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
|
||||||
|
assertEquals("expected a different file", c1.getName(), archivedLogs[0]
|
||||||
|
.getPath().getName());
|
||||||
|
assertEquals(archivedLogs.length, 1);
|
||||||
|
fs.delete(new Path(oldLogDir, HLOG_FILE_PREFIX + "0"), false);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
|
||||||
|
Reader.class);
|
||||||
|
HLog.resetLogReaderClass();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName());
|
@Test(expected = IOException.class)
|
||||||
assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName());
|
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
|
||||||
assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName());
|
throws IOException {
|
||||||
assertEquals(archivedLogs.length, 3);
|
conf.setBoolean(HBASE_SKIP_ERRORS, false);
|
||||||
|
Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
|
||||||
|
Reader.class);
|
||||||
|
InstrumentedSequenceFileLogWriter.activateFailure = false;
|
||||||
|
HLog.resetLogReaderClass();
|
||||||
|
|
||||||
|
try {
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl",
|
||||||
|
FaultySequenceFileLogReader.class, HLog.Reader.class);
|
||||||
|
conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
|
||||||
|
generateHLogs(Integer.MAX_VALUE);
|
||||||
|
fs.initialize(fs.getUri(), conf);
|
||||||
|
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
|
||||||
|
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
|
||||||
|
} finally {
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
|
||||||
|
Reader.class);
|
||||||
|
HLog.resetLogReaderClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
|
||||||
|
throws IOException {
|
||||||
|
conf.setBoolean(HBASE_SKIP_ERRORS, false);
|
||||||
|
Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
|
||||||
|
Reader.class);
|
||||||
|
InstrumentedSequenceFileLogWriter.activateFailure = false;
|
||||||
|
HLog.resetLogReaderClass();
|
||||||
|
|
||||||
|
try {
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl",
|
||||||
|
FaultySequenceFileLogReader.class, HLog.Reader.class);
|
||||||
|
conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
|
||||||
|
generateHLogs(-1);
|
||||||
|
fs.initialize(fs.getUri(), conf);
|
||||||
|
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
|
||||||
|
try {
|
||||||
|
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertEquals(
|
||||||
|
"if skip.errors is false all files should remain in place",
|
||||||
|
NUM_WRITERS, fs.listStatus(hlogDir).length);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
|
||||||
|
Reader.class);
|
||||||
|
HLog.resetLogReaderClass();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,39 +445,6 @@ public class TestHLogSplit {
|
||||||
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
|
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// TODO: fix this test (HBASE-2935)
|
|
||||||
//@Test(expected = IOException.class)
|
|
||||||
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
|
|
||||||
conf.setBoolean(HBASE_SKIP_ERRORS, false);
|
|
||||||
generateHLogs(Integer.MAX_VALUE);
|
|
||||||
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
|
|
||||||
Corruptions.APPEND_GARBAGE, true, fs);
|
|
||||||
|
|
||||||
fs.initialize(fs.getUri(), conf);
|
|
||||||
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
|
|
||||||
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: fix this test (HBASE-2935)
|
|
||||||
//@Test
|
|
||||||
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
|
|
||||||
conf.setBoolean(HBASE_SKIP_ERRORS, false);
|
|
||||||
generateHLogs(-1);
|
|
||||||
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
|
|
||||||
Corruptions.APPEND_GARBAGE, true, fs);
|
|
||||||
fs.initialize(fs.getUri(), conf);
|
|
||||||
try {
|
|
||||||
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
|
|
||||||
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
|
|
||||||
} catch (IOException e) {/* expected */}
|
|
||||||
|
|
||||||
assertEquals("if skip.errors is false all files should remain in place",
|
|
||||||
NUM_WRITERS, fs.listStatus(hlogDir).length);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSplit() throws IOException {
|
public void testSplit() throws IOException {
|
||||||
generateHLogs(-1);
|
generateHLogs(-1);
|
||||||
|
|
Loading…
Reference in New Issue