From 97b164ac3892991f23fe8a02581ef30336cf5909 Mon Sep 17 00:00:00 2001 From: Matteo Bertozzi Date: Tue, 23 Aug 2016 20:43:29 -0700 Subject: [PATCH] HBASE-16451 Procedure v2 - Test WAL protobuf entry size limit --- .../hbase/procedure2/util/ByteSlot.java | 18 +++++-- .../procedure2/ProcedureTestingUtility.java | 47 +++++++++++++++++-- .../wal/TestStressWALProcedureStore.java | 15 ++++++ .../store/wal/TestWALProcedureStore.java | 14 ++---- 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java index 890411661b9..c4ed9b76cb0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java @@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class ByteSlot extends OutputStream { - private static final int DOUBLE_GROW_LIMIT = 1 << 20; + private static final int LARGE_GROW_SIZE_THRESHOLD = 8 << 20; + private static final int LARGE_GROW_SIZE = 1 << 20; + private static final int RESET_THRESHOLD = 64 << 20; private static final int GROW_ALIGN = 128; private byte[] buf; @@ -51,6 +53,9 @@ public class ByteSlot extends OutputStream { private int size; public void reset() { + if (buf != null && buf.length > RESET_THRESHOLD) { + buf = null; + } head = 0; size = 0; } @@ -101,11 +106,16 @@ public class ByteSlot extends OutputStream { if (buf == null) { buf = new byte[minCapacity]; } else if (minCapacity > buf.length) { - int newCapacity = buf.length << 1; - if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) { + int newCapacity; + if (buf.length <= LARGE_GROW_SIZE_THRESHOLD) { + newCapacity = buf.length << 1; + } else { + newCapacity = buf.length + LARGE_GROW_SIZE; + } + if (minCapacity > newCapacity) { newCapacity = minCapacity; } buf = Arrays.copyOf(buf, newCapacity); } } -} \ No newline at end of file +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index ceec855b418..7365de93d16 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -31,15 +31,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.util.Threads; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -89,6 +90,24 @@ public class ProcedureTestingUtility { procExecutor.start(execThreads, failOnCorrupted); } + public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader) + throws Exception { + procStore.stop(false); + procStore.start(procStore.getNumThreads()); + procStore.recoverLease(); + procStore.load(loader); + } + + public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId, + long runnableCount, int completedCount, int corruptedCount) throws Exception { + final LoadCounter loader = new LoadCounter(); + storeRestart(procStore, loader); + assertEquals(maxProcId, loader.getMaxProcId()); + assertEquals(runnableCount, loader.getRunnableCount()); + assertEquals(completedCount, loader.getCompletedCount()); + assertEquals(corruptedCount, loader.getCorruptedCount()); + } + public static void setKillBeforeStoreUpdate(ProcedureExecutor procExecutor, boolean value) { if (procExecutor.testing == null) { @@ -223,6 +242,8 @@ public class ProcedureTestingUtility { } public static class TestProcedure extends Procedure { + private byte[] data = null; + public TestProcedure() {} public TestProcedure(long procId) { @@ -230,6 +251,11 @@ public class ProcedureTestingUtility { } public TestProcedure(long procId, long parentId) { + this(procId, parentId, null); + } + + public TestProcedure(long procId, long parentId, byte[] data) { + setData(data); setProcId(procId); if (parentId > 0) { setParentProcId(parentId); @@ -244,6 +270,10 @@ public class ProcedureTestingUtility { setState(ProcedureState.FINISHED); } + public void setData(final byte[] data) { + this.data = data; + } + @Override protected Procedure[] execute(Void env) { return null; } @@ -254,10 +284,21 @@ public class ProcedureTestingUtility { protected boolean abort(Void env) { return false; } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { } + protected void serializeStateData(final OutputStream stream) throws IOException { + StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0); + if (data != null) stream.write(data); + } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { } + protected void deserializeStateData(final InputStream stream) throws IOException { + int len = StreamUtils.readRawVarint32(stream); + if (len > 0) { + data = new byte[len]; + stream.read(data); + } else { + data = null; + } + } } public static class LoadCounter implements ProcedureStore.ProcedureLoader { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 8e15aef15a6..8b1c49af2c7 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -129,4 +130,18 @@ public class TestStressWALProcedureStore { assertTrue(procStore.getStoreTracker().isEmpty()); assertEquals(1, procStore.getActiveLogs().size()); } + + @Test + public void testEntrySizeLimit() throws Exception { + final int NITEMS = 20; + for (int i = 1; i <= NITEMS; ++i) { + final byte[] data = new byte[256 << i]; + LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length))); + TestProcedure proc = new TestProcedure(i, 0, data); + procStore.insert(proc, null); + } + + // check that we are able to read the big proc-blobs + ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0); + } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index e7f7c77a50a..2e2a0384478 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -93,10 +93,7 @@ public class TestWALProcedureStore { } private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { - procStore.stop(false); - procStore.start(PROCEDURE_STORE_SLOTS); - procStore.recoverLease(); - procStore.load(loader); + ProcedureTestingUtility.storeRestart(procStore, loader); } @Test @@ -622,6 +619,7 @@ public class TestWALProcedureStore { assertEquals(0, loader.getCorruptedCount()); } + @Test public void testLoadChildren() throws Exception { TestProcedure a = new TestProcedure(1, 0); TestProcedure b = new TestProcedure(2, 1); @@ -659,12 +657,8 @@ public class TestWALProcedureStore { private void restartAndAssert(long maxProcId, long runnableCount, int completedCount, int corruptedCount) throws Exception { - final LoadCounter loader = new LoadCounter(); - storeRestart(loader); - assertEquals(maxProcId, loader.getMaxProcId()); - assertEquals(runnableCount, loader.getRunnableCount()); - assertEquals(completedCount, loader.getCompletedCount()); - assertEquals(corruptedCount, loader.getCorruptedCount()); + ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, + runnableCount, completedCount, corruptedCount); } private void corruptLog(final FileStatus logFile, final long dropBytes)