HBASE-16451 Procedure v2 - Test WAL protobuf entry size limit

This commit is contained in:
Matteo Bertozzi 2016-08-23 20:43:29 -07:00
parent 32c21f4594
commit 97b164ac38
4 changed files with 77 additions and 17 deletions

View File

@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ByteSlot extends OutputStream {
private static final int DOUBLE_GROW_LIMIT = 1 << 20;
private static final int LARGE_GROW_SIZE_THRESHOLD = 8 << 20;
private static final int LARGE_GROW_SIZE = 1 << 20;
private static final int RESET_THRESHOLD = 64 << 20;
private static final int GROW_ALIGN = 128;
private byte[] buf;
@ -51,6 +53,9 @@ public class ByteSlot extends OutputStream {
private int size;
public void reset() {
if (buf != null && buf.length > RESET_THRESHOLD) {
buf = null;
}
head = 0;
size = 0;
}
@ -101,11 +106,16 @@ public class ByteSlot extends OutputStream {
if (buf == null) {
buf = new byte[minCapacity];
} else if (minCapacity > buf.length) {
int newCapacity = buf.length << 1;
if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) {
int newCapacity;
if (buf.length <= LARGE_GROW_SIZE_THRESHOLD) {
newCapacity = buf.length << 1;
} else {
newCapacity = buf.length + LARGE_GROW_SIZE;
}
if (minCapacity > newCapacity) {
newCapacity = minCapacity;
}
buf = Arrays.copyOf(buf, newCapacity);
}
}
}
}

View File

@ -31,15 +31,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.Threads;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -89,6 +90,24 @@ public class ProcedureTestingUtility {
procExecutor.start(execThreads, failOnCorrupted);
}
public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
throws Exception {
procStore.stop(false);
procStore.start(procStore.getNumThreads());
procStore.recoverLease();
procStore.load(loader);
}
public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
long runnableCount, int completedCount, int corruptedCount) throws Exception {
final LoadCounter loader = new LoadCounter();
storeRestart(procStore, loader);
assertEquals(maxProcId, loader.getMaxProcId());
assertEquals(runnableCount, loader.getRunnableCount());
assertEquals(completedCount, loader.getCompletedCount());
assertEquals(corruptedCount, loader.getCorruptedCount());
}
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
boolean value) {
if (procExecutor.testing == null) {
@ -223,6 +242,8 @@ public class ProcedureTestingUtility {
}
public static class TestProcedure extends Procedure<Void> {
private byte[] data = null;
public TestProcedure() {}
public TestProcedure(long procId) {
@ -230,6 +251,11 @@ public class ProcedureTestingUtility {
}
public TestProcedure(long procId, long parentId) {
this(procId, parentId, null);
}
public TestProcedure(long procId, long parentId, byte[] data) {
setData(data);
setProcId(procId);
if (parentId > 0) {
setParentProcId(parentId);
@ -244,6 +270,10 @@ public class ProcedureTestingUtility {
setState(ProcedureState.FINISHED);
}
public void setData(final byte[] data) {
this.data = data;
}
@Override
protected Procedure[] execute(Void env) { return null; }
@ -254,10 +284,21 @@ public class ProcedureTestingUtility {
protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException { }
protected void serializeStateData(final OutputStream stream) throws IOException {
StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0);
if (data != null) stream.write(data);
}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException { }
protected void deserializeStateData(final InputStream stream) throws IOException {
int len = StreamUtils.readRawVarint32(stream);
if (len > 0) {
data = new byte[len];
stream.read(data);
} else {
data = null;
}
}
}
public static class LoadCounter implements ProcedureStore.ProcedureLoader {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -129,4 +130,18 @@ public class TestStressWALProcedureStore {
assertTrue(procStore.getStoreTracker().isEmpty());
assertEquals(1, procStore.getActiveLogs().size());
}
@Test
public void testEntrySizeLimit() throws Exception {
final int NITEMS = 20;
for (int i = 1; i <= NITEMS; ++i) {
final byte[] data = new byte[256 << i];
LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length)));
TestProcedure proc = new TestProcedure(i, 0, data);
procStore.insert(proc, null);
}
// check that we are able to read the big proc-blobs
ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0);
}
}

View File

@ -93,10 +93,7 @@ public class TestWALProcedureStore {
}
private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
procStore.stop(false);
procStore.start(PROCEDURE_STORE_SLOTS);
procStore.recoverLease();
procStore.load(loader);
ProcedureTestingUtility.storeRestart(procStore, loader);
}
@Test
@ -622,6 +619,7 @@ public class TestWALProcedureStore {
assertEquals(0, loader.getCorruptedCount());
}
@Test
public void testLoadChildren() throws Exception {
TestProcedure a = new TestProcedure(1, 0);
TestProcedure b = new TestProcedure(2, 1);
@ -659,12 +657,8 @@ public class TestWALProcedureStore {
private void restartAndAssert(long maxProcId, long runnableCount,
int completedCount, int corruptedCount) throws Exception {
final LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(maxProcId, loader.getMaxProcId());
assertEquals(runnableCount, loader.getRunnableCount());
assertEquals(completedCount, loader.getCompletedCount());
assertEquals(corruptedCount, loader.getCorruptedCount());
ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
runnableCount, completedCount, corruptedCount);
}
private void corruptLog(final FileStatus logFile, final long dropBytes)