From 547d46cfac34a088b7fb6984453c98fcd6f56977 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 2 Nov 2017 13:55:16 +0800 Subject: [PATCH] HBASE-18972 Use Builder pattern to remove nullable parameters for coprocessor methods in RawAsyncTable interface --- .../hadoop/hbase/client/RawAsyncTable.java | 85 +++++++++++-------- .../hbase/client/RawAsyncTableImpl.java | 75 ++++++++++++---- .../coprocessor/AsyncAggregationClient.java | 72 +++++++++------- 3 files changed, 151 insertions(+), 81 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java index cd0226bdca0..102f2790afd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -38,11 +38,6 @@ import com.google.protobuf.RpcController; *

* So, only experts that want to build high performance service should use this interface directly, * especially for the {@link #scan(Scan, RawScanResultConsumer)} below. - *

- * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan - * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) - * so it is not suitable for a normal user. If it is still the only difference after we implement - * most features of AsyncTable, we can think about merge these two interfaces. * @since 2.0.0 */ @InterfaceAudience.Public @@ -135,8 +130,8 @@ public interface RawAsyncTable extends AsyncTableBase { * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)} * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no - * {@link #onRegionComplete(RegionInfo, Object)} or - * {@link #onRegionError(RegionInfo, Throwable)} calls in the future. + * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} + * calls in the future. *

* Here is a pseudo code to describe a typical implementation of a range coprocessor service * method to help you better understand how the {@link CoprocessorCallback} will be called. The @@ -200,25 +195,56 @@ public interface RawAsyncTable extends AsyncTableBase { } /** - * Execute the given coprocessor call on the regions which are covered by the range from - * {@code startKey} inclusive and {@code endKey} exclusive. See the comment of - * {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)} - * for more details. - * @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, - * CoprocessorCallback) + * Helper class for sending coprocessorService request that executes a coprocessor call on regions + * which are covered by a range. + *

+ * If {@code fromRow} is not specified the selection will start with the first table region. If + * {@code toRow} is not specified the selection will continue through the last table region. + * @param the type of the protobuf Service you want to call. + * @param the type of the return value. */ - default void coprocessorService(Function stubMaker, - CoprocessorCallable callable, byte[] startKey, byte[] endKey, - CoprocessorCallback callback) { - coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback); + interface CoprocessorServiceBuilder { + + /** + * @param startKey start region selection with region containing this row, inclusive. + */ + default CoprocessorServiceBuilder fromRow(byte[] startKey) { + return fromRow(startKey, true); + } + + /** + * @param startKey start region selection with region containing this row + * @param inclusive whether to include the startKey + */ + CoprocessorServiceBuilder fromRow(byte[] startKey, boolean inclusive); + + /** + * @param endKey select regions up to and including the region containing this row, exclusive. + */ + default CoprocessorServiceBuilder toRow(byte[] endKey) { + return toRow(endKey, false); + } + + /** + * @param endKey select regions up to and including the region containing this row + * @param inclusive whether to include the endKey + */ + CoprocessorServiceBuilder toRow(byte[] endKey, boolean inclusive); + + /** + * Execute the coprocessorService request. You can get the response through the + * {@link CoprocessorCallback}. + */ + void execute(); } /** - * Execute the given coprocessor call on the regions which are covered by the range from - * {@code startKey} and {@code endKey}. The inclusive of boundaries are specified by - * {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a - * delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda - * expression, like: + * Execute a coprocessor call on the regions which are covered by a range. + *

+ * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it. + *

+ * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it + * is only a one line lambda expression, like: * *

    * 
@@ -229,20 +255,9 @@ public interface RawAsyncTable extends AsyncTableBase {
    * @param stubMaker a delegation to the actual {@code newStub} call.
    * @param callable a delegation to the actual protobuf rpc call. See the comment of
    *          {@link CoprocessorCallable} for more details.
-   * @param startKey start region selection with region containing this row. If {@code null}, the
-   *          selection will start with the first table region.
-   * @param startKeyInclusive whether to include the startKey
-   * @param endKey select regions up to and including the region containing this row. If
-   *          {@code null}, selection will continue through the last table region.
-   * @param endKeyInclusive whether to include the endKey
    * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
    *          for more details.
-   * @param  the type of the asynchronous stub
-   * @param  the type of the return value
-   * @see CoprocessorCallable
-   * @see CoprocessorCallback
    */
-   void coprocessorService(Function stubMaker,
-      CoprocessorCallable callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
-      boolean endKeyInclusive, CoprocessorCallback callback);
+   CoprocessorServiceBuilder coprocessorService(Function stubMaker,
+      CoprocessorCallable callable, CoprocessorCallback callback);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 6107f7f15d4..d4de5737ed5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 
@@ -29,7 +27,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +35,7 @@ import java.util.function.Function;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
@@ -560,19 +558,64 @@ class RawAsyncTableImpl implements RawAsyncTable {
     });
   }
 
-  @Override
-  public  void coprocessorService(Function stubMaker,
-      CoprocessorCallable callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
-      boolean endKeyInclusive, CoprocessorCallback callback) {
-    byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
-    byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
-    List locs = new ArrayList<>();
-    conn.getLocator()
-        .getRegionLocation(tableName, nonNullStartKey,
-          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
-        .whenComplete(
-          (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
-            endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+  private final class CoprocessorServiceBuilderImpl
+      implements CoprocessorServiceBuilder {
+
+    private final Function stubMaker;
+
+    private final CoprocessorCallable callable;
+
+    private final CoprocessorCallback callback;
+
+    private byte[] startKey = HConstants.EMPTY_START_ROW;
+
+    private boolean startKeyInclusive;
+
+    private byte[] endKey = HConstants.EMPTY_END_ROW;
+
+    private boolean endKeyInclusive;
+
+    public CoprocessorServiceBuilderImpl(Function stubMaker,
+        CoprocessorCallable callable, CoprocessorCallback callback) {
+      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
+      this.callable = Preconditions.checkNotNull(callable, "callable is null");
+      this.callback = Preconditions.checkNotNull(callback, "callback is null");
+    }
+
+    @Override
+    public CoprocessorServiceBuilderImpl fromRow(byte[] startKey, boolean inclusive) {
+      this.startKey = Preconditions.checkNotNull(startKey,
+        "startKey is null. Consider using" +
+            " an empty byte array, or just do not call this method if you want to start selection" +
+            " from the first region");
+      this.startKeyInclusive = inclusive;
+      return this;
+    }
+
+    @Override
+    public CoprocessorServiceBuilderImpl toRow(byte[] endKey, boolean inclusive) {
+      this.endKey = Preconditions.checkNotNull(endKey,
+        "endKey is null. Consider using" +
+            " an empty byte array, or just do not call this method if you want to continue" +
+            " selection to the last region");
+      this.endKeyInclusive = inclusive;
+      return this;
+    }
+
+    @Override
+    public void execute() {
+      conn.getLocator().getRegionLocation(tableName, startKey,
+        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
+          .whenComplete(
+            (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
+              endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+    }
   }
 
+  @Override
+  public  CoprocessorServiceBuilder coprocessorService(
+      Function stubMaker, CoprocessorCallable callable,
+      CoprocessorCallback callback) {
+    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
+  }
 }
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index 51c8248fffd..ff9b873b130 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client.coprocessor;
 import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
 import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
 
+import com.google.protobuf.Message;
+
 import java.io.IOException;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -29,6 +31,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RawAsyncTable;
 import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
 import org.apache.hadoop.hbase.client.RawScanResultConsumer;
@@ -43,8 +46,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import com.google.protobuf.Message;
-
 /**
  * This client class is for invoking the aggregate functions deployed on the Region Server side via
  * the AggregateService. This class will implement the supporting functionality for
@@ -120,6 +121,10 @@ public class AsyncAggregationClient {
     return ci.getPromotedValueFromProto(t);
   }
 
+  private static byte[] nullToEmpty(byte[] b) {
+    return b != null ? b : HConstants.EMPTY_BYTE_ARRAY;
+  }
+
   public static  CompletableFuture
       max(RawAsyncTable table, ColumnInterpreter ci, Scan scan) {
     CompletableFuture future = new CompletableFuture<>();
@@ -149,10 +154,11 @@ public class AsyncAggregationClient {
         return max;
       }
     };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }
 
@@ -185,10 +191,11 @@ public class AsyncAggregationClient {
         return min;
       }
     };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }
 
@@ -217,10 +224,11 @@ public class AsyncAggregationClient {
         return count;
       }
     };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }
 
@@ -251,10 +259,11 @@ public class AsyncAggregationClient {
         return sum;
       }
     };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }
 
@@ -288,10 +297,11 @@ public class AsyncAggregationClient {
         return ci.divideForAvg(sum, count);
       }
     };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }
 
@@ -330,10 +340,11 @@ public class AsyncAggregationClient {
         return Math.sqrt(avgSq - avg * avg);
       }
     };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }
 
@@ -368,10 +379,11 @@ public class AsyncAggregationClient {
             return map;
           }
         };
-    table.coprocessorService(channel -> AggregateService.newStub(channel),
-      (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback),
-      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
-      callback);
+    table
+        . coprocessorService(AggregateService::newStub,
+          (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback)
+        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
     return future;
   }