diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index dc5c9cc555d..bb25aa1ca7f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; /** * A Protobuf based WAL has the following structure: @@ -332,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase { } ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), (int)size); - } catch (IOException ipbe) { + } catch (InvalidProtocolBufferException ipbe) { throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index eea3cc8c398..f9d7da2eb34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -22,9 +22,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.FilterInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -37,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -91,6 +98,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hdfs.DFSInputStream; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -100,6 +108,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * Test replay of edits out of a WAL split. @@ -501,7 +511,7 @@ public class TestWALReplay { boolean first = true; for (HColumnDescriptor hcd: htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); - if (first ) { + if (first) { // If first, so we have at least one family w/ different seqid to rest. region.flush(true); first = false; @@ -824,9 +834,9 @@ public class TestWALReplay { final Configuration newConf = HBaseConfiguration.create(this.conf); User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime"); - user.runAs(new PrivilegedExceptionAction() { + user.runAs(new PrivilegedExceptionAction() { @Override - public Object run() throws Exception { + public Void run() throws Exception { runWALSplit(newConf); FileSystem newFS = FileSystem.get(newConf); // 100k seems to make for about 4 flushes during HRegion#initialize. @@ -932,6 +942,103 @@ public class TestWALReplay { lastestSeqNumber, editCount); } + /** + * testcase for https://issues.apache.org/jira/browse/HBASE-15252 + */ + @Test + public void testDatalossWhenInputError() throws IOException, InstantiationException, + IllegalAccessException { + final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); + deleteDir(basedir); + final byte[] rowName = tableName.getName(); + final int countPerFamily = 10; + final HTableDescriptor htd = createBasic1FamilyHTD(tableName); + HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + Path regionDir = region1.getRegionFileSystem().getRegionDir(); + HBaseTestingUtility.closeRegionAndWAL(region1); + + WAL wal = createWAL(this.conf); + HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); + for (HColumnDescriptor hcd : htd.getFamilies()) { + addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); + } + // Now assert edits made it in. + final Get g = new Get(rowName); + Result result = region.get(g); + assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); + // Now close the region (without flush), split the log, reopen the region and assert that + // replay of log has the correct effect. + region.close(true); + wal.shutdown(); + + runWALSplit(this.conf); + + // here we let the DFSInputStream throw an IOException just after the WALHeader. + Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first(); + FSDataInputStream stream = fs.open(editFile); + stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); + Class logReaderClass = + conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + DefaultWALProvider.Reader.class); + DefaultWALProvider.Reader reader = logReaderClass.newInstance(); + reader.init(this.fs, editFile, conf, stream); + final long headerLength = stream.getPos(); + reader.close(); + FileSystem spyFs = spy(this.fs); + doAnswer(new Answer() { + + @Override + public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { + FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); + Field field = FilterInputStream.class.getDeclaredField("in"); + field.setAccessible(true); + final DFSInputStream in = (DFSInputStream) field.get(stream); + DFSInputStream spyIn = spy(in); + doAnswer(new Answer() { + + private long pos; + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + if (pos >= headerLength) { + throw new IOException("read over limit"); + } + int b = (Integer) invocation.callRealMethod(); + if (b > 0) { + pos += b; + } + return b; + } + }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class)); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + invocation.callRealMethod(); + in.close(); + return null; + } + }).when(spyIn).close(); + field.set(stream, spyIn); + return stream; + } + }).when(spyFs).open(eq(editFile)); + + WAL wal2 = createWAL(this.conf); + HRegion region2; + try { + // log replay should fail due to the IOException, otherwise we may lose data. + region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); + assertEquals(result.size(), region2.get(g).size()); + } catch (IOException e) { + assertEquals("read over limit", e.getMessage()); + } + region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); + assertEquals(result.size(), region2.get(g).size()); + } + static class MockWAL extends FSHLog { boolean doCompleteCacheFlush = false;