From 0309ea6b411e2991707f7f506ea2f021bc141fd6 Mon Sep 17 00:00:00 2001 From: Guangxu Cheng Date: Mon, 4 Jun 2018 14:35:01 +0800 Subject: [PATCH] HBASE-19764 Fix Checkstyle errors in hbase-endpoint Signed-off-by: Jan Hentschel --- hbase-endpoint/pom.xml | 7 + .../client/coprocessor/AggregationClient.java | 259 ++++++++++-------- .../client/coprocessor/AggregationHelper.java | 15 +- .../coprocessor/AsyncAggregationClient.java | 52 ++-- .../coprocessor/AggregateImplementation.java | 38 +-- .../hadoop/hbase/coprocessor/Export.java | 76 ++--- .../access/SecureBulkLoadEndpoint.java | 22 +- .../ColumnAggregationEndpoint.java | 11 +- ...ColumnAggregationEndpointNullResponse.java | 19 +- .../ColumnAggregationEndpointWithErrors.java | 16 +- .../ProtobufCoprocessorService.java | 21 +- .../TestAsyncCoprocessorEndpoint.java | 20 +- .../hbase/coprocessor/TestClassLoading.java | 13 +- .../coprocessor/TestCoprocessorEndpoint.java | 91 +++--- ...processorServiceBackwardCompatibility.java | 11 +- .../TestCoprocessorTableEndpoint.java | 39 ++- .../coprocessor/TestRowProcessorEndpoint.java | 27 +- .../hbase/coprocessor/TestSecureExport.java | 65 +++-- .../SecureBulkLoadEndpointClient.java | 11 +- ...onServerBulkLoadWithOldSecureEndpoint.java | 33 ++- .../TestServerCustomProtocol.java | 63 +++-- 21 files changed, 478 insertions(+), 431 deletions(-) diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml index 008155ae008..2e1d9f860b4 100644 --- a/hbase-endpoint/pom.xml +++ b/hbase-endpoint/pom.xml @@ -69,6 +69,13 @@ net.revelc.code warbucks-maven-plugin + + org.apache.maven.plugins + maven-checkstyle-plugin + + true + + diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index 7b071f412cc..8101654ab83 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.client.coprocessor; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; @@ -42,9 +40,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; @@ -58,6 +53,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This client class is for invoking the aggregate functions deployed on the @@ -135,7 +133,7 @@ public class AggregationClient implements Closeable { /** * Constructor with Conf object - * @param cfg + * @param cfg Configuration to use */ public AggregationClient(Configuration cfg) { try { @@ -157,17 +155,16 @@ public class AggregationClient implements Closeable { * It gives the maximum value of a column for a given column family for the * given range. In case qualifier is null, a max of all values for the given * family is returned. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return max val <R> - * @throws Throwable - * The caller is supposed to handle the exception as they are thrown + * @throws Throwable The caller is supposed to handle the exception as they are thrown * & propagated to it. */ public R max( - final TableName tableName, final ColumnInterpreter ci, final Scan scan) - throws Throwable { + final TableName tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { try (Table table = connection.getTable(tableName)) { return max(table, ci, scan); } @@ -177,17 +174,16 @@ public class AggregationClient implements Closeable { * It gives the maximum value of a column for a given column family for the * given range. In case qualifier is null, a max of all values for the given * family is returned. - * @param table - * @param ci - * @param scan + * @param table table to scan. + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return max val <> - * @throws Throwable - * The caller is supposed to handle the exception as they are thrown + * @throws Throwable The caller is supposed to handle the exception as they are thrown * & propagated to it. */ public - R max(final Table table, final ColumnInterpreter ci, - final Scan scan) throws Throwable { + R max(final Table table, final ColumnInterpreter ci, final Scan scan) + throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); class MaxCallBack implements Batch.Callback { R max = null; @@ -225,21 +221,20 @@ public class AggregationClient implements Closeable { return aMaxCallBack.getMax(); } - - /** * It gives the minimum value of a column for a given column family for the * given range. 
In case qualifier is null, a min of all values for the given * family is returned. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return min val <R> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public R min( - final TableName tableName, final ColumnInterpreter ci, final Scan scan) - throws Throwable { + final TableName tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { try (Table table = connection.getTable(tableName)) { return min(table, ci, scan); } @@ -249,18 +244,18 @@ public class AggregationClient implements Closeable { * It gives the minimum value of a column for a given column family for the * given range. In case qualifier is null, a min of all values for the given * family is returned. - * @param table - * @param ci - * @param scan + * @param table table to scan. + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return min val <R> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - R min(final Table table, final ColumnInterpreter ci, - final Scan scan) throws Throwable { + R min(final Table table, final ColumnInterpreter ci, final Scan scan) + throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); class MinCallBack implements Batch.Callback { - private R min = null; public R getMinimum() { @@ -272,10 +267,10 @@ public class AggregationClient implements Closeable { min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min; } } + MinCallBack minCallBack = new MinCallBack(); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { - @Override public R call(AggregateService instance) throws IOException { RpcController controller = new AggregationClientRpcController(); @@ -305,17 +300,18 @@ public class AggregationClient implements Closeable { * filter as it may set the flag to skip to next row, but the value read is * not of the given filter: in this case, this particular row will not be * counted ==> an error. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return <R, S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public long rowCount( - final TableName tableName, final ColumnInterpreter ci, final Scan scan) - throws Throwable { + final TableName tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { try (Table table = connection.getTable(tableName)) { - return rowCount(table, ci, scan); + return rowCount(table, ci, scan); } } @@ -326,15 +322,16 @@ public class AggregationClient implements Closeable { * filter as it may set the flag to skip to next row, but the value read is * not of the given filter: in this case, this particular row will not be * counted ==> an error. - * @param table - * @param ci - * @param scan + * @param table table to scan. 
+ * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return <R, S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - long rowCount(final Table table, - final ColumnInterpreter ci, final Scan scan) throws Throwable { + long rowCount(final Table table, final ColumnInterpreter ci, final Scan scan) + throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true); class RowNumCallback implements Batch.Callback { private final AtomicLong rowCountL = new AtomicLong(0); @@ -348,6 +345,7 @@ public class AggregationClient implements Closeable { rowCountL.addAndGet(result.longValue()); } } + RowNumCallback rowNum = new RowNumCallback(); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { @@ -373,32 +371,34 @@ public class AggregationClient implements Closeable { /** * It sums up the value returned from various regions. In case qualifier is * null, summation of all the column qualifiers in the given family is done. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return sum <S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public S sum( final TableName tableName, final ColumnInterpreter ci, final Scan scan) - throws Throwable { + throws Throwable { try (Table table = connection.getTable(tableName)) { - return sum(table, ci, scan); + return sum(table, ci, scan); } } /** * It sums up the value returned from various regions. In case qualifier is * null, summation of all the column qualifiers in the given family is done. - * @param table - * @param ci - * @param scan + * @param table table to scan. + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return sum <S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - S sum(final Table table, final ColumnInterpreter ci, - final Scan scan) throws Throwable { + S sum(final Table table, final ColumnInterpreter ci, final Scan scan) + throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); class SumCallBack implements Batch.Callback { @@ -443,15 +443,16 @@ public class AggregationClient implements Closeable { * It computes average while fetching sum and row count from all the * corresponding regions. Approach is to compute a global sum of region level * sum and rowcount and then compute the average. - * @param tableName - * @param scan - * @throws Throwable + * @param tableName the name of the table to scan + * @param scan the HBase scan object to use to read data from HBase + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. 
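For orientation, a minimal client-side sketch of the calls documented above, assuming the AggregateImplementation endpoint is already loaded on the table and that LongColumnInterpreter matches the stored values; the table, family and qualifier names here are hypothetical and standard org.apache.hadoop.hbase imports are elided:

  Configuration conf = HBaseConfiguration.create();
  // AggregationClient implements Closeable, so try-with-resources works; the
  // aggregate calls are declared to throw Throwable, so the enclosing method must too.
  try (AggregationClient aggregationClient = new AggregationClient(conf)) {
    TableName tableName = TableName.valueOf("demo_table"); // hypothetical table
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q")); // hypothetical cf/qualifier
    LongColumnInterpreter ci = new LongColumnInterpreter();
    Long max = aggregationClient.max(tableName, ci, scan);      // max over the scanned range
    long rowCount = aggregationClient.rowCount(tableName, ci, scan);
    Long sum = aggregationClient.sum(tableName, ci, scan);
  }

One way to load the endpoint server-side is the region coprocessor key, e.g. setting "hbase.coprocessor.region.classes" to "org.apache.hadoop.hbase.coprocessor.AggregateImplementation" in the server configuration.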
*/ private Pair getAvgArgs( final TableName tableName, final ColumnInterpreter ci, final Scan scan) throws Throwable { try (Table table = connection.getTable(tableName)) { - return getAvgArgs(table, ci, scan); + return getAvgArgs(table, ci, scan); } } @@ -459,17 +460,18 @@ public class AggregationClient implements Closeable { * It computes average while fetching sum and row count from all the * corresponding regions. Approach is to compute a global sum of region level * sum and rowcount and then compute the average. - * @param table - * @param scan - * @throws Throwable + * @param table table to scan. + * @param scan the HBase scan object to use to read data from HBase + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ private - Pair getAvgArgs(final Table table, - final ColumnInterpreter ci, final Scan scan) throws Throwable { + Pair getAvgArgs(final Table table, final ColumnInterpreter ci, + final Scan scan) throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); class AvgCallBack implements Batch.Callback> { S sum = null; - Long rowCount = 0l; + Long rowCount = 0L; public synchronized Pair getAvgArgs() { return new Pair<>(sum, rowCount); @@ -481,6 +483,7 @@ public class AggregationClient implements Closeable { rowCount += result.getSecond(); } } + AvgCallBack avgCallBack = new AvgCallBack(); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call>() { @@ -518,15 +521,16 @@ public class AggregationClient implements Closeable { * its return type should be a decimal value, irrespective of what * columninterpreter says. So, this methods collects the necessary parameters * to compute the average and returs the double value. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return <R, S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - double avg(final TableName tableName, - final ColumnInterpreter ci, Scan scan) throws Throwable { + double avg(final TableName tableName, final ColumnInterpreter ci, + Scan scan) throws Throwable { Pair p = getAvgArgs(tableName, ci, scan); return ci.divideForAvg(p.getFirst(), p.getSecond()); } @@ -537,14 +541,16 @@ public class AggregationClient implements Closeable { * its return type should be a decimal value, irrespective of what * columninterpreter says. So, this methods collects the necessary parameters * to compute the average and returs the double value. - * @param table - * @param ci - * @param scan + * @param table table to scan. + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return <R, S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public double avg( - final Table table, final ColumnInterpreter ci, Scan scan) throws Throwable { + final Table table, final ColumnInterpreter ci, Scan scan) + throws Throwable { Pair p = getAvgArgs(table, ci, scan); return ci.divideForAvg(p.getFirst(), p.getSecond()); } @@ -555,17 +561,18 @@ public class AggregationClient implements Closeable { * average*average). 
From individual regions, it obtains sum, square sum and * number of rows. With these, the above values are computed to get the global * std. - * @param table - * @param scan + * @param table table to scan. + * @param scan the HBase scan object to use to read data from HBase * @return standard deviations - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ private - Pair, Long> getStdArgs(final Table table, - final ColumnInterpreter ci, final Scan scan) throws Throwable { + Pair, Long> getStdArgs(final Table table, final ColumnInterpreter ci, + final Scan scan) throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); class StdCallback implements Batch.Callback, Long>> { - long rowCountVal = 0l; + long rowCountVal = 0L; S sumVal = null, sumSqVal = null; public synchronized Pair, Long> getStdParams() { @@ -585,6 +592,7 @@ public class AggregationClient implements Closeable { } } } + StdCallback stdCallback = new StdCallback(); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call, Long>>() { @@ -626,17 +634,18 @@ public class AggregationClient implements Closeable { * return type should be a decimal value, irrespective of what * columninterpreter says. So, this methods collects the necessary parameters * to compute the std and returns the double value. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return <R, S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - double std(final TableName tableName, ColumnInterpreter ci, + double std(final TableName tableName, ColumnInterpreter ci, Scan scan) throws Throwable { try (Table table = connection.getTable(tableName)) { - return std(table, ci, scan); + return std(table, ci, scan); } } @@ -646,11 +655,12 @@ public class AggregationClient implements Closeable { * return type should be a decimal value, irrespective of what * columninterpreter says. So, this methods collects the necessary parameters * to compute the std and returns the double value. - * @param table - * @param ci - * @param scan + * @param table table to scan. + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return <R, S> - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public double std( final Table table, ColumnInterpreter ci, Scan scan) throws Throwable { @@ -667,17 +677,18 @@ public class AggregationClient implements Closeable { * It helps locate the region with median for a given column whose weight * is specified in an optional column. * From individual regions, it obtains sum of values and sum of weights. - * @param table - * @param ci - * @param scan + * @param table table to scan. 
+ * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return pair whose first element is a map between start row of the region - * and (sum of values, sum of weights) for the region, the second element is - * (sum of values, sum of weights) for all the regions chosen - * @throws Throwable + * and (sum of values, sum of weights) for the region, the second element is + * (sum of values, sum of weights) for all the regions chosen + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ private - Pair>, List> - getMedianArgs(final Table table, + Pair>, List> + getMedianArgs(final Table table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); final NavigableMap> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -731,17 +742,18 @@ public class AggregationClient implements Closeable { * This is the client side interface/handler for calling the median method for a * given cf-cq combination. This method collects the necessary parameters * to compute the median and returns the median. - * @param tableName - * @param ci - * @param scan + * @param tableName the name of the table to scan + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return R the median - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - R median(final TableName tableName, ColumnInterpreter ci, + R median(final TableName tableName, ColumnInterpreter ci, Scan scan) throws Throwable { try (Table table = connection.getTable(tableName)) { - return median(table, ci, scan); + return median(table, ci, scan); } } @@ -749,15 +761,15 @@ public class AggregationClient implements Closeable { * This is the client side interface/handler for calling the median method for a * given cf-cq combination. This method collects the necessary parameters * to compute the median and returns the median. - * @param table - * @param ci - * @param scan + * @param table table to scan. + * @param ci the user's ColumnInterpreter implementation + * @param scan the HBase scan object to use to read data from HBase * @return R the median - * @throws Throwable + * @throws Throwable The caller is supposed to handle the exception as they are thrown + * & propagated to it. */ public - R median(final Table table, ColumnInterpreter ci, - Scan scan) throws Throwable { + R median(final Table table, ColumnInterpreter ci, Scan scan) throws Throwable { Pair>, List> p = getMedianArgs(table, ci, scan); byte[] startRow = null; byte[] colFamily = scan.getFamilies()[0]; @@ -776,14 +788,19 @@ public class AggregationClient implements Closeable { for (Map.Entry> entry : map.entrySet()) { S s = weighted ? 
entry.getValue().get(1) : entry.getValue().get(0); double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); - if (newSumVal > halfSumVal) break; // we found the region with the median + if (newSumVal > halfSumVal) { + // we found the region with the median + break; + } movingSumVal = newSumVal; startRow = entry.getKey(); } // scan the region with median and find it Scan scan2 = new Scan(scan); // inherit stop row from method parameter - if (startRow != null) scan2.setStartRow(startRow); + if (startRow != null) { + scan2.setStartRow(startRow); + } ResultScanner scanner = null; try { int cacheSize = scan2.getCaching(); @@ -815,8 +832,8 @@ public class AggregationClient implements Closeable { movingSumVal = newSumVal; kv = r.getColumnLatestCell(colFamily, qualifier); value = ci.getValue(colFamily, qualifier, kv); - } } + } } while (results != null && results.length > 0); } finally { if (scanner != null) { diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java index 010451a34ba..6d804e43d73 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,21 +27,22 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * Helper class for constructing aggregation request and response. */ @InterfaceAudience.Private -public class AggregationHelper { +public final class AggregationHelper { + private AggregationHelper() {} /** - * @param scan + * @param scan the HBase scan object to use to read data from HBase * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan */ private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException { @@ -64,8 +65,8 @@ public class AggregationHelper { validateParameters(scan, canFamilyBeAbsent); final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder(); requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName()); - P columnInterpreterSpecificData = null; - if ((columnInterpreterSpecificData = ci.getRequestData()) != null) { + P columnInterpreterSpecificData = ci.getRequestData(); + if (columnInterpreterSpecificData != null) { requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString()); } requestBuilder.setScan(ProtobufUtil.toScan(scan)); @@ -80,7 +81,7 @@ public class AggregationHelper { * @param position the position of the argument in the class declaration * @param b the ByteString which should be parsed to get the instance created * @return the instance - * @throws IOException + * @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed. 
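Schematically, the method documented here works by reflecting on the interpreter's generic signature: take the runtime class's generic superclass, select the type argument at the given position, and invoke that message type's static parseFrom. A condensed sketch of the idea, not the verbatim method body:

  ParameterizedType supertype = (ParameterizedType) runtimeClass.getGenericSuperclass();
  @SuppressWarnings("unchecked")
  Class<? extends Message> classType =
      (Class<? extends Message>) supertype.getActualTypeArguments()[position];
  Method parseFrom = classType.getMethod("parseFrom", ByteString.class); // protobuf-generated
  Message instance = (Message) parseFrom.invoke(null, b); // static method, hence null receiver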
*/ @SuppressWarnings("unchecked") // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO. diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java index 371e865f1ef..3b3e8d9dfab 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -52,11 +52,11 @@ import org.apache.yetus.audience.InterfaceAudience; * summing/processing the individual results obtained from the AggregateService for each region. */ @InterfaceAudience.Public -public class AsyncAggregationClient { +public final class AsyncAggregationClient { + private AsyncAggregationClient() {} private static abstract class AbstractAggregationCallback implements CoprocessorCallback { - private final CompletableFuture future; protected boolean finished = false; @@ -172,6 +172,7 @@ public class AsyncAggregationClient { future.completeExceptionally(e); return future; } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { private R min; @@ -200,8 +201,8 @@ public class AsyncAggregationClient { } public static - CompletableFuture - rowCount(AsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture rowCount(AsyncTable table, ColumnInterpreter ci, + Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -243,7 +244,6 @@ public class AsyncAggregationClient { return future; } AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { - private S sum; @Override @@ -268,8 +268,8 @@ public class AsyncAggregationClient { } public static - CompletableFuture - avg(AsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture avg(AsyncTable table, ColumnInterpreter ci, + Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -279,7 +279,6 @@ public class AsyncAggregationClient { return future; } AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { - private S sum; long count = 0L; @@ -306,8 +305,8 @@ public class AsyncAggregationClient { } public static - CompletableFuture - std(AsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture std(AsyncTable table, ColumnInterpreter ci, + Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -365,20 +364,20 @@ public class AsyncAggregationClient { AbstractAggregationCallback> callback = new AbstractAggregationCallback>(future) { - private final NavigableMap map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + private final NavigableMap map = new TreeMap<>(Bytes.BYTES_COMPARATOR); - @Override - protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { - if (resp.getFirstPartCount() > 0) { - map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex)); - } + @Override + protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, 
firstPartIndex)); } + } - @Override - protected NavigableMap getFinalResult() { - return map; - } - }; + @Override + protected NavigableMap getFinalResult() { + return map; + } + }; table . coprocessorService(AggregateService::newStub, (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback) @@ -388,8 +387,8 @@ public class AsyncAggregationClient { } private static void findMedian( - CompletableFuture future, AsyncTable table, - ColumnInterpreter ci, Scan scan, NavigableMap sumByRegion) { + CompletableFuture future, AsyncTable table, + ColumnInterpreter ci, Scan scan, NavigableMap sumByRegion) { double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L); S movingSum = null; byte[] startRow = null; @@ -411,7 +410,6 @@ public class AsyncAggregationClient { byte[] weightQualifier = qualifiers.last(); byte[] valueQualifier = qualifiers.first(); table.scan(scan, new AdvancedScanResultConsumer() { - private S sum = baseSum; private R value = null; @@ -457,8 +455,8 @@ public class AsyncAggregationClient { } public static - CompletableFuture median(AsyncTable table, - ColumnInterpreter ci, Scan scan) { + CompletableFuture median(AsyncTable table, + ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { if (error != null) { diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java index c8badd36921..a7181f962cd 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,9 +35,6 @@ import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; @@ -47,6 +43,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A concrete AggregateProtocol implementation. 
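As a usage note for the asynchronous client above: all of its entry points are static and return a CompletableFuture. A minimal sketch, assuming an already-open AsyncConnection named conn and the same hypothetical table and column as before:

  AsyncTable<AdvancedScanResultConsumer> table =
      conn.getTable(TableName.valueOf("demo_table"));
  Scan scan = new Scan().addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
  AsyncAggregationClient.rowCount(table, new LongColumnInterpreter(), scan)
      .thenAccept(count -> System.out.println("rows=" + count)); // completes asynchronously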
Its system level coprocessor @@ -62,7 +61,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; */ @InterfaceAudience.Private public class AggregateImplementation -extends AggregateService implements RegionCoprocessor { + extends AggregateService implements RegionCoprocessor { protected static final Logger log = LoggerFactory.getLogger(AggregateImplementation.class); private RegionCoprocessorEnvironment env; @@ -75,7 +74,7 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getMax(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { InternalScanner scanner = null; AggregateResponse response = null; T max = null; @@ -130,7 +129,7 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getMin(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { AggregateResponse response = null; InternalScanner scanner = null; T min = null; @@ -183,10 +182,10 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getSum(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { AggregateResponse response = null; InternalScanner scanner = null; - long sum = 0l; + long sum = 0L; try { ColumnInterpreter ci = constructColumnInterpreterFromRequest(request); S sumVal = null; @@ -206,8 +205,9 @@ extends AggregateService implements RegionCoprocessor { int listSize = results.size(); for (int i = 0; i < listSize; i++) { temp = ci.getValue(colFamily, qualifier, results.get(i)); - if (temp != null) + if (temp != null) { sumVal = ci.add(sumVal, ci.castToReturnType(temp)); + } } results.clear(); } while (hasMoreRows); @@ -235,9 +235,9 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getRowNum(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { AggregateResponse response = null; - long counter = 0l; + long counter = 0L; List results = new ArrayList<>(); InternalScanner scanner = null; try { @@ -250,8 +250,9 @@ extends AggregateService implements RegionCoprocessor { if (qualifiers != null && !qualifiers.isEmpty()) { qualifier = qualifiers.pollFirst(); } - if (scan.getFilter() == null && qualifier == null) + if (scan.getFilter() == null && qualifier == null) { scan.setFilter(new FirstKeyOnlyFilter()); + } scanner = env.getRegion().getScanner(scan); boolean hasMoreRows = false; do { @@ -294,13 +295,13 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getAvg(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { AggregateResponse response = null; InternalScanner scanner = null; try { ColumnInterpreter ci = constructColumnInterpreterFromRequest(request); S sumVal = null; - Long rowCountVal = 0l; + Long rowCountVal = 0L; Scan scan = ProtobufUtil.toScan(request.getScan()); scanner = env.getRegion().getScanner(scan); byte[] colFamily = scan.getFamilies()[0]; @@ -354,13 +355,13 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getStd(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { InternalScanner scanner = null; AggregateResponse response = null; try { ColumnInterpreter ci = constructColumnInterpreterFromRequest(request); S sumVal = null, sumSqVal = null, tempVal = null; - long rowCountVal = 0l; + long rowCountVal = 0L; Scan scan = ProtobufUtil.toScan(request.getScan()); scanner = 
env.getRegion().getScanner(scan); byte[] colFamily = scan.getFamilies()[0]; @@ -419,7 +420,7 @@ extends AggregateService implements RegionCoprocessor { */ @Override public void getMedian(RpcController controller, AggregateRequest request, - RpcCallback done) { + RpcCallback done) { AggregateResponse response = null; InternalScanner scanner = null; try { @@ -526,5 +527,4 @@ extends AggregateService implements RegionCoprocessor { public void stop(CoprocessorEnvironment env) throws IOException { // nothing to do } - } diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java index 12b8b8e58c7..6d6c1a66671 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.coprocessor; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + import java.io.Closeable; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -77,10 +81,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * Export an HBase table. Writes content to sequence files up in HDFS. Use * {@link Import} to read it back in again. It is implemented by the endpoint @@ -91,10 +91,10 @@ import com.google.protobuf.Service; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public class Export extends ExportProtos.ExportService implements RegionCoprocessor { - private static final Logger LOG = LoggerFactory.getLogger(Export.class); private static final Class DEFAULT_CODEC = DefaultCodec.class; - private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD; + private static final SequenceFile.CompressionType DEFAULT_TYPE = + SequenceFile.CompressionType.RECORD; private RegionCoprocessorEnvironment env = null; private UserProvider userProvider; @@ -110,11 +110,13 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.getLength(otherArgs)); return null; } - Triple arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs); + Triple arguments = + ExportUtils.getArgumentsFromCommandLine(conf, otherArgs); return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird()); } - public static Map run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable { + public static Map run(final Configuration conf, TableName tableName, + Scan scan, Path dir) throws Throwable { FileSystem fs = dir.getFileSystem(conf); UserProvider userProvider = UserProvider.instantiate(conf); checkDir(fs, dir); @@ -158,7 +160,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } } - private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) { + private static SequenceFile.CompressionType getCompressionType( + final ExportProtos.ExportRequest request) { if (request.hasCompressType()) { return SequenceFile.CompressionType.valueOf(request.getCompressType()); } else { @@ -166,11 +169,13 @@ public 
class Export extends ExportProtos.ExportService implements RegionCoproces } } - private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) { + private static CompressionCodec getCompressionCodec(final Configuration conf, + final ExportProtos.ExportRequest request) { try { Class codecClass; if (request.hasCompressCodec()) { - codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class); + codecClass = conf.getClassByName(request.getCompressCodec()) + .asSubclass(CompressionCodec.class); } else { codecClass = DEFAULT_CODEC; } @@ -198,16 +203,17 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces rval.add(SequenceFile.Writer.valueClass(Result.class)); rval.add(getOutputPath(conf, info, request)); if (getCompression(request)) { - rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request))); + rval.add(SequenceFile.Writer.compression(getCompressionType(request), + getCompressionCodec(conf, request))); } else { rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); } return rval; } - private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf, - final UserProvider userProvider, final Scan scan, final Token userToken, - final List opts) throws IOException { + private static ExportProtos.ExportResponse processData(final Region region, + final Configuration conf, final UserProvider userProvider, final Scan scan, + final Token userToken, final List opts) throws IOException { ScanCoprocessor cp = new ScanCoprocessor(region); RegionScanner scanner = null; try (RegionOp regionOp = new RegionOp(region); @@ -230,11 +236,15 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } Cell firstCell = cells.get(0); for (Cell cell : cells) { - if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(), - cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) { - throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??" - + " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength()) - + ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength()) != 0) { + throw new IOException("Why the RegionScanner#nextRaw returns the data of different" + + " rows?? 
first row=" + + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength()) + + ", current row=" + + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); } } results.add(Result.create(cells)); @@ -298,7 +308,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces return builder.build(); } - @Override public void start(CoprocessorEnvironment environment) throws IOException { if (environment instanceof RegionCoprocessorEnvironment) { @@ -323,7 +332,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces RpcCallback done) { Region region = env.getRegion(); Configuration conf = HBaseConfiguration.create(env.getConfiguration()); - conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName()); + conf.setStrings("io.serializations", conf.get("io.serializations"), + ResultSerialization.class.getName()); try { Scan scan = validateKey(region.getRegionInfo(), request); Token userToken = null; @@ -344,7 +354,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } } - private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request) throws IOException { + private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request) + throws IOException { Scan scan = ProtobufUtil.toScan(request.getScan()); byte[] regionStartKey = region.getStartKey(); byte[] originStartKey = scan.getStartRow(); @@ -362,7 +373,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } private static class RegionOp implements Closeable { - private final Region region; RegionOp(final Region region) throws IOException { @@ -377,7 +387,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } private static class ScanCoprocessor { - private final HRegion region; ScanCoprocessor(final Region region) { @@ -439,17 +448,20 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces private static class SecureWriter implements Closeable { private final PrivilegedWriter privilegedWriter; - SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken, - final List opts) throws IOException { + SecureWriter(final Configuration conf, final UserProvider userProvider, + final Token userToken, final List opts) + throws IOException { privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken), - SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()]))); + SequenceFile.createWriter(conf, + opts.toArray(new SequenceFile.Writer.Option[opts.size()]))); } void append(final Object key, final Object value) throws IOException { privilegedWriter.append(key, value); } - private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException { + private static User getActiveUser(final UserProvider userProvider, final Token userToken) + throws IOException { User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent()); if (user == null && userToken != null) { LOG.warn("No found of user credentials, but a token was got from user request"); @@ -465,7 +477,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } } - private static class PrivilegedWriter implements PrivilegedExceptionAction, Closeable { + private static class PrivilegedWriter implements PrivilegedExceptionAction, + 
Closeable { private final User user; private final SequenceFile.Writer out; private Object key; @@ -502,8 +515,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces } } - public static class Response { - + public final static class Response { private final long rowCount; private final long cellCount; diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index 5418cd0b8e7..fb161d94661 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.security.access; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.Collections; import java.util.List; @@ -45,9 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,6 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private @Deprecated public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor { - public static final long VERSION = 0L; private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadEndpoint.class); @@ -82,7 +82,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg @Override public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request, - RpcCallback done) { + RpcCallback done) { try { SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager(); @@ -100,7 +100,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg */ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest convert(PrepareBulkLoadRequest request) - throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { + throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { byte [] bytes = request.toByteArray(); org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder builder = @@ -112,7 +112,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg @Override public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request, - RpcCallback done) { + RpcCallback done) { try { SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager(); secureBulkLoadManager.cleanupBulkLoad((HRegion) this.env.getRegion(), convert(request)); @@ -127,7 +127,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg * Convert from CPEP protobuf 2.5 to internal protobuf 3.3. 
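The conversion documented here relies on wire compatibility between the two protobuf runtimes: serialize the CPEP (protobuf 2.5) message to bytes, then merge those bytes into a builder for the shaded (protobuf 3.3) generated type. In schematic form:

  byte[] bytes = request.toByteArray(); // com.google.protobuf (2.5) message
  org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
      converted = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos
          .CleanupBulkLoadRequest.newBuilder().mergeFrom(bytes).build(); // shaded 3.3 runtime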
*/ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest - convert(CleanupBulkLoadRequest request) + convert(CleanupBulkLoadRequest request) throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { byte [] bytes = request.toByteArray(); org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder @@ -140,7 +140,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg @Override public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request, - RpcCallback done) { + RpcCallback done) { boolean loaded = false; Map> map = null; try { @@ -159,7 +159,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg * Convert from CPEP protobuf 2.5 to internal protobuf 3.3. */ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest - convert(BulkLoadHFileRequest request) + convert(BulkLoadHFileRequest request) throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { byte [] bytes = request.toByteArray(); org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder @@ -171,7 +171,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg } private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest( - SecureBulkLoadHFilesRequest request) { + SecureBulkLoadHFilesRequest request) { BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder(); RegionSpecifier region = ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java index fd570e7854c..7b315f9f367 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +17,10 @@ */ package org.apache.hadoop.hbase.coprocessor; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -36,15 +39,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * The aggregation implementation at a region. 
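For context on how such an endpoint is exercised, a hedged sketch of a client-side sum over all regions, following the Batch.Call pattern used throughout this patch; ColumnAggregationService, SumRequest and SumResponse are the generated ColumnAggregationProtos types, and the family name is hypothetical:

  Map<byte[], Long> sums = table.coprocessorService(ColumnAggregationService.class,
      null, null, // null start/stop keys cover every region
      new Batch.Call<ColumnAggregationService, Long>() {
        @Override
        public Long call(ColumnAggregationService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<SumResponse> rpcCallback =
              new CoprocessorRpcUtils.BlockingRpcCallback<>();
          SumRequest request = SumRequest.newBuilder()
              .setFamily(ByteString.copyFrom(Bytes.toBytes("cf"))).build();
          instance.sum(null, request, rpcCallback); // null controller for brevity
          return rpcCallback.get().getSum();
        }
      });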
*/ public class ColumnAggregationEndpoint extends ColumnAggregationService -implements RegionCoprocessor { + implements RegionCoprocessor { private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpoint.class); private RegionCoprocessorEnvironment env = null; diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java index 5effbe9fa40..8ed137a2e7b 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.coprocessor; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -27,31 +31,28 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest; import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * Test coprocessor endpoint that always returns {@code null} for requests to the last region * in the table. This allows tests to provide assurance of correct {@code null} handling for * response values. 
*/ -public class ColumnAggregationEndpointNullResponse - extends ColumnAggregationServiceNullResponse implements RegionCoprocessor { +public class ColumnAggregationEndpointNullResponse extends ColumnAggregationServiceNullResponse + implements RegionCoprocessor { private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpointNullResponse.class); private RegionCoprocessorEnvironment env = null; + @Override public Iterable getServices() { return Collections.singleton(this); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java index 39e3b12cd21..4aaaea268e3 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.coprocessor; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -38,18 +42,14 @@ import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on * the last region in the table. This allows tests to ensure correct error handling of * coprocessor endpoints throwing exceptions. */ public class ColumnAggregationEndpointWithErrors - extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors - implements RegionCoprocessor { + extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors + implements RegionCoprocessor { private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpointWithErrors.class); @@ -76,7 +76,7 @@ public class ColumnAggregationEndpointWithErrors @Override public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request, - RpcCallback done) { + RpcCallback done) { // aggregate at each region Scan scan = new Scan(); // Family is required in pb. Qualifier is not. diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java index 6fc4eb9886a..697e88463f6 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java @@ -15,12 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.coprocessor; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; + +import java.io.IOException; +import java.util.Collections; + import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -32,16 +35,13 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestPro import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.util.Threads; -import java.io.IOException; -import java.util.Collections; - /** * Test implementation of a coprocessor endpoint exposing the * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests * only. */ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto - implements MasterCoprocessor, RegionCoprocessor { + implements MasterCoprocessor, RegionCoprocessor { public ProtobufCoprocessorService() {} @Override @@ -51,34 +51,34 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu @Override public void ping(RpcController controller, TestProtos.EmptyRequestProto request, - RpcCallback<TestProtos.EmptyResponseProto> done) { + RpcCallback<TestProtos.EmptyResponseProto> done) { done.run(TestProtos.EmptyResponseProto.getDefaultInstance()); } @Override public void echo(RpcController controller, TestProtos.EchoRequestProto request, - RpcCallback<TestProtos.EchoResponseProto> done) { + RpcCallback<TestProtos.EchoResponseProto> done) { String message = request.getMessage(); done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build()); } @Override public void error(RpcController controller, TestProtos.EmptyRequestProto request, - RpcCallback<TestProtos.EmptyResponseProto> done) { + RpcCallback<TestProtos.EmptyResponseProto> done) { CoprocessorRpcUtils.setControllerException(controller, new IOException("Test exception")); done.run(null); } @Override public void pause(RpcController controller, PauseRequestProto request, - RpcCallback<EmptyResponseProto> done) { + RpcCallback<EmptyResponseProto> done) { Threads.sleepWithoutInterrupt(request.getMs()); done.run(EmptyResponseProto.getDefaultInstance()); } @Override public void addr(RpcController controller, EmptyRequestProto request, - RpcCallback<AddrResponseProto> done) { + RpcCallback<AddrResponseProto> done) { done.run(AddrResponseProto.newBuilder() .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build()); } @@ -92,5 +92,4 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu public void stop(CoprocessorEnvironment env) throws IOException { // To change body of implemented methods use File | Settings | File Templates. } - } diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java index 9d4b07df5b8..1a68f086486 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -53,7 +53,6 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) @Category({ ClientTests.class, MediumTests.class }) public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class); @@ -80,8 +79,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); TestProtos.EchoResponseProto response = admin - .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> coprocessorService( - TestRpcServiceProtos.TestProtobufRpcProto::newStub, + .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, + TestProtos.EchoResponseProto> coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub, (s, c, done) -> s.echo(c, request, done)).get(); assertEquals("hello", response.getMessage()); } @@ -91,8 +90,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance(); try { admin - .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> coprocessorService( - TestRpcServiceProtos.TestProtobufRpcProto::newStub, + .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, + TestProtos.EmptyResponseProto> coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub, (s, c, done) -> s.error(c, emptyRequest, done)).get(); fail("Should have thrown an exception"); } catch (Exception e) { @@ -106,7 +105,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); DummyRegionServerEndpointProtos.DummyResponse response = admin - .<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService( + .<DummyRegionServerEndpointProtos.DummyService.Stub, + DummyRegionServerEndpointProtos.DummyResponse> coprocessorService( DummyRegionServerEndpointProtos.DummyService::newStub, (s, c, done) -> s.dummyCall(c, request, done), serverName).get(); assertEquals(DUMMY_VALUE, response.getValue()); } @@ -119,7 +119,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); try { admin - .<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService( + .<DummyRegionServerEndpointProtos.DummyService.Stub, + DummyRegionServerEndpointProtos.DummyResponse> coprocessorService( DummyRegionServerEndpointProtos.DummyService::newStub, (s, c, done) -> s.dummyThrow(c, request, done), serverName).get(); fail("Should have thrown an exception"); @@ -130,8 +131,7 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { } public static class DummyRegionServerEndpoint extends DummyService - implements RegionServerCoprocessor { - + implements RegionServerCoprocessor { public DummyRegionServerEndpoint() {} @Override diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java index 1bf36f5e861..5c3e5363922 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory; */ @Category({CoprocessorTests.class, MediumTests.class}) public class TestClassLoading { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestClassLoading.class); @@ -98,9 +97,7 @@ public class TestClassLoading { private static Class<?> masterCoprocessor = TestMasterCoprocessor.class; private static final String[] regionServerSystemCoprocessors = - new String[]{ - regionServerCoprocessor.getSimpleName() - }; + new String[]{ regionServerCoprocessor.getSimpleName() }; private static final String[] masterRegionServerSystemCoprocessors = new String[] { regionCoprocessor1.getSimpleName(), MultiRowMutationEndpoint.class.getSimpleName(), @@ -211,7 +208,9 @@ public class TestClassLoading { found2_k2 = found2_k2 && (conf.get("k2") != null); found2_k3 = found2_k3 && (conf.get("k3") != null); } else { - found2_k1 = found2_k2 = found2_k3 = false; + found2_k1 = false; + found2_k2 = false; + found2_k3 = false; } regionsActiveClassLoaders .put(region, ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders()); @@ -571,6 +570,4 @@ public class TestClassLoading { // Now wait a bit longer for the coprocessor hosts to load the CPs Thread.sleep(1000); } - } - diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java index 87409a7813c..4b8f6c7e8be 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -67,7 +67,6 @@ import org.slf4j.LoggerFactory; */ @Category({CoprocessorTests.class, MediumTests.class}) public class TestCoprocessorEndpoint { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCoprocessorEndpoint.class); @@ -119,14 +118,14 @@ public class TestCoprocessorEndpoint { } private Map<byte[], Long> sum(final Table table, final byte [] family, - final byte [] qualifier, final byte [] start, final byte [] end) - throws ServiceException, Throwable { + final byte [] qualifier, final byte [] start, final byte [] end) + throws ServiceException, Throwable { return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, start, end, new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() { @Override public Long call(ColumnAggregationProtos.ColumnAggregationService instance) - throws IOException { + throws IOException { CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest.newBuilder(); @@ -191,26 +190,28 @@ public class TestCoprocessorEndpoint { // scan: for all regions final RpcController controller = new ServerRpcController(); table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, - ROWS[0], ROWS[ROWS.length - 1], - new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() { - public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) - throws IOException { - LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance()); - CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = - new CoprocessorRpcUtils.BlockingRpcCallback<>(); - instance.echo(controller, request, callback); - TestProtos.EchoResponseProto response = callback.get(); - LOG.debug("Batch.Call returning result " + response); - return response; - } - }, - new Batch.Callback<TestProtos.EchoResponseProto>() { - public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) { - assertNotNull(result); - assertEquals("hello", result.getMessage()); - results.put(region, result.getMessage()); - } + ROWS[0], ROWS[ROWS.length - 1], + new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() { + @Override + public TestProtos.EchoResponseProto call( + TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException { + LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance()); + CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + instance.echo(controller, request, callback); + TestProtos.EchoResponseProto response = callback.get(); + LOG.debug("Batch.Call returning result " + response); + return response; } + }, + new Batch.Callback<TestProtos.EchoResponseProto>() { + @Override + public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) { + assertNotNull(result); + assertEquals("hello", result.getMessage()); + results.put(region, result.getMessage()); + } + } ); for (Map.Entry<byte[], String> e : results.entrySet()) { LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); @@ -224,26 +225,28 @@ public class TestCoprocessorEndpoint { // scan: for region 2 and region 3 table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, - ROWS[rowSeperator1], ROWS[ROWS.length - 1], - new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() { - public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) - throws IOException { - LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance()); - CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = - new CoprocessorRpcUtils.BlockingRpcCallback<>(); - instance.echo(controller, request, callback); - TestProtos.EchoResponseProto response = callback.get(); - LOG.debug("Batch.Call returning result " + response); - return response; - } - }, - new Batch.Callback<TestProtos.EchoResponseProto>() { - public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) { - assertNotNull(result); - assertEquals("hello", result.getMessage()); - results.put(region, result.getMessage()); - } + ROWS[rowSeperator1], ROWS[ROWS.length - 1], + new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() { + @Override + public TestProtos.EchoResponseProto call( + TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException { + LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance()); + CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + instance.echo(controller, request, callback); + TestProtos.EchoResponseProto response = callback.get(); + LOG.debug("Batch.Call returning result " + response); + return response; } + }, + new Batch.Callback<TestProtos.EchoResponseProto>() { + @Override + public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) { + assertNotNull(result); + assertEquals("hello", result.getMessage()); + results.put(region, result.getMessage()); + } + } ); for (Map.Entry<byte[], String> e : results.entrySet()) { LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); @@ -348,6 +351,4 @@ public class TestCoprocessorEndpoint { } return ret; } - } - diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java index e7181bb0541..d1e848dbc62 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorServiceBackwardCompatibility.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.*; -import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -38,9 +40,8 @@ import org.junit.experimental.categories.Category; /** * Tests to ensure that 2.0 is backward compatible in loading CoprocessorService.
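The compatibility being tested here: the 2.0 coprocessor host discovers protobuf services through the getServices() hook rather than through the old CoprocessorService marker interface. A hedged sketch of the minimal endpoint shape, with method bodies invented and the DummyService method signatures assumed from the generated test protos:

import java.util.Collections;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;

// Illustrative only: the minimal shape the 2.0 loader expects from an endpoint.
public class MinimalDummyEndpoint extends DummyService implements RegionServerCoprocessor {
  @Override
  public Iterable<Service> getServices() {
    // 2.0 discovers the protobuf Service through this hook
    return Collections.singleton(this);
  }

  @Override
  public void dummyCall(RpcController controller, DummyRequest request,
      RpcCallback<DummyResponse> done) {
    done.run(DummyResponse.newBuilder().setValue("dummy").build());
  }

  @Override
  public void dummyThrow(RpcController controller, DummyRequest request,
      RpcCallback<DummyResponse> done) {
    // a real test endpoint would surface an error through the controller
    done.run(null);
  }
}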
*/ -@Category({SmallTests.class}) +@Category({MediumTests.class}) public class TestCoprocessorServiceBackwardCompatibility { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCoprocessorServiceBackwardCompatibility.class); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java index fbcbb54f680..3bec2034a12 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -47,7 +47,6 @@ import org.junit.rules.TestName; @Category({CoprocessorTests.class, MediumTests.class}) public class TestCoprocessorTableEndpoint { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCoprocessorTableEndpoint.class); @@ -81,7 +80,7 @@ public class TestCoprocessorTableEndpoint { HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); - desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName()); + desc.addCoprocessor(ColumnAggregationEndpoint.class.getName()); createTable(desc); verifyTable(tableName); @@ -96,7 +95,7 @@ public class TestCoprocessorTableEndpoint { createTable(desc); - desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName()); + desc.addCoprocessor(ColumnAggregationEndpoint.class.getName()); updateTable(desc); verifyTable(tableName); @@ -113,24 +112,24 @@ public class TestCoprocessorTableEndpoint { private static Map<byte[], Long> sum(final Table table, final byte [] family, final byte [] qualifier, final byte [] start, final byte [] end) throws ServiceException, Throwable { - return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, + return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, start, end, - new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() { - @Override - public Long call(ColumnAggregationProtos.ColumnAggregationService instance) - throws IOException { - CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback = - new CoprocessorRpcUtils.BlockingRpcCallback<>(); - ColumnAggregationProtos.SumRequest.Builder builder = - ColumnAggregationProtos.SumRequest.newBuilder(); - builder.setFamily(ByteString.copyFrom(family)); - if (qualifier != null && qualifier.length > 0) { - builder.setQualifier(ByteString.copyFrom(qualifier)); + new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() { + @Override + public Long call(ColumnAggregationProtos.ColumnAggregationService instance) + throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + ColumnAggregationProtos.SumRequest.Builder builder = + ColumnAggregationProtos.SumRequest.newBuilder(); + builder.setFamily(ByteString.copyFrom(family)); + if (qualifier != null && qualifier.length > 0) { + builder.setQualifier(ByteString.copyFrom(qualifier)); + } + instance.sum(null, builder.build(), rpcCallback); + return rpcCallback.get().getSum(); } - instance.sum(null, builder.build(), rpcCallback); - return rpcCallback.get().getSum(); - } - }); + }); } private static final void 
createTable(HTableDescriptor desc) throws Exception { diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 01e5b59a11b..e788e5d11f5 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -84,7 +84,6 @@ import org.slf4j.LoggerFactory; */ @Category({CoprocessorTests.class, MediumTests.class}) public class TestRowProcessorEndpoint { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class); @@ -219,8 +218,7 @@ public class TestRowProcessorEndpoint { return result; } - private void concurrentExec( - final Runnable task, final int numThreads) throws Throwable { + private void concurrentExec(final Runnable task, final int numThreads) throws Throwable { startSignal = new CountDownLatch(numThreads); doneSignal = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; ++i) { @@ -313,10 +311,10 @@ public class TestRowProcessorEndpoint { * So they can be loaded with the endpoint on the coprocessor. */ public static class RowProcessorEndpoint<S extends Message, T extends Message> - extends BaseRowProcessorEndpoint<S, T> { + extends BaseRowProcessorEndpoint<S, T> { public static class IncrementCounterProcessor extends - BaseRowProcessor<IncCounterProcessorRequest, IncCounterProcessorResponse> { + BaseRowProcessor<IncCounterProcessorRequest, IncCounterProcessorResponse> { int counter = 0; byte[] row = new byte[0]; @@ -397,7 +395,7 @@ public class TestRowProcessorEndpoint { } public static class FriendsOfFriendsProcessor extends - BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> { + BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> { byte[] row = null; byte[] person = null; final Set<String> result = new HashSet<>(); @@ -482,7 +480,7 @@ public class TestRowProcessorEndpoint { } public static class RowSwapProcessor extends - BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> { + BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> { byte[] row1 = new byte[0]; byte[] row2 = new byte[0]; @@ -586,8 +584,7 @@ public class TestRowProcessorEndpoint { } public static class TimeoutProcessor extends - BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> { - + BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> { byte[] row = new byte[0]; /** @@ -643,8 +640,7 @@ public class TestRowProcessorEndpoint { } } - public static void doScan( - HRegion region, Scan scan, List<Cell> result) throws IOException { + public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException { InternalScanner scanner = null; try { scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); scanner = region.getScanner(scan); result.clear(); scanner.next(result); } finally { - if (scanner != null) scanner.close(); + if (scanner != null) { + scanner.close(); + } } } } @@ -676,5 +674,4 @@ public class TestRowProcessorEndpoint { out.append("]"); return out.toString(); } - } diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java index 38c3081bb00..21f17f76a8a 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license 
agreements. See the NOTICE file * distributed with this work for additional information @@ -87,7 +87,6 @@ import org.slf4j.LoggerFactory; @Category({MediumTests.class}) public class TestSecureExport { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestSecureExport.class); @@ -146,12 +145,16 @@ public class TestSecureExport { USER_XO + "/" + LOCALHOST, USER_NONE + "/" + LOCALHOST); } + private static User getUserByLogin(final String user) throws IOException { - return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrinciple(user), KEYTAB_FILE.getAbsolutePath())); + return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI( + getPrinciple(user), KEYTAB_FILE.getAbsolutePath())); } + private static String getPrinciple(final String user) { return user + "/" + LOCALHOST + "@" + KDC.getRealm(); } + private static void setUpClusterKdc() throws Exception { HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm()); @@ -160,30 +163,42 @@ public class TestSecureExport { // the following key should be changed. // 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY // 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY - UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm()); - UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm()); - UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); - UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, + SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, + SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, + KEYTAB_FILE.getAbsolutePath()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, + KEYTAB_FILE.getAbsolutePath()); // set yarn principal - UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm()); - UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm()); - UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, + SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, + SERVER_PRINCIPAL + "@" + KDC.getRealm()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, + HTTP_PRINCIPAL + "@" + KDC.getRealm()); UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); - UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, + HttpConfig.Policy.HTTPS_ONLY.name()); UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0"); UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0"); File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath()); keystoresDir.mkdirs(); String sslConfDir = 
KeyStoreTestUtil.getClasspathDir(TestSecureExport.class); - KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false); + KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, + UTIL.getConfiguration(), false); UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true); UserGroupInformation.setConfiguration(UTIL.getConfiguration()); - UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get( - CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName()); + UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + UTIL.getConfiguration().get( + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName()); } - private static void addLabels(final Configuration conf, final List<String> users, final List<String> labels) throws Exception { + + private static void addLabels(final Configuration conf, final List<String> users, + final List<String> labels) throws Exception { PrivilegedExceptionAction action = () -> { try (Connection conn = ConnectionFactory.createConnection(conf)) { @@ -207,19 +222,21 @@ @After public void cleanup() throws IOException { } + private static void clearOutput(Path path) throws IOException { FileSystem fs = path.getFileSystem(UTIL.getConfiguration()); if (fs.exists(path)) { assertEquals(true, fs.delete(path, true)); } } + /** * Sets the security firstly for getting the correct default realm. */ @BeforeClass public static void beforeClass() throws Exception { - UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), HadoopSecurityEnabledUserProviderForTesting.class); + UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); setUpKdcServer(); SecureTestUtil.enableSecurity(UTIL.getConfiguration()); UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true); @@ -252,11 +269,9 @@ /** * Test the ExportEndpoint's access levels. The {@link Export} test is ignored * since the access exceptions cannot be collected from the mappers.
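The login-then-act pattern these tests lean on (getUserByLogin combined with a PrivilegedExceptionAction) reduces to the following sketch; the wrapper class is invented for illustration:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;

// Illustrative only: log in from a keytab, then run an action as that user.
public final class RunAsKeytabUser {
  private RunAsKeytabUser() {}

  public static <T> T run(String principal, String keytabPath,
      PrivilegedExceptionAction<T> action) throws Exception {
    // log in from the keytab and wrap the resulting UGI in an HBase User
    User user = User.create(
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath));
    return user.runAs(action);
  }
}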
- * - * @throws java.io.IOException */ @Test - public void testAccessCase() throws IOException, Throwable { + public void testAccessCase() throws Throwable { final String exportTable = name.getMethodName(); TableDescriptor exportHtd = TableDescriptorBuilder .newBuilder(TableName.valueOf(name.getMethodName())) @@ -339,11 +354,13 @@ public class TestSecureExport { SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER)); fs.delete(openDir, true); } + @Test public void testVisibilityLabels() throws IOException, Throwable { final String exportTable = name.getMethodName() + "_export"; final String importTable = name.getMethodName() + "_import"; - final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable)) + final TableDescriptor exportHtd = TableDescriptorBuilder + .newBuilder(TableName.valueOf(exportTable)) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA)) .setOwnerString(USER_OWNER) .build(); @@ -400,7 +417,8 @@ public class TestSecureExport { } }; SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER)); - final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable)) + final TableDescriptor importHtd = TableDescriptorBuilder + .newBuilder(TableName.valueOf(importTable)) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB)) .setOwnerString(USER_OWNER) .build(); @@ -411,7 +429,8 @@ public class TestSecureExport { importTable, output.toString() }; - assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args)); + assertEquals(0, ToolRunner.run( + new Configuration(UTIL.getConfiguration()), new Import(), args)); return null; }; SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER)); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java index d32e6ea098c..0d15f93d9f5 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java @@ -15,17 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; @@ -41,12 +38,14 @@ import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.token.Token; +import org.apache.yetus.audience.InterfaceAudience; /** * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint * @deprecated Use for backward compatibility testing only. Will be removed when * SecureBulkLoadEndpoint is not supported. 
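Typical usage of this deprecated client, condensed from the bulk-load test further down; the wrapper class is invented, and famPaths pairs a column family with an HFile path:

import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadEndpointClient;
import org.apache.hadoop.hbase.util.Pair;

// Illustrative only: prepare a bulk-load token, then push the HFiles.
public final class OldSecureBulkLoadUsage {
  private OldSecureBulkLoadUsage() {}

  public static boolean load(Connection conn, TableName tableName,
      List<Pair<byte[], String>> famPaths, byte[] startKey) throws Exception {
    try (Table table = conn.getTable(tableName)) {
      SecureBulkLoadEndpointClient client = new SecureBulkLoadEndpointClient(table);
      String bulkToken = client.prepareBulkLoad(tableName);
      // a null user token means the default identity, as in the test below
      return client.bulkLoadHFiles(famPaths, null, bulkToken, startKey);
    }
  }
}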
*/ +@Deprecated @InterfaceAudience.Private public class SecureBulkLoadEndpointClient { private Table table; @@ -111,9 +110,8 @@ public class SecureBulkLoadEndpointClient { } public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths, - final Token<?> userToken, - final String bulkToken, - final byte[] startRow) throws IOException { + final Token<?> userToken, final String bulkToken, final byte[] startRow) + throws IOException { // we never want to send a batch of HFiles to all regions, thus cannot call // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 try { @@ -162,5 +160,4 @@ public class SecureBulkLoadEndpointClient { throw new IOException(throwable); } } - } diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java index 7196851b920..49697b83150 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi @Category({RegionServerTests.class, LargeTests.class}) @Ignore // BROKEN. FIX OR REMOVE. public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); @@ -86,8 +85,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS final AtomicLong numCompactions = new AtomicLong(); private TableName tableName; - public AtomicHFileLoader(TableName tableName, TestContext ctx, - byte targetFamilies[][]) throws IOException { + public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies) + throws IOException { super(ctx); this.tableName = tableName; } @@ -114,19 +113,19 @@ final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName); RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); ClientServiceCallable<Void> callable = - new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { - @Override - protected Void rpcCall() throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " + - Bytes.toStringBinary(getRow())); - try (Table table = conn.getTable(getTableName())) { - boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, - null, bulkToken, getLocation().getRegionInfo().getStartKey()); - } - return null; + new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { + @Override + protected Void rpcCall() throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + try (Table table = conn.getTable(getTableName())) { + boolean loaded = new 
SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, + null, bulkToken, getLocation().getRegionInfo().getStartKey()); } - }; + return null; + } + }; RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); caller.callWithRetries(callable, Integer.MAX_VALUE); @@ -156,7 +155,7 @@ } void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) - throws Exception { + throws Exception { setupTable(tableName, 10); TestContext ctx = new TestContext(UTIL.getConfiguration()); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java index b306b763441..bc368e3c5a9 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -68,7 +68,6 @@ @Category({RegionServerTests.class, MediumTests.class}) public class TestServerCustomProtocol { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestServerCustomProtocol.class); @@ -84,7 +83,9 @@ @Override public void start(CoprocessorEnvironment env) throws IOException { - if (env instanceof RegionCoprocessorEnvironment) return; + if (env instanceof RegionCoprocessorEnvironment) { + return; + } throw new CoprocessorException("Must be loaded on a table region!"); } @@ -116,9 +117,13 @@ @Override public void hello(RpcController controller, HelloRequest request, RpcCallback<HelloResponse> done) { - if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build()); - else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build()); - else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build()); + if (!request.hasName()) { + done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build()); + } else if (request.getName().equals(NOBODY)) { + done.run(HelloResponse.newBuilder().build()); + } else { + done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build()); + } } @Override @@ -153,19 +158,19 @@ } @Before - public void before() throws Exception { + public void before() throws Exception { final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C }; Table table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS); - Put puta = new Put( ROW_A ); + Put puta = new Put(ROW_A); puta.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1)); table.put(puta); - Put putb = new Put( ROW_B ); + Put putb = new Put(ROW_B); putb.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1)); table.put(putb); - Put putc = new Put( ROW_C ); + Put putc = new Put(ROW_C); putc.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1)); table.put(putc); } @@ -234,7 +239,7 @@ } private Map<byte[], String> hello(final Table table, final String send, final String response) - throws ServiceException, Throwable { Map<byte[], String> results = hello(table, send); for (Map.Entry<byte[], String> e: results.entrySet()) { assertEquals("Invalid custom protocol response", response, e.getValue()); } @@ -243,13 +248,12 @@ } private Map<byte[], String> hello(final Table table, final String send) - throws ServiceException, Throwable { + throws ServiceException, Throwable { return hello(table, send, null, null); } private Map<byte[], String> hello(final Table table, final String send, final byte [] start, - final byte [] end) - throws ServiceException, Throwable { + final byte [] end) throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @@ -258,7 +262,9 @@ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder(); - if (send != null) builder.setName(send); + if (send != null) { + builder.setName(send); + } instance.hello(null, builder.build(), rpcCallback); PingProtos.HelloResponse r = rpcCallback.get(); return r != null && r.hasResponse()? r.getResponse(): null; @@ -267,8 +273,7 @@ } private Map<byte[], String> compoundOfHelloAndPing(final Table table, final byte [] start, - final byte [] end) - throws ServiceException, Throwable { + final byte [] end) throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @@ -286,9 +291,8 @@ }); } - private Map<byte[], String> noop(final Table table, final byte [] start, - final byte [] end) - throws ServiceException, Throwable { + private Map<byte[], String> noop(final Table table, final byte [] start, final byte [] end) + throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @Override @@ -397,7 +401,7 @@ } private Map<byte[], String> ping(final Table table, final byte [] start, final byte [] end) - throws ServiceException, Throwable { + throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @Override @@ -410,8 +414,8 @@ private static String doPing(PingProtos.PingService instance) throws IOException { CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); - instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); - return rpcCallback.get().getPong(); + instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); + return rpcCallback.get().getPong(); } @Test @@ -459,18 +463,17 @@ } } - private void verifyRegionResults(RegionLocator table, - Map<byte[], String> results, byte[] row) throws Exception { + private void verifyRegionResults(RegionLocator table, Map<byte[], String> results, byte[] row) + throws Exception { verifyRegionResults(table, results, "pong", row); } - private void verifyRegionResults(RegionLocator regionLocator, - Map<byte[], String> results, String expected, byte[] row) - throws Exception { + private void verifyRegionResults(RegionLocator regionLocator, Map<byte[], String> results, + String expected, byte[] row) throws Exception { for (Map.Entry<byte[], String> e: results.entrySet()) { LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected + - ", result key=" 
+ Bytes.toString(e.getKey()) + + ", value=" + e.getValue()); } HRegionLocation loc = regionLocator.getRegionLocation(row, true); byte[] region = loc.getRegionInfo().getRegionName();
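For reference, the client-side shape of these per-region calls, condensed from the test's ping helper and assuming the generated PingProtos used by this test; the wrapper class is invented:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;

// Illustrative only: one ping per region between start and end, keyed by region name.
public final class PingAllRegions {
  private PingAllRegions() {}

  public static Map<byte[], String> ping(Table table, byte[] start, byte[] end)
      throws Throwable {
    return table.coprocessorService(PingProtos.PingService.class, start, end,
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
              new CoprocessorRpcUtils.BlockingRpcCallback<>();
          instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
          return rpcCallback.get().getPong();
        }
      });
  }
}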