HBASE-15252 Data loss when replaying wal if HDFS timeout

This commit is contained in:
zhangduo 2016-02-12 08:17:10 +08:00
parent 12982d1957
commit 85e1d9a109
2 changed files with 112 additions and 4 deletions

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
/** /**
* A Protobuf based WAL has the following structure: * A Protobuf based WAL has the following structure:
@ -332,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase {
} }
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
(int)size); (int)size);
} catch (IOException ipbe) { } catch (InvalidProtocolBufferException ipbe) {
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() + originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);

View File

@ -22,9 +22,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -37,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -91,6 +98,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -100,6 +108,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** /**
* Test replay of edits out of a WAL split. * Test replay of edits out of a WAL split.
@ -499,7 +509,7 @@ public class TestWALReplay {
boolean first = true; boolean first = true;
for (HColumnDescriptor hcd: htd.getFamilies()) { for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
if (first ) { if (first) {
// If first, so we have at least one family w/ different seqid to rest. // If first, so we have at least one family w/ different seqid to rest.
region.flush(true); region.flush(true);
first = false; first = false;
@ -819,9 +829,9 @@ public class TestWALReplay {
final Configuration newConf = HBaseConfiguration.create(this.conf); final Configuration newConf = HBaseConfiguration.create(this.conf);
User user = HBaseTestingUtility.getDifferentUser(newConf, User user = HBaseTestingUtility.getDifferentUser(newConf,
".replay.wal.secondtime"); ".replay.wal.secondtime");
user.runAs(new PrivilegedExceptionAction() { user.runAs(new PrivilegedExceptionAction<Void>() {
@Override @Override
public Object run() throws Exception { public Void run() throws Exception {
runWALSplit(newConf); runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf); FileSystem newFS = FileSystem.get(newConf);
// 100k seems to make for about 4 flushes during HRegion#initialize. // 100k seems to make for about 4 flushes during HRegion#initialize.
@ -927,6 +937,103 @@ public class TestWALReplay {
lastestSeqNumber, editCount); lastestSeqNumber, editCount);
} }
/**
* testcase for https://issues.apache.org/jira/browse/HBASE-15252
*/
@Test
public void testDatalossWhenInputError() throws IOException, InstantiationException,
IllegalAccessException {
final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
deleteDir(basedir);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
Path regionDir = region1.getRegionFileSystem().getRegionDir();
HBaseTestingUtility.closeRegionAndWAL(region1);
WAL wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
for (HColumnDescriptor hcd : htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
}
// Now assert edits made it in.
final Get g = new Get(rowName);
Result result = region.get(g);
assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
// Now close the region (without flush), split the log, reopen the region and assert that
// replay of log has the correct effect.
region.close(true);
wal.shutdown();
runWALSplit(this.conf);
// here we let the DFSInputStream throw an IOException just after the WALHeader.
Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
FSDataInputStream stream = fs.open(editFile);
stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
Class<? extends DefaultWALProvider.Reader> logReaderClass =
conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
DefaultWALProvider.Reader.class);
DefaultWALProvider.Reader reader = logReaderClass.newInstance();
reader.init(this.fs, editFile, conf, stream);
final long headerLength = stream.getPos();
reader.close();
FileSystem spyFs = spy(this.fs);
doAnswer(new Answer<FSDataInputStream>() {
@Override
public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
Field field = FilterInputStream.class.getDeclaredField("in");
field.setAccessible(true);
final DFSInputStream in = (DFSInputStream) field.get(stream);
DFSInputStream spyIn = spy(in);
doAnswer(new Answer<Integer>() {
private long pos;
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
if (pos >= headerLength) {
throw new IOException("read over limit");
}
int b = (Integer) invocation.callRealMethod();
if (b > 0) {
pos += b;
}
return b;
}
}).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
invocation.callRealMethod();
in.close();
return null;
}
}).when(spyIn).close();
field.set(stream, spyIn);
return stream;
}
}).when(spyFs).open(eq(editFile));
WAL wal2 = createWAL(this.conf);
HRegion region2;
try {
// log replay should fail due to the IOException, otherwise we may lose data.
region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
assertEquals(result.size(), region2.get(g).size());
} catch (IOException e) {
assertEquals("read over limit", e.getMessage());
}
region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
assertEquals(result.size(), region2.get(g).size());
}
static class MockWAL extends FSHLog { static class MockWAL extends FSHLog {
boolean doCompleteCacheFlush = false; boolean doCompleteCacheFlush = false;