From dedab71381e14d96da61485f859e0792d8ebc3f9 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Fri, 15 Feb 2019 20:57:56 +0800
Subject: [PATCH] HBASE-21909 Validate the put instance before executing in
 AsyncTable.put method

Signed-off-by: Michael Stack
---
 .../client/AsyncBufferedMutatorBuilder.java   |  7 ++++++
 .../AsyncBufferedMutatorBuilderImpl.java      | 15 +++++++++++--
 .../client/AsyncBufferedMutatorImpl.java      | 14 ++++++++++--
 .../client/AsyncConnectionConfiguration.java  |  9 ++++++++
 .../hbase/client/BufferedMutatorImpl.java     |  2 +-
 .../hadoop/hbase/client/ConnectionUtils.java  | 16 ++++++++++++++
 .../apache/hadoop/hbase/client/HTable.java    | 22 +++----------------
 .../hbase/client/RawAsyncTableImpl.java       |  6 +++++
 .../hbase/client/TestAsyncBufferMutator.java  |  6 ++---
 9 files changed, 70 insertions(+), 27 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index c617c8e1e8d..ea2528d5152 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -97,6 +97,13 @@ public interface AsyncBufferedMutatorBuilder {
    */
   AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
 
+  /**
+   * Override the maximum key-value size specified by the provided {@link AsyncConnection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+   * {@code hbase.client.keyvalue.maxsize}.
+   */
+  AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
+
   /**
    * Create the {@link AsyncBufferedMutator} instance.
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index eb8af175fbd..cd0496377bc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -37,11 +37,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
 
   private long periodicFlushTimeoutNs;
 
+  private int maxKeyValueSize;
+
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
       AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
     this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
+    this.maxKeyValueSize = connConf.getMaxKeyValueSize();
     this.periodicalFlushTimer = periodicalFlushTimer;
   }
 
@@ -77,7 +80,7 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
 
   @Override
   public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
-    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be >= 0",
+    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
       writeBufferSize);
     this.writeBufferSize = writeBufferSize;
     return this;
@@ -89,9 +92,17 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
     return this;
   }
 
+  @Override
+  public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize) {
+    Preconditions.checkArgument(maxKeyValueSize > 0, "maxKeyValueSize %d must be > 0",
+      maxKeyValueSize);
+    this.maxKeyValueSize = maxKeyValueSize;
+    return this;
+  }
+
   @Override
   public AsyncBufferedMutator build() {
     return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
-      periodicFlushTimeoutNs);
+      periodicFlushTimeoutNs, maxKeyValueSize);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 61d49af3103..7aa9597c594 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
@@ -49,6 +50,8 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private final long periodicFlushTimeoutNs;
 
+  private final int maxKeyValueSize;
+
   private List<Mutation> mutations = new ArrayList<>();
 
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -61,11 +64,12 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   Timeout periodicFlushTask;
 
   AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs) {
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
     this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
     this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
+    this.maxKeyValueSize = maxKeyValueSize;
   }
 
   @Override
@@ -112,7 +116,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     List<CompletableFuture<Void>> futures =
       Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
         .collect(Collectors.toList());
-    long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
+    long heapSize = 0;
+    for (Mutation mutation : mutations) {
+      heapSize += mutation.heapSize();
+      if (mutation instanceof Put) {
+        validatePut((Put) mutation, maxKeyValueSize);
+      }
+    }
     synchronized (this) {
       if (closed) {
         IOException ioe = new IOException("Already closed");
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 65542e48e74..22042c9b831 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
@@ -106,6 +108,8 @@ class AsyncConnectionConfiguration {
 
   private final long primaryMetaScanTimeoutNs;
 
+  private final int maxKeyValueSize;
+
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -142,6 +146,7 @@ class AsyncConnectionConfiguration {
     this.primaryMetaScanTimeoutNs =
       TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
         HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
+    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
   }
 
   long getMetaOperationTimeoutNs() {
@@ -211,4 +216,8 @@ class AsyncConnectionConfiguration {
   long getPrimaryMetaScanTimeoutNs() {
     return primaryMetaScanTimeoutNs;
   }
+
+  int getMaxKeyValueSize() {
+    return maxKeyValueSize;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index d4bc811c72c..f0c8da413d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -188,7 +188,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
     int toAddCount = 0;
     for (Mutation m : ms) {
       if (m instanceof Put) {
-        HTable.validatePut((Put) m, maxKeyValueSize);
+        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
       }
       toAddSize += m.heapSize();
       ++toAddCount;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 8e050df93da..3b6560fd100 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -572,4 +572,20 @@ public final class ConnectionUtils {
     });
     return future;
   }
+
+  // validate for well-formedness
+  static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
+    if (put.isEmpty()) {
+      throw new IllegalArgumentException("No columns to insert");
+    }
+    if (maxKeyValueSize > 0) {
+      for (List<Cell> list : put.getFamilyCellMap().values()) {
+        for (Cell cell : list) {
+          if (cell.getSerializedSize() > maxKeyValueSize) {
+            throw new IllegalArgumentException("KeyValue size too large");
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 15a189cd171..9b3afd9e824 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -27,7 +27,6 @@ import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -920,24 +919,8 @@ public class HTable implements Table {
   }
 
   // validate for well-formedness
-  public void validatePut(final Put put) throws IllegalArgumentException {
-    validatePut(put, connConfiguration.getMaxKeyValueSize());
-  }
-
-  // validate for well-formedness
-  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
-    if (put.isEmpty()) {
-      throw new IllegalArgumentException("No columns to insert");
-    }
-    if (maxKeyValueSize > 0) {
-      for (List<Cell> list : put.getFamilyCellMap().values()) {
-        for (Cell cell : list) {
-          if (cell.getSerializedSize() > maxKeyValueSize) {
-            throw new IllegalArgumentException("KeyValue size too large");
-          }
-        }
-      }
-    }
+  private void validatePut(final Put put) throws IllegalArgumentException {
+    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
   }
 
   /**
@@ -1261,6 +1244,7 @@
 
     @Override
     public boolean thenPut(Put put) throws IOException {
+      validatePut(put);
       preCheck();
       return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 7562e6fd6b7..96fa85d67ce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.RpcChannel;
@@ -235,6 +236,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public CompletableFuture<Void> put(Put put) {
+    validatePut(put, conn.connConf.getMaxKeyValueSize());
     return this.<Void> newCaller(put, writeRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
         put, RequestConverter::buildMutateRequest))
@@ -326,6 +328,7 @@
 
     @Override
     public CompletableFuture<Boolean> thenPut(Put put) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutate(controller, loc,
@@ -478,6 +481,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public List<CompletableFuture<Void>> put(List<Put> puts) {
+    for (Put put : puts) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
+    }
     return voidMutate(puts);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 6eed326f2d6..5e7f6cc5a0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -245,8 +245,8 @@ public class TestAsyncBufferMutator {
     private int flushCount;
 
     AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-        long writeBufferSize, long periodicFlushTimeoutNs) {
-      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
+        long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
     }
 
     @Override
@@ -262,7 +262,7 @@ public class TestAsyncBufferMutator {
     Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
     try (AsyncBufferMutatorForTest mutator =
       new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
-        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) {
      CompletableFuture<Void> future = mutator.mutate(put);
       Timeout task = mutator.periodicFlushTask;
       // we should have scheduled a periodic flush task
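
Reviewer note (not part of the patch): a minimal sketch of how the new client-side validation
surfaces through the public API. The table name "test", column family "cf", the 1 KB limit and
the class name below are illustrative assumptions, not anything added by this change. With this
patch an empty Put, or a Put carrying a cell larger than hbase.client.keyvalue.maxsize, now
fails fast in the client with an IllegalArgumentException instead of being shipped to a
RegionServer, and AsyncBufferedMutatorBuilder.setMaxKeyValueSize lets a single buffered mutator
override the connection-wide limit.

  // Illustrative sketch only; table "test", family "cf" and the 1 KB limit are assumptions.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
  import org.apache.hadoop.hbase.client.AsyncConnection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class PutValidationExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
        TableName tableName = TableName.valueOf("test");

        // An empty Put is now rejected on the client before any RPC is made.
        try {
          conn.getTable(tableName).put(new Put(Bytes.toBytes("row"))).join();
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // "No columns to insert"
        }

        // The buffered mutator can override hbase.client.keyvalue.maxsize per instance.
        try (AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(tableName)
            .setMaxKeyValueSize(1024) // reject any single cell larger than 1 KB
            .build()) {
          Put oversized = new Put(Bytes.toBytes("row"))
              .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), new byte[2048]);
          try {
            mutator.mutate(oversized);
          } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // "KeyValue size too large"
          }
        }
      }
    }
  }

Validating in RawAsyncTableImpl and AsyncBufferedMutatorImpl mirrors what the blocking client
already does through BufferedMutatorImpl and HTable, so the synchronous and asynchronous write
paths now reject malformed puts in the same way.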