HBASE-21944 Validate put for batch operation

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
zhangduo 2019-02-24 18:18:33 +08:00
parent d47d64c6e3
commit d7c5d73de0
4 changed files with 126 additions and 96 deletions

View File

@ -596,8 +596,7 @@ public final class ConnectionUtils {
* The rules are:
* <ol>
* <li>If user set a priority explicitly, then just use it.</li>
* <li>For meta table, use {@link HConstants#META_QOS}.</li>
* <li>For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
* </ol>
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.

View File

@ -484,9 +484,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
for (Put put : puts) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
}
return voidMutate(puts);
}
@ -506,6 +503,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
.forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)

View File

@ -81,6 +81,8 @@ public class TestAsyncTable {
private static byte[] VALUE = Bytes.toBytes("value");
private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
private static AsyncConnection ASYNC_CONN;
@Rule
@ -107,6 +109,8 @@ public class TestAsyncTable {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
MAX_KEY_VALUE_SIZE);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
@ -146,6 +150,7 @@ public class TestAsyncTable {
return Bytes.toBytes(Bytes.toString(base) + "-" + index);
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testSimpleMultiple() throws Exception {
AsyncTable<?> table = getTable.get();
@ -190,6 +195,7 @@ public class TestAsyncTable {
}
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testIncrement() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -207,6 +213,7 @@ public class TestAsyncTable {
assertEquals((1 + count) * count / 2, sum.get());
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testAppend() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -214,11 +221,12 @@ public class TestAsyncTable {
CountDownLatch latch = new CountDownLatch(count);
char suffix = ':';
AtomicLong suffixCount = new AtomicLong(0L);
IntStream.range(0, count).forEachOrdered(
i -> table.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
IntStream.range(0, count)
.forEachOrdered(i -> table
.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
.thenAccept(r -> {
suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars()
.filter(x -> x == suffix).count());
suffixCount.addAndGet(
Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
latch.countDown();
}));
latch.await();
@ -230,6 +238,7 @@ public class TestAsyncTable {
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndPut() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -252,6 +261,7 @@ public class TestAsyncTable {
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -307,6 +317,7 @@ public class TestAsyncTable {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
@ -358,57 +369,35 @@ public class TestAsyncTable {
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.ifNotExists()
.thenPut(put)
.get();
boolean ok =
table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
assertTrue(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE)
.thenPut(put)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenPut(put).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts))
.ifEquals(VALUE)
.thenPut(put)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenPut(put).get();
assertTrue(ok);
RowMutations rm = new RowMutations(row)
.add((Mutation) put);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE)
.thenMutate(rm)
.get();
RowMutations rm = new RowMutations(row).add((Mutation) put);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenMutate(rm).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts))
.ifEquals(VALUE)
.thenMutate(rm)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenMutate(rm).get();
assertTrue(ok);
Delete delete = new Delete(row)
.addColumn(FAMILY, QUALIFIER);
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE)
.thenDelete(delete)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenDelete(delete).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
.timeRange(TimeRange.at(ts))
.ifEquals(VALUE)
.thenDelete(delete)
.get();
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenDelete(delete).get();
assertTrue(ok);
}
@ -424,4 +413,22 @@ public class TestAsyncTable {
assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
}
}
@Test
public void testInvalidPut() {
try {
getTable.get().put(new Put(Bytes.toBytes(0)));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get()
.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -87,6 +88,8 @@ public class TestAsyncTableBatch {
private static byte[][] SPLIT_KEYS;
private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
@Parameter(0)
public String tableType;
@ -111,6 +114,8 @@ public class TestAsyncTableBatch {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
MAX_KEY_VALUE_SIZE);
TEST_UTIL.startMiniCluster(3);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@ -224,8 +229,8 @@ public class TestAsyncTableBatch {
actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
RowMutations rm = new RowMutations(Bytes.toBytes(5));
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
actions.add(rm);
actions.add(new Get(Bytes.toBytes(6)));
@ -308,4 +313,24 @@ public class TestAsyncTableBatch {
assertEquals("good",
Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
}
@Test
public void testInvalidPut() {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
try {
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
table.batch(
Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
new Delete(Bytes.toBytes(0))));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}