HBASE-15252 Data loss when replaying wal if HDFS timeout
This commit is contained in:
parent
7e5ce6b0a3
commit
9df8d4c1ce
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
|
|
||||||
import com.google.protobuf.CodedInputStream;
|
import com.google.protobuf.CodedInputStream;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Protobuf based WAL has the following structure:
|
* A Protobuf based WAL has the following structure:
|
||||||
|
@ -332,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
}
|
}
|
||||||
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
|
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
|
||||||
(int)size);
|
(int)size);
|
||||||
} catch (IOException ipbe) {
|
} catch (InvalidProtocolBufferException ipbe) {
|
||||||
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
|
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
|
||||||
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
|
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
|
||||||
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
|
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
|
||||||
|
|
|
@ -22,9 +22,15 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.FilterInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -37,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -91,6 +98,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.wal.WALKey;
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -100,6 +108,8 @@ import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test replay of edits out of a WAL split.
|
* Test replay of edits out of a WAL split.
|
||||||
|
@ -501,7 +511,7 @@ public class TestWALReplay {
|
||||||
boolean first = true;
|
boolean first = true;
|
||||||
for (HColumnDescriptor hcd: htd.getFamilies()) {
|
for (HColumnDescriptor hcd: htd.getFamilies()) {
|
||||||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
|
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
|
||||||
if (first ) {
|
if (first) {
|
||||||
// If first, so we have at least one family w/ different seqid to rest.
|
// If first, so we have at least one family w/ different seqid to rest.
|
||||||
region.flush(true);
|
region.flush(true);
|
||||||
first = false;
|
first = false;
|
||||||
|
@ -824,9 +834,9 @@ public class TestWALReplay {
|
||||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||||
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
||||||
".replay.wal.secondtime");
|
".replay.wal.secondtime");
|
||||||
user.runAs(new PrivilegedExceptionAction() {
|
user.runAs(new PrivilegedExceptionAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Void run() throws Exception {
|
||||||
runWALSplit(newConf);
|
runWALSplit(newConf);
|
||||||
FileSystem newFS = FileSystem.get(newConf);
|
FileSystem newFS = FileSystem.get(newConf);
|
||||||
// 100k seems to make for about 4 flushes during HRegion#initialize.
|
// 100k seems to make for about 4 flushes during HRegion#initialize.
|
||||||
|
@ -932,6 +942,103 @@ public class TestWALReplay {
|
||||||
lastestSeqNumber, editCount);
|
lastestSeqNumber, editCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* testcase for https://issues.apache.org/jira/browse/HBASE-15252
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDatalossWhenInputError() throws IOException, InstantiationException,
|
||||||
|
IllegalAccessException {
|
||||||
|
final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
|
||||||
|
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||||
|
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
|
||||||
|
deleteDir(basedir);
|
||||||
|
final byte[] rowName = tableName.getName();
|
||||||
|
final int countPerFamily = 10;
|
||||||
|
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
|
||||||
|
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
|
||||||
|
Path regionDir = region1.getRegionFileSystem().getRegionDir();
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(region1);
|
||||||
|
|
||||||
|
WAL wal = createWAL(this.conf);
|
||||||
|
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
|
||||||
|
for (HColumnDescriptor hcd : htd.getFamilies()) {
|
||||||
|
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
|
||||||
|
}
|
||||||
|
// Now assert edits made it in.
|
||||||
|
final Get g = new Get(rowName);
|
||||||
|
Result result = region.get(g);
|
||||||
|
assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
|
||||||
|
// Now close the region (without flush), split the log, reopen the region and assert that
|
||||||
|
// replay of log has the correct effect.
|
||||||
|
region.close(true);
|
||||||
|
wal.shutdown();
|
||||||
|
|
||||||
|
runWALSplit(this.conf);
|
||||||
|
|
||||||
|
// here we let the DFSInputStream throw an IOException just after the WALHeader.
|
||||||
|
Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
|
||||||
|
FSDataInputStream stream = fs.open(editFile);
|
||||||
|
stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
|
||||||
|
Class<? extends DefaultWALProvider.Reader> logReaderClass =
|
||||||
|
conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
|
||||||
|
DefaultWALProvider.Reader.class);
|
||||||
|
DefaultWALProvider.Reader reader = logReaderClass.newInstance();
|
||||||
|
reader.init(this.fs, editFile, conf, stream);
|
||||||
|
final long headerLength = stream.getPos();
|
||||||
|
reader.close();
|
||||||
|
FileSystem spyFs = spy(this.fs);
|
||||||
|
doAnswer(new Answer<FSDataInputStream>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
|
||||||
|
Field field = FilterInputStream.class.getDeclaredField("in");
|
||||||
|
field.setAccessible(true);
|
||||||
|
final DFSInputStream in = (DFSInputStream) field.get(stream);
|
||||||
|
DFSInputStream spyIn = spy(in);
|
||||||
|
doAnswer(new Answer<Integer>() {
|
||||||
|
|
||||||
|
private long pos;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
if (pos >= headerLength) {
|
||||||
|
throw new IOException("read over limit");
|
||||||
|
}
|
||||||
|
int b = (Integer) invocation.callRealMethod();
|
||||||
|
if (b > 0) {
|
||||||
|
pos += b;
|
||||||
|
}
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
}).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
|
||||||
|
doAnswer(new Answer<Void>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
invocation.callRealMethod();
|
||||||
|
in.close();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(spyIn).close();
|
||||||
|
field.set(stream, spyIn);
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
}).when(spyFs).open(eq(editFile));
|
||||||
|
|
||||||
|
WAL wal2 = createWAL(this.conf);
|
||||||
|
HRegion region2;
|
||||||
|
try {
|
||||||
|
// log replay should fail due to the IOException, otherwise we may lose data.
|
||||||
|
region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
|
||||||
|
assertEquals(result.size(), region2.get(g).size());
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertEquals("read over limit", e.getMessage());
|
||||||
|
}
|
||||||
|
region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
|
||||||
|
assertEquals(result.size(), region2.get(g).size());
|
||||||
|
}
|
||||||
|
|
||||||
static class MockWAL extends FSHLog {
|
static class MockWAL extends FSHLog {
|
||||||
boolean doCompleteCacheFlush = false;
|
boolean doCompleteCacheFlush = false;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue