diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 1940948ee81..fea7a1e69cf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -596,8 +596,7 @@ public final class ConnectionUtils { * The rules are: *
    *
  1. If user set a priority explicitly, then just use it.
  2. - *
  3. For meta table, use {@link HConstants#META_QOS}.
  4. - *
  5. For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.
  6. + *
  7. For system table, use {@link HConstants#SYSTEMTABLE_QOS}.
  8. *
  9. For other tables, use {@link HConstants#NORMAL_QOS}.
  10. *
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 789460ccd24..1925c0e97db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -484,9 +484,6 @@ class RawAsyncTableImpl implements AsyncTable { @Override public List> put(List puts) { - for (Put put : puts) { - validatePut(put, conn.connConf.getMaxKeyValueSize()); - } return voidMutate(puts); } @@ -506,6 +503,8 @@ class RawAsyncTableImpl implements AsyncTable { } private List> batch(List actions, long rpcTimeoutNs) { + actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action) + .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize())); return conn.callerFactory.batch().table(tableName).actions(actions) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 89ebf8d5749..63080b9a34d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -69,7 +69,7 @@ public class TestAsyncTable { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncTable.class); + HBaseClassTestRule.forClass(TestAsyncTable.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -81,6 +81,8 @@ public class TestAsyncTable { private static byte[] VALUE = Bytes.toBytes("value"); + private static int MAX_KEY_VALUE_SIZE = 64 * 1024; + private static AsyncConnection ASYNC_CONN; @Rule @@ -107,6 +109,8 @@ public class TestAsyncTable { @BeforeClass public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, + MAX_KEY_VALUE_SIZE); TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); @@ -146,6 +150,7 @@ public class TestAsyncTable { return Bytes.toBytes(Bytes.toString(base) + "-" + index); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testSimpleMultiple() throws Exception { AsyncTable table = getTable.get(); @@ -153,19 +158,19 @@ public class TestAsyncTable { CountDownLatch putLatch = new CountDownLatch(count); IntStream.range(0, count).forEach( i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) - .thenAccept(x -> putLatch.countDown())); + .thenAccept(x -> putLatch.countDown())); putLatch.await(); BlockingQueue existsResp = new ArrayBlockingQueue<>(count); IntStream.range(0, count) - .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) - .thenAccept(x -> existsResp.add(x))); + .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> existsResp.add(x))); for (int i = 0; i < count; i++) { assertTrue(existsResp.take()); } BlockingQueue> getResp = new ArrayBlockingQueue<>(count); IntStream.range(0, count) - .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) - .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); + .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); for (int i = 0; i < count; i++) { Pair pair = getResp.take(); assertArrayEquals(concat(VALUE, pair.getFirst()), @@ -176,20 +181,21 @@ public class TestAsyncTable { i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); deleteLatch.await(); IntStream.range(0, count) - .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) - .thenAccept(x -> existsResp.add(x))); + .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> existsResp.add(x))); for (int i = 0; i < count; i++) { assertFalse(existsResp.take()); } IntStream.range(0, count) - .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) - .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); + .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) + .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); for (int i = 0; i < count; i++) { Pair pair = getResp.take(); assertTrue(pair.getSecond().isEmpty()); } } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testIncrement() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); @@ -197,16 +203,17 @@ public class TestAsyncTable { CountDownLatch latch = new CountDownLatch(count); AtomicLong sum = new AtomicLong(0L); IntStream.range(0, count) - .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { - sum.addAndGet(x); - latch.countDown(); - })); + .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { + sum.addAndGet(x); + latch.countDown(); + })); latch.await(); assertEquals(count, Bytes.toLong( table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); assertEquals((1 + count) * count / 2, sum.get()); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testAppend() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); @@ -214,22 +221,24 @@ public class TestAsyncTable { CountDownLatch latch = new CountDownLatch(count); char suffix = ':'; AtomicLong suffixCount = new AtomicLong(0L); - IntStream.range(0, count).forEachOrdered( - i -> table.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) - .thenAccept(r -> { - suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars() - .filter(x -> x == suffix).count()); - latch.countDown(); - })); + IntStream.range(0, count) + .forEachOrdered(i -> table + .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) + .thenAccept(r -> { + suffixCount.addAndGet( + Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count()); + latch.countDown(); + })); latch.await(); assertEquals((1 + count) * count / 2, suffixCount.get()); String value = Bytes.toString( table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) - .sorted().toArray(); + .sorted().toArray(); assertArrayEquals(IntStream.range(0, count).toArray(), actual); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testCheckAndPut() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); @@ -238,20 +247,21 @@ public class TestAsyncTable { int count = 10; CountDownLatch latch = new CountDownLatch(count); IntStream.range(0, count) - .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() - .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - latch.countDown(); - })); + .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() + .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + latch.countDown(); + })); latch.await(); assertEquals(1, successCount.get()); String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testCheckAndDelete() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); @@ -259,24 +269,24 @@ public class TestAsyncTable { CountDownLatch putLatch = new CountDownLatch(count + 1); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); IntStream.range(0, count) - .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) - .thenRun(() -> putLatch.countDown())); + .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) + .thenRun(() -> putLatch.countDown())); putLatch.await(); AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); CountDownLatch deleteLatch = new CountDownLatch(count); IntStream.range(0, count) - .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) - .thenDelete( - new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) - .thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - deleteLatch.countDown(); - })); + .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) + .thenDelete( + new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + deleteLatch.countDown(); + })); deleteLatch.await(); assertEquals(1, successCount.get()); Result result = table.get(new Get(row)).get(); @@ -307,6 +317,7 @@ public class TestAsyncTable { assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); } + @SuppressWarnings("FutureReturnValueIgnored") @Test public void testCheckAndMutate() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); @@ -314,8 +325,8 @@ public class TestAsyncTable { CountDownLatch putLatch = new CountDownLatch(count + 1); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); IntStream.range(0, count) - .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) - .thenRun(() -> putLatch.countDown())); + .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) + .thenRun(() -> putLatch.countDown())); putLatch.await(); AtomicInteger successCount = new AtomicInteger(0); @@ -331,13 +342,13 @@ public class TestAsyncTable { throw new UncheckedIOException(e); } table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) - .thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - mutateLatch.countDown(); - }); + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + mutateLatch.countDown(); + }); }); mutateLatch.await(); assertEquals(1, successCount.get()); @@ -358,57 +369,35 @@ public class TestAsyncTable { Put put = new Put(row); put.addColumn(FAMILY, QUALIFIER, ts, VALUE); - boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .ifNotExists() - .thenPut(put) - .get(); + boolean ok = + table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get(); assertTrue(ok); - ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .timeRange(TimeRange.at(ts + 10000)) - .ifEquals(VALUE) - .thenPut(put) - .get(); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE).thenPut(put).get(); assertFalse(ok); - ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .timeRange(TimeRange.at(ts)) - .ifEquals(VALUE) - .thenPut(put) - .get(); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) + .ifEquals(VALUE).thenPut(put).get(); assertTrue(ok); - RowMutations rm = new RowMutations(row) - .add((Mutation) put); - ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .timeRange(TimeRange.at(ts + 10000)) - .ifEquals(VALUE) - .thenMutate(rm) - .get(); + RowMutations rm = new RowMutations(row).add((Mutation) put); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE).thenMutate(rm).get(); assertFalse(ok); - ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .timeRange(TimeRange.at(ts)) - .ifEquals(VALUE) - .thenMutate(rm) - .get(); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) + .ifEquals(VALUE).thenMutate(rm).get(); assertTrue(ok); - Delete delete = new Delete(row) - .addColumn(FAMILY, QUALIFIER); + Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); - ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .timeRange(TimeRange.at(ts + 10000)) - .ifEquals(VALUE) - .thenDelete(delete) - .get(); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE).thenDelete(delete).get(); assertFalse(ok); - ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) - .timeRange(TimeRange.at(ts)) - .ifEquals(VALUE) - .thenDelete(delete) - .get(); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) + .ifEquals(VALUE).thenDelete(delete).get(); assertTrue(ok); } @@ -424,4 +413,22 @@ public class TestAsyncTable { assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); } } + + @Test + public void testInvalidPut() { + try { + getTable.get().put(new Put(Bytes.toBytes(0))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + getTable.get() + .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 717eb24a11a..42e61d7456e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -87,6 +88,8 @@ public class TestAsyncTableBatch { private static byte[][] SPLIT_KEYS; + private static int MAX_KEY_VALUE_SIZE = 64 * 1024; + @Parameter(0) public String tableType; @@ -111,6 +114,8 @@ public class TestAsyncTableBatch { @BeforeClass public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, + MAX_KEY_VALUE_SIZE); TEST_UTIL.startMiniCluster(3); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -224,8 +229,8 @@ public class TestAsyncTableBatch { actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); RowMutations rm = new RowMutations(Bytes.toBytes(5)); - rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L))); - rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L))); + rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L))); + rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L))); actions.add(rm); actions.add(new Get(Bytes.toBytes(6))); @@ -308,4 +313,24 @@ public class TestAsyncTableBatch { assertEquals("good", Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ))); } + + @Test + public void testInvalidPut() { + AsyncTable table = tableGetter.apply(TABLE_NAME); + try { + table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0)))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + table.batch( + Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), + new Delete(Bytes.toBytes(0)))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } }