HBASE-21909 Validate the put instance before executing in AsyncTable.put method

Signed-off-by: Michael Stack <stack@apache.org>
zhangduo 2019-02-15 20:57:56 +08:00
parent 48f2ef432b
commit d88a87fd04
10 changed files with 72 additions and 29 deletions

AsyncBufferedMutatorBuilder.java

@@ -97,6 +97,13 @@ public interface AsyncBufferedMutatorBuilder {
    */
   AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
+  /**
+   * Override the maximum key-value size specified by the provided {@link AsyncConnection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+   * {@code hbase.client.keyvalue.maxsize}.
+   */
+  AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
   /**
    * Create the {@link AsyncBufferedMutator} instance.
    */
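
A minimal usage sketch of the new builder API (not part of the patch; the table name, configuration, and 1 MB cap are assumptions for illustration):

// Build an AsyncBufferedMutator whose cell-size cap overrides the
// hbase.client.keyvalue.maxsize value from the connection Configuration.
try (AsyncConnection conn =
    ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
  AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(TableName.valueOf("test"))
      .setMaxKeyValueSize(1024 * 1024) // reject any single cell larger than 1 MB
      .build();
}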

AsyncBufferedMutatorBuilderImpl.java

@@ -37,11 +37,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
   private long periodicFlushTimeoutNs;
+  private int maxKeyValueSize;
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
       AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
     this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
+    this.maxKeyValueSize = connConf.getMaxKeyValueSize();
     this.periodicalFlushTimer = periodicalFlushTimer;
   }
@@ -77,7 +80,7 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
   @Override
   public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
-    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be >= 0",
+    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
       writeBufferSize);
     this.writeBufferSize = writeBufferSize;
     return this;
@@ -89,9 +92,17 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
     return this;
   }
+  @Override
+  public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize) {
+    Preconditions.checkArgument(maxKeyValueSize > 0, "maxKeyValueSize %d must be > 0",
+      maxKeyValueSize);
+    this.maxKeyValueSize = maxKeyValueSize;
+    return this;
+  }
   @Override
   public AsyncBufferedMutator build() {
     return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
-      periodicFlushTimeoutNs);
+      periodicFlushTimeoutNs, maxKeyValueSize);
   }
 }
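
The new setter mirrors the adjacent setWriteBufferSize guard: a non-positive limit is rejected up front rather than silently disabling validation. A quick sketch (the builder variable is hypothetical):

builder.setMaxKeyValueSize(512 * 1024); // OK: cells capped at 512 KB
builder.setMaxKeyValueSize(0);          // throws IllegalArgumentException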

AsyncBufferedMutatorImpl.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import java.io.IOException;
@@ -49,6 +50,8 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   private final long periodicFlushTimeoutNs;
+  private final int maxKeyValueSize;
   private List<Mutation> mutations = new ArrayList<>();
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -61,11 +64,12 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   Timeout periodicFlushTask;
   AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs) {
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
     this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
     this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
+    this.maxKeyValueSize = maxKeyValueSize;
   }
   @Override
@@ -112,7 +116,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     List<CompletableFuture<Void>> futures =
       Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
         .collect(Collectors.toList());
-    long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
+    long heapSize = 0;
+    for (Mutation mutation : mutations) {
+      heapSize += mutation.heapSize();
+      if (mutation instanceof Put) {
+        validatePut((Put) mutation, maxKeyValueSize);
+      }
+    }
     synchronized (this) {
       if (closed) {
         IOException ioe = new IOException("Already closed");
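
Validation now happens in the same pass that sums heap sizes, so a bad Put fails fast and nothing is buffered. A behavioral sketch, assuming a mutator built with a 1 MB cap as in the earlier example (row/family/qualifier names are illustrative):

byte[] row = Bytes.toBytes("r"), cf = Bytes.toBytes("cf"), cq = Bytes.toBytes("cq");
try {
  mutator.mutate(new Put(row)); // empty Put -> IllegalArgumentException("No columns to insert")
} catch (IllegalArgumentException expected) {
}
try {
  mutator.mutate(new Put(row).addColumn(cf, cq, new byte[2 * 1024 * 1024])); // exceeds the 1 MB cap
} catch (IllegalArgumentException expected) {
}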

AsyncConnectionConfiguration.java

@@ -40,6 +40,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
@@ -106,6 +108,8 @@ class AsyncConnectionConfiguration {
   private final long primaryMetaScanTimeoutNs;
+  private final int maxKeyValueSize;
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -142,6 +146,7 @@ class AsyncConnectionConfiguration {
     this.primaryMetaScanTimeoutNs =
       TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
         HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
+    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
   }
   long getMetaOperationTimeoutNs() {
@@ -211,4 +216,8 @@ class AsyncConnectionConfiguration {
   long getPrimaryMetaScanTimeoutNs() {
     return primaryMetaScanTimeoutNs;
   }
+  int getMaxKeyValueSize() {
+    return maxKeyValueSize;
+  }
 }
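
With this field in place, the async connection reads the same hbase.client.keyvalue.maxsize key as the blocking client. A sketch of setting it (the 10 MB value is an arbitrary example, not necessarily the shipped default):

Configuration conf = HBaseConfiguration.create();
// Cap individual cells for every table and mutator created from this connection.
conf.setInt("hbase.client.keyvalue.maxsize", 10 * 1024 * 1024);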

BufferedMutatorImpl.java

@@ -188,7 +188,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
     int toAddCount = 0;
     for (Mutation m : ms) {
       if (m instanceof Put) {
-        HTable.validatePut((Put) m, maxKeyValueSize);
+        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
       }
       toAddSize += m.heapSize();
       ++toAddCount;

ConnectionUtils.java

@@ -572,4 +572,20 @@ public final class ConnectionUtils {
     });
     return future;
   }
+  // validate for well-formedness
+  static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
+    if (put.isEmpty()) {
+      throw new IllegalArgumentException("No columns to insert");
+    }
+    if (maxKeyValueSize > 0) {
+      for (List<Cell> list : put.getFamilyCellMap().values()) {
+        for (Cell cell : list) {
+          if (cell.getSerializedSize() > maxKeyValueSize) {
+            throw new IllegalArgumentException("KeyValue size too large");
+          }
+        }
+      }
+    }
+  }
 }
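
The consolidated helper keeps the semantics of the old HTable copy: an empty Put always fails, and a non-positive limit disables the per-cell size check. A sketch of the three cases (the method is package-private, so callable only from org.apache.hadoop.hbase.client; names are illustrative):

Put big = new Put(Bytes.toBytes("r")).addColumn(cf, cq, new byte[2048]);
ConnectionUtils.validatePut(new Put(Bytes.toBytes("r")), 1024); // "No columns to insert"
ConnectionUtils.validatePut(big, 1024); // "KeyValue size too large": 2 KB cell, 1 KB limit
ConnectionUtils.validatePut(big, -1);   // passes: non-positive limit skips the size check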

HTable.java

@@ -27,7 +27,6 @@ import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -920,24 +919,8 @@ public class HTable implements Table {
   }
   // validate for well-formedness
-  public void validatePut(final Put put) throws IllegalArgumentException {
-    validatePut(put, connConfiguration.getMaxKeyValueSize());
-  }
-  // validate for well-formedness
-  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
-    if (put.isEmpty()) {
-      throw new IllegalArgumentException("No columns to insert");
-    }
-    if (maxKeyValueSize > 0) {
-      for (List<Cell> list : put.getFamilyCellMap().values()) {
-        for (Cell cell : list) {
-          if (cell.getSerializedSize() > maxKeyValueSize) {
-            throw new IllegalArgumentException("KeyValue size too large");
-          }
-        }
-      }
-    }
+  private void validatePut(final Put put) throws IllegalArgumentException {
+    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
   }
   /**
@@ -1261,6 +1244,7 @@ public class HTable implements Table {
     @Override
     public boolean thenPut(Put put) throws IOException {
+      validatePut(put);
       preCheck();
       return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
     }

HTableMultiplexer.java

@@ -34,7 +34,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -195,7 +195,7 @@ public class HTableMultiplexer {
     }
     try {
-      HTable.validatePut(put, maxKeyValueSize);
+      ConnectionUtils.validatePut(put, maxKeyValueSize);
       // Allow mocking to get at the connection, but don't expose the connection to users.
       ClusterConnection conn = (ClusterConnection) getConnection();
       // AsyncProcess in the FlushWorker should take care of refreshing the location cache

RawAsyncTableImpl.java

@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import com.google.protobuf.RpcChannel;
@@ -235,6 +236,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<Void> put(Put put) {
+    validatePut(put, conn.connConf.getMaxKeyValueSize());
     return this.<Void> newCaller(put, writeRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
         put, RequestConverter::buildMutateRequest))
@@ -326,6 +328,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     @Override
     public CompletableFuture<Boolean> thenPut(Put put) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
@@ -478,6 +481,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public List<CompletableFuture<Void>> put(List<Put> puts) {
+    for (Put put : puts) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
+    }
     return voidMutate(puts);
   }
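
This is the heart of HBASE-21909: the async put paths (single and batch) now run the same client-side validation as the blocking Table. A sketch of the resulting fail-fast behavior (the connection variable and table name are illustrative):

AsyncTable<?> table = conn.getTable(TableName.valueOf("test"));
try {
  table.put(new Put(Bytes.toBytes("r"))); // empty Put now throws IllegalArgumentException
} catch (IllegalArgumentException expected) {
  // rejected client-side; no RPC was issued
}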

TestAsyncBufferMutator.java

@@ -245,8 +245,8 @@ public class TestAsyncBufferMutator {
     private int flushCount;
     AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-        long writeBufferSize, long periodicFlushTimeoutNs) {
-      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
+        long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
     }
     @Override
@@ -262,7 +262,7 @@ public class TestAsyncBufferMutator {
     Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
     try (AsyncBufferMutatorForTest mutator =
       new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
-        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) {
       CompletableFuture<?> future = mutator.mutate(put);
       Timeout task = mutator.periodicFlushTask;
       // we should have scheduled a periodic flush task