HBASE-25575 Should validate Puts in RowMutations

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Toshihiro Suzuki 2021-02-22 10:46:16 +09:00
parent 5356edfe63
commit cfbae4d3a3
4 changed files with 113 additions and 4 deletions

View File

@ -570,7 +570,7 @@ public final class ConnectionUtils {
}
// validate for well-formedness
static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
static void validatePut(Put put, int maxKeyValueSize) {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
@ -585,6 +585,14 @@ public final class ConnectionUtils {
}
}
static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) {
for (Mutation mutation : rowMutations.getMutations()) {
if (mutation instanceof Put) {
validatePut((Put) mutation, maxKeyValueSize);
}
}
}
/**
* Select the priority for the rpc call.
* <p/>

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
@ -381,6 +382,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
@ -441,6 +443,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
@ -458,9 +461,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
|| checkAndMutate.getAction() instanceof Increment
|| checkAndMutate.getAction() instanceof Append) {
@ -480,6 +480,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
@ -552,6 +553,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
@ -653,7 +655,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
CheckAndMutate checkAndMutate = (CheckAndMutate) action;
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
} else if (checkAndMutate.getAction() instanceof RowMutations) {
validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
conn.connConf.getMaxKeyValueSize());
}
} else if (action instanceof RowMutations) {
validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
}
}
return conn.callerFactory.batch().table(tableName).actions(actions)

View File

@ -1673,4 +1673,47 @@ public class TestAsyncTable {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testInvalidPutInRowMutations() throws IOException {
final byte[] row = Bytes.toBytes(0);
try {
getTable.get().mutateRow(new RowMutations(row).add((Mutation) new Put(row)));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get()
.mutateRow(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
final byte[] row = Bytes.toBytes(0);
try {
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new RowMutations(row).add((Mutation) new Put(row))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}

View File

@ -335,6 +335,57 @@ public class TestAsyncTableBatch {
}
}
@Test
public void testInvalidPutInRowMutations() throws IOException {
final byte[] row = Bytes.toBytes(0);
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
try {
table.batch(Arrays.asList(new Delete(row), new RowMutations(row)
.add((Mutation) new Put(row))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
table.batch(
Arrays.asList(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE])),
new Delete(row)));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
final byte[] row = Bytes.toBytes(0);
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
try {
table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, CQ)
.build(new RowMutations(row).add((Mutation) new Put(row)))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
table.batch(
Arrays.asList(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, CQ)
.build(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]))),
new Delete(row)));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testWithCheckAndMutate() throws Exception {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);