HBASE-25575 Should validate Puts in RowMutations
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
b78366cf50
commit
9cfeec0deb
|
@ -584,7 +584,7 @@ public final class ConnectionUtils {
|
|||
}
|
||||
|
||||
// validate for well-formedness
|
||||
static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
|
||||
static void validatePut(Put put, int maxKeyValueSize) {
|
||||
if (put.isEmpty()) {
|
||||
throw new IllegalArgumentException("No columns to insert");
|
||||
}
|
||||
|
@ -599,6 +599,14 @@ public final class ConnectionUtils {
|
|||
}
|
||||
}
|
||||
|
||||
static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) {
|
||||
for (Mutation mutation : rowMutations.getMutations()) {
|
||||
if (mutation instanceof Put) {
|
||||
validatePut((Put) mutation, maxKeyValueSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Select the priority for the rpc call.
|
||||
* <p/>
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
|||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
|
@ -381,6 +382,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
preCheck();
|
||||
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
|
||||
rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
|
||||
|
@ -441,6 +443,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
|
||||
rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
|
||||
|
@ -458,9 +461,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
|
||||
@Override
|
||||
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
|
||||
if (checkAndMutate.getAction() instanceof Put) {
|
||||
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
|
||||
}
|
||||
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
|
||||
|| checkAndMutate.getAction() instanceof Increment
|
||||
|| checkAndMutate.getAction() instanceof Append) {
|
||||
|
@ -480,6 +480,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
.call();
|
||||
} else if (checkAndMutate.getAction() instanceof RowMutations) {
|
||||
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
|
||||
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
|
||||
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
|
||||
rowMutations.getMaxPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) ->
|
||||
|
@ -552,6 +553,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
|
||||
@Override
|
||||
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
|
||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
|
||||
writeRpcTimeoutNs).action((controller, loc, stub) ->
|
||||
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
|
||||
|
@ -653,7 +655,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
CheckAndMutate checkAndMutate = (CheckAndMutate) action;
|
||||
if (checkAndMutate.getAction() instanceof Put) {
|
||||
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
|
||||
} else if (checkAndMutate.getAction() instanceof RowMutations) {
|
||||
validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
|
||||
conn.connConf.getMaxKeyValueSize());
|
||||
}
|
||||
} else if (action instanceof RowMutations) {
|
||||
validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
|
||||
}
|
||||
}
|
||||
return conn.callerFactory.batch().table(tableName).actions(actions)
|
||||
|
|
|
@ -1672,4 +1672,47 @@ public class TestAsyncTable {
|
|||
assertThat(e.getMessage(), containsString("KeyValue size too large"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidPutInRowMutations() throws IOException {
|
||||
final byte[] row = Bytes.toBytes(0);
|
||||
try {
|
||||
getTable.get().mutateRow(new RowMutations(row).add((Mutation) new Put(row)));
|
||||
fail("Should fail since the put does not contain any cells");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("No columns to insert"));
|
||||
}
|
||||
|
||||
try {
|
||||
getTable.get()
|
||||
.mutateRow(new RowMutations(row).add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])));
|
||||
fail("Should fail since the put exceeds the max key value size");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("KeyValue size too large"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
|
||||
final byte[] row = Bytes.toBytes(0);
|
||||
try {
|
||||
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER)
|
||||
.build(new RowMutations(row).add((Mutation) new Put(row))));
|
||||
fail("Should fail since the put does not contain any cells");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("No columns to insert"));
|
||||
}
|
||||
|
||||
try {
|
||||
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER)
|
||||
.build(new RowMutations(row).add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))));
|
||||
fail("Should fail since the put exceeds the max key value size");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("KeyValue size too large"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -335,6 +335,57 @@ public class TestAsyncTableBatch {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidPutInRowMutations() throws IOException {
|
||||
final byte[] row = Bytes.toBytes(0);
|
||||
|
||||
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
|
||||
try {
|
||||
table.batch(Arrays.asList(new Delete(row), new RowMutations(row)
|
||||
.add((Mutation) new Put(row))));
|
||||
fail("Should fail since the put does not contain any cells");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("No columns to insert"));
|
||||
}
|
||||
|
||||
try {
|
||||
table.batch(
|
||||
Arrays.asList(new RowMutations(row).add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE])),
|
||||
new Delete(row)));
|
||||
fail("Should fail since the put exceeds the max key value size");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("KeyValue size too large"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
|
||||
final byte[] row = Bytes.toBytes(0);
|
||||
|
||||
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
|
||||
try {
|
||||
table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, CQ)
|
||||
.build(new RowMutations(row).add((Mutation) new Put(row)))));
|
||||
fail("Should fail since the put does not contain any cells");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("No columns to insert"));
|
||||
}
|
||||
|
||||
try {
|
||||
table.batch(
|
||||
Arrays.asList(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, CQ)
|
||||
.build(new RowMutations(row).add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]))),
|
||||
new Delete(row)));
|
||||
fail("Should fail since the put exceeds the max key value size");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("KeyValue size too large"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithCheckAndMutate() throws Exception {
|
||||
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
|
||||
|
|
Loading…
Reference in New Issue