HBASE-21944 Validate put for batch operation

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
zhangduo 2019-02-24 18:18:33 +08:00
parent b73f03e0cb
commit aa7d3ce39f
4 changed files with 126 additions and 96 deletions

View File

@ -596,8 +596,7 @@ public final class ConnectionUtils {
* The rules are:
* <ol>
* <li>If user set a priority explicitly, then just use it.</li>
* <li>For meta table, use {@link HConstants#META_QOS}.</li>
* <li>For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
* </ol>
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.

View File

@ -484,9 +484,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
for (Put put : puts) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
}
return voidMutate(puts);
}
@ -506,6 +503,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
.forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)

View File

@ -69,7 +69,7 @@ public class TestAsyncTable {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTable.class);
HBaseClassTestRule.forClass(TestAsyncTable.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -81,6 +81,8 @@ public class TestAsyncTable {
private static byte[] VALUE = Bytes.toBytes("value");
private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
private static AsyncConnection ASYNC_CONN;
@Rule
@ -107,6 +109,8 @@ public class TestAsyncTable {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
MAX_KEY_VALUE_SIZE);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
@ -146,6 +150,7 @@ public class TestAsyncTable {
return Bytes.toBytes(Bytes.toString(base) + "-" + index);
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testSimpleMultiple() throws Exception {
AsyncTable<?> table = getTable.get();
@ -153,19 +158,19 @@ public class TestAsyncTable {
CountDownLatch putLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(
i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
.thenAccept(x -> putLatch.countDown()));
.thenAccept(x -> putLatch.countDown()));
putLatch.await();
BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
IntStream.range(0, count)
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
for (int i = 0; i < count; i++) {
assertTrue(existsResp.take());
}
BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
IntStream.range(0, count)
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
for (int i = 0; i < count; i++) {
Pair<Integer, Result> pair = getResp.take();
assertArrayEquals(concat(VALUE, pair.getFirst()),
@ -176,20 +181,21 @@ public class TestAsyncTable {
i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
deleteLatch.await();
IntStream.range(0, count)
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
for (int i = 0; i < count; i++) {
assertFalse(existsResp.take());
}
IntStream.range(0, count)
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
for (int i = 0; i < count; i++) {
Pair<Integer, Result> pair = getResp.take();
assertTrue(pair.getSecond().isEmpty());
}
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testIncrement() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -197,16 +203,17 @@ public class TestAsyncTable {
CountDownLatch latch = new CountDownLatch(count);
AtomicLong sum = new AtomicLong(0L);
IntStream.range(0, count)
.forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
sum.addAndGet(x);
latch.countDown();
}));
.forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
sum.addAndGet(x);
latch.countDown();
}));
latch.await();
assertEquals(count, Bytes.toLong(
table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
assertEquals((1 + count) * count / 2, sum.get());
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testAppend() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -214,22 +221,24 @@ public class TestAsyncTable {
CountDownLatch latch = new CountDownLatch(count);
char suffix = ':';
AtomicLong suffixCount = new AtomicLong(0L);
IntStream.range(0, count).forEachOrdered(
i -> table.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
.thenAccept(r -> {
suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars()
.filter(x -> x == suffix).count());
latch.countDown();
}));
IntStream.range(0, count)
.forEachOrdered(i -> table
.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
.thenAccept(r -> {
suffixCount.addAndGet(
Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
latch.countDown();
}));
latch.await();
assertEquals((1 + count) * count / 2, suffixCount.get());
String value = Bytes.toString(
table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
.sorted().toArray();
.sorted().toArray();
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndPut() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -238,20 +247,21 @@ public class TestAsyncTable {
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
.thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
latch.countDown();
}));
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
.thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
latch.countDown();
}));
latch.await();
assertEquals(1, successCount.get());
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -259,24 +269,24 @@ public class TestAsyncTable {
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
.thenDelete(
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
deleteLatch.countDown();
}));
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
.thenDelete(
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
deleteLatch.countDown();
}));
deleteLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
@ -307,6 +317,7 @@ public class TestAsyncTable {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -314,8 +325,8 @@ public class TestAsyncTable {
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
@ -331,13 +342,13 @@ public class TestAsyncTable {
throw new UncheckedIOException(e);
}
table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
mutateLatch.countDown();
});
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
mutateLatch.countDown();
});
});
mutateLatch.await();
assertEquals(1, successCount.get());
@ -358,57 +369,35 @@ public class TestAsyncTable {
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.ifNotExists()
.thenPut(put)
.get();
boolean ok =
table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
assertTrue(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE)
.thenPut(put)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenPut(put).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts))
.ifEquals(VALUE)
.thenPut(put)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenPut(put).get();
assertTrue(ok);
RowMutations rm = new RowMutations(row)
.add((Mutation) put);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE)
.thenMutate(rm)
.get();
RowMutations rm = new RowMutations(row).add((Mutation) put);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenMutate(rm).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts))
.ifEquals(VALUE)
.thenMutate(rm)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenMutate(rm).get();
assertTrue(ok);
Delete delete = new Delete(row)
.addColumn(FAMILY, QUALIFIER);
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE)
.thenDelete(delete)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenDelete(delete).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts))
.ifEquals(VALUE)
.thenDelete(delete)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenDelete(delete).get();
assertTrue(ok);
}
@ -424,4 +413,22 @@ public class TestAsyncTable {
assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
}
}
@Test
public void testInvalidPut() {
try {
getTable.get().put(new Put(Bytes.toBytes(0)));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get()
.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -87,6 +88,8 @@ public class TestAsyncTableBatch {
private static byte[][] SPLIT_KEYS;
private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
@Parameter(0)
public String tableType;
@ -111,6 +114,8 @@ public class TestAsyncTableBatch {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
MAX_KEY_VALUE_SIZE);
TEST_UTIL.startMiniCluster(3);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@ -224,8 +229,8 @@ public class TestAsyncTableBatch {
actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
RowMutations rm = new RowMutations(Bytes.toBytes(5));
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
actions.add(rm);
actions.add(new Get(Bytes.toBytes(6)));
@ -308,4 +313,24 @@ public class TestAsyncTableBatch {
assertEquals("good",
Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
}
@Test
public void testInvalidPut() {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
try {
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
table.batch(
Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
new Delete(Bytes.toBytes(0))));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}