HBASE-19764 Fix Checkstyle errors in hbase-endpoint

Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
This commit is contained in:
Guangxu Cheng 2018-06-04 14:35:01 +08:00 committed by Jan Hentschel
parent daad14428d
commit 7357b0ce9f
21 changed files with 478 additions and 431 deletions

View File

@ -69,6 +69,13 @@
<groupId>net.revelc.code</groupId>
<artifactId>warbucks-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<failOnViolation>true</failOnViolation>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
@ -42,9 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
@ -58,6 +53,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This client class is for invoking the aggregate functions deployed on the
@ -135,7 +133,7 @@ public class AggregationClient implements Closeable {
/**
* Constructor with Conf object
* @param cfg
* @param cfg Configuration to use
*/
public AggregationClient(Configuration cfg) {
try {
@ -157,17 +155,16 @@ public class AggregationClient implements Closeable {
* It gives the maximum value of a column for a given column family for the
* given range. In case qualifier is null, a max of all values for the given
* family is returned.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return max val &lt;R&gt;
* @throws Throwable
* The caller is supposed to handle the exception as they are thrown
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> R max(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
try (Table table = connection.getTable(tableName)) {
return max(table, ci, scan);
}
@ -177,17 +174,16 @@ public class AggregationClient implements Closeable {
* It gives the maximum value of a column for a given column family for the
* given range. In case qualifier is null, a max of all values for the given
* family is returned.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return max val &lt;&gt;
* @throws Throwable
* The caller is supposed to handle the exception as they are thrown
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class MaxCallBack implements Batch.Callback<R> {
R max = null;
@ -225,21 +221,20 @@ public class AggregationClient implements Closeable {
return aMaxCallBack.getMax();
}
/**
* It gives the minimum value of a column for a given column family for the
* given range. In case qualifier is null, a min of all values for the given
* family is returned.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return min val &lt;R&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> R min(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
try (Table table = connection.getTable(tableName)) {
return min(table, ci, scan);
}
@ -249,18 +244,18 @@ public class AggregationClient implements Closeable {
* It gives the minimum value of a column for a given column family for the
* given range. In case qualifier is null, a min of all values for the given
* family is returned.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return min val &lt;R&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class MinCallBack implements Batch.Callback<R> {
private R min = null;
public R getMinimum() {
@ -272,10 +267,10 @@ public class AggregationClient implements Closeable {
min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
}
}
MinCallBack minCallBack = new MinCallBack();
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateService instance) throws IOException {
RpcController controller = new AggregationClientRpcController();
@ -305,17 +300,18 @@ public class AggregationClient implements Closeable {
* filter as it may set the flag to skip to next row, but the value read is
* not of the given filter: in this case, this particular row will not be
* counted ==&gt; an error.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return &lt;R, S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
try (Table table = connection.getTable(tableName)) {
return rowCount(table, ci, scan);
return rowCount(table, ci, scan);
}
}
@ -326,15 +322,16 @@ public class AggregationClient implements Closeable {
* filter as it may set the flag to skip to next row, but the value read is
* not of the given filter: in this case, this particular row will not be
* counted ==&gt; an error.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return &lt;R, S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
long rowCount(final Table table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
long rowCount(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
class RowNumCallback implements Batch.Callback<Long> {
private final AtomicLong rowCountL = new AtomicLong(0);
@ -348,6 +345,7 @@ public class AggregationClient implements Closeable {
rowCountL.addAndGet(result.longValue());
}
}
RowNumCallback rowNum = new RowNumCallback();
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, Long>() {
@ -373,32 +371,34 @@ public class AggregationClient implements Closeable {
/**
* It sums up the value returned from various regions. In case qualifier is
* null, summation of all the column qualifiers in the given family is done.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return sum &lt;S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
throws Throwable {
try (Table table = connection.getTable(tableName)) {
return sum(table, ci, scan);
return sum(table, ci, scan);
}
}
/**
* It sums up the value returned from various regions. In case qualifier is
* null, summation of all the column qualifiers in the given family is done.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return sum &lt;S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class SumCallBack implements Batch.Callback<S> {
@ -443,15 +443,16 @@ public class AggregationClient implements Closeable {
* It computes average while fetching sum and row count from all the
* corresponding regions. Approach is to compute a global sum of region level
* sum and rowcount and then compute the average.
* @param tableName
* @param scan
* @throws Throwable
* @param tableName the name of the table to scan
* @param scan the HBase scan object to use to read data from HBase
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
try (Table table = connection.getTable(tableName)) {
return getAvgArgs(table, ci, scan);
return getAvgArgs(table, ci, scan);
}
}
@ -459,17 +460,18 @@ public class AggregationClient implements Closeable {
* It computes average while fetching sum and row count from all the
* corresponding regions. Approach is to compute a global sum of region level
* sum and rowcount and then compute the average.
* @param table
* @param scan
* @throws Throwable
* @param table table to scan.
* @param scan the HBase scan object to use to read data from HBase
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<S, Long> getAvgArgs(final Table table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
Pair<S, Long> getAvgArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
S sum = null;
Long rowCount = 0l;
Long rowCount = 0L;
public synchronized Pair<S, Long> getAvgArgs() {
return new Pair<>(sum, rowCount);
@ -481,6 +483,7 @@ public class AggregationClient implements Closeable {
rowCount += result.getSecond();
}
}
AvgCallBack avgCallBack = new AvgCallBack();
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, Pair<S, Long>>() {
@ -518,15 +521,16 @@ public class AggregationClient implements Closeable {
* its return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the average and returs the double value.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return &lt;R, S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
double avg(final TableName tableName,
final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
double avg(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
return ci.divideForAvg(p.getFirst(), p.getSecond());
}
@ -537,14 +541,16 @@ public class AggregationClient implements Closeable {
* its return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the average and returs the double value.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return &lt;R, S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan)
throws Throwable {
Pair<S, Long> p = getAvgArgs(table, ci, scan);
return ci.divideForAvg(p.getFirst(), p.getSecond());
}
@ -555,17 +561,18 @@ public class AggregationClient implements Closeable {
* average*average). From individual regions, it obtains sum, square sum and
* number of rows. With these, the above values are computed to get the global
* std.
* @param table
* @param scan
* @param table table to scan.
* @param scan the HBase scan object to use to read data from HBase
* @return standard deviations
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<List<S>, Long> getStdArgs(final Table table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
Pair<List<S>, Long> getStdArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
long rowCountVal = 0l;
long rowCountVal = 0L;
S sumVal = null, sumSqVal = null;
public synchronized Pair<List<S>, Long> getStdParams() {
@ -585,6 +592,7 @@ public class AggregationClient implements Closeable {
}
}
}
StdCallback stdCallback = new StdCallback();
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
@ -626,17 +634,18 @@ public class AggregationClient implements Closeable {
* return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the std and returns the double value.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return &lt;R, S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
try (Table table = connection.getTable(tableName)) {
return std(table, ci, scan);
return std(table, ci, scan);
}
}
@ -646,11 +655,12 @@ public class AggregationClient implements Closeable {
* return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the std and returns the double value.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return &lt;R, S&gt;
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> double std(
final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
@ -667,17 +677,18 @@ public class AggregationClient implements Closeable {
* It helps locate the region with median for a given column whose weight
* is specified in an optional column.
* From individual regions, it obtains sum of values and sum of weights.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return pair whose first element is a map between start row of the region
* and (sum of values, sum of weights) for the region, the second element is
* (sum of values, sum of weights) for all the regions chosen
* @throws Throwable
* and (sum of values, sum of weights) for the region, the second element is
* (sum of values, sum of weights) for all the regions chosen
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<NavigableMap<byte[], List<S>>, List<S>>
getMedianArgs(final Table table,
Pair<NavigableMap<byte[], List<S>>, List<S>>
getMedianArgs(final Table table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
final NavigableMap<byte[], List<S>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@ -731,17 +742,18 @@ public class AggregationClient implements Closeable {
* This is the client side interface/handler for calling the median method for a
* given cf-cq combination. This method collects the necessary parameters
* to compute the median and returns the median.
* @param tableName
* @param ci
* @param scan
* @param tableName the name of the table to scan
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return R the median
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
try (Table table = connection.getTable(tableName)) {
return median(table, ci, scan);
return median(table, ci, scan);
}
}
@ -749,15 +761,15 @@ public class AggregationClient implements Closeable {
* This is the client side interface/handler for calling the median method for a
* given cf-cq combination. This method collects the necessary parameters
* to compute the median and returns the median.
* @param table
* @param ci
* @param scan
* @param table table to scan.
* @param ci the user's ColumnInterpreter implementation
* @param scan the HBase scan object to use to read data from HBase
* @return R the median
* @throws Throwable
* @throws Throwable The caller is supposed to handle the exception as they are thrown
* &amp; propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
byte[] startRow = null;
byte[] colFamily = scan.getFamilies()[0];
@ -776,14 +788,19 @@ public class AggregationClient implements Closeable {
for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
if (newSumVal > halfSumVal) break; // we found the region with the median
if (newSumVal > halfSumVal) {
// we found the region with the median
break;
}
movingSumVal = newSumVal;
startRow = entry.getKey();
}
// scan the region with median and find it
Scan scan2 = new Scan(scan);
// inherit stop row from method parameter
if (startRow != null) scan2.setStartRow(startRow);
if (startRow != null) {
scan2.setStartRow(startRow);
}
ResultScanner scanner = null;
try {
int cacheSize = scan2.getCaching();
@ -815,8 +832,8 @@ public class AggregationClient implements Closeable {
movingSumVal = newSumVal;
kv = r.getColumnLatestCell(colFamily, qualifier);
value = ci.getValue(colFamily, qualifier, kv);
}
}
}
} while (results != null && results.length > 0);
} finally {
if (scanner != null) {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -27,21 +27,22 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Helper class for constructing aggregation request and response.
*/
@InterfaceAudience.Private
public class AggregationHelper {
public final class AggregationHelper {
private AggregationHelper() {}
/**
* @param scan
* @param scan the HBase scan object to use to read data from HBase
* @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
*/
private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
@ -64,8 +65,8 @@ public class AggregationHelper {
validateParameters(scan, canFamilyBeAbsent);
final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder();
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
P columnInterpreterSpecificData = null;
if ((columnInterpreterSpecificData = ci.getRequestData()) != null) {
P columnInterpreterSpecificData = ci.getRequestData();
if (columnInterpreterSpecificData != null) {
requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
}
requestBuilder.setScan(ProtobufUtil.toScan(scan));
@ -80,7 +81,7 @@ public class AggregationHelper {
* @param position the position of the argument in the class declaration
* @param b the ByteString which should be parsed to get the instance created
* @return the instance
* @throws IOException
* @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed.
*/
@SuppressWarnings("unchecked")
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -52,11 +52,11 @@ import org.apache.yetus.audience.InterfaceAudience;
* summing/processing the individual results obtained from the AggregateService for each region.
*/
@InterfaceAudience.Public
public class AsyncAggregationClient {
public final class AsyncAggregationClient {
private AsyncAggregationClient() {}
private static abstract class AbstractAggregationCallback<T>
implements CoprocessorCallback<AggregateResponse> {
private final CompletableFuture<T> future;
protected boolean finished = false;
@ -172,6 +172,7 @@ public class AsyncAggregationClient {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
private R min;
@ -200,8 +201,8 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Long>
rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Long> rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) {
CompletableFuture<Long> future = new CompletableFuture<>();
AggregateRequest req;
try {
@ -243,7 +244,6 @@ public class AsyncAggregationClient {
return future;
}
AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) {
private S sum;
@Override
@ -268,8 +268,8 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Double>
avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Double> avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) {
CompletableFuture<Double> future = new CompletableFuture<>();
AggregateRequest req;
try {
@ -279,7 +279,6 @@ public class AsyncAggregationClient {
return future;
}
AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
private S sum;
long count = 0L;
@ -306,8 +305,8 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Double>
std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Double> std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) {
CompletableFuture<Double> future = new CompletableFuture<>();
AggregateRequest req;
try {
@ -365,20 +364,20 @@ public class AsyncAggregationClient {
AbstractAggregationCallback<NavigableMap<byte[], S>> callback =
new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) {
private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@Override
protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
}
@Override
protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
}
}
@Override
protected NavigableMap<byte[], S> getFinalResult() {
return map;
}
};
@Override
protected NavigableMap<byte[], S> getFinalResult() {
return map;
}
};
table
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
(stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback)
@ -388,8 +387,8 @@ public class AsyncAggregationClient {
}
private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
S movingSum = null;
byte[] startRow = null;
@ -411,7 +410,6 @@ public class AsyncAggregationClient {
byte[] weightQualifier = qualifiers.last();
byte[] valueQualifier = qualifiers.first();
table.scan(scan, new AdvancedScanResultConsumer() {
private S sum = baseSum;
private R value = null;
@ -457,8 +455,8 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
if (error != null) {

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -36,9 +35,6 @@ import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@ -47,6 +43,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A concrete AggregateProtocol implementation. Its system level coprocessor
@ -62,7 +61,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
*/
@InterfaceAudience.Private
public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
extends AggregateService implements RegionCoprocessor {
extends AggregateService implements RegionCoprocessor {
protected static final Logger log = LoggerFactory.getLogger(AggregateImplementation.class);
private RegionCoprocessorEnvironment env;
@ -75,7 +74,7 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getMax(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
T max = null;
@ -130,7 +129,7 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getMin(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
T min = null;
@ -183,10 +182,10 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getSum(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
long sum = 0l;
long sum = 0L;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null;
@ -206,8 +205,9 @@ extends AggregateService implements RegionCoprocessor {
int listSize = results.size();
for (int i = 0; i < listSize; i++) {
temp = ci.getValue(colFamily, qualifier, results.get(i));
if (temp != null)
if (temp != null) {
sumVal = ci.add(sumVal, ci.castToReturnType(temp));
}
}
results.clear();
} while (hasMoreRows);
@ -235,9 +235,9 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getRowNum(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
long counter = 0l;
long counter = 0L;
List<Cell> results = new ArrayList<>();
InternalScanner scanner = null;
try {
@ -250,8 +250,9 @@ extends AggregateService implements RegionCoprocessor {
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();
}
if (scan.getFilter() == null && qualifier == null)
if (scan.getFilter() == null && qualifier == null) {
scan.setFilter(new FirstKeyOnlyFilter());
}
scanner = env.getRegion().getScanner(scan);
boolean hasMoreRows = false;
do {
@ -294,13 +295,13 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getAvg(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null;
Long rowCountVal = 0l;
Long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
@ -354,13 +355,13 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getStd(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0l;
long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
@ -419,7 +420,7 @@ extends AggregateService implements RegionCoprocessor {
*/
@Override
public void getMedian(RpcController controller, AggregateRequest request,
RpcCallback<AggregateResponse> done) {
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
try {
@ -526,5 +527,4 @@ extends AggregateService implements RegionCoprocessor {
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
}

View File

@ -18,6 +18,10 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@ -77,10 +81,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Export an HBase table. Writes content to sequence files up in HDFS. Use
* {@link Import} to read it back in again. It is implemented by the endpoint
@ -91,10 +91,10 @@ import com.google.protobuf.Service;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
private static final Logger LOG = LoggerFactory.getLogger(Export.class);
private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD;
private static final SequenceFile.CompressionType DEFAULT_TYPE =
SequenceFile.CompressionType.RECORD;
private RegionCoprocessorEnvironment env = null;
private UserProvider userProvider;
@ -110,11 +110,13 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.getLength(otherArgs));
return null;
}
Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
Triple<TableName, Scan, Path> arguments =
ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
}
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable {
public static Map<byte[], Response> run(final Configuration conf, TableName tableName,
Scan scan, Path dir) throws Throwable {
FileSystem fs = dir.getFileSystem(conf);
UserProvider userProvider = UserProvider.instantiate(conf);
checkDir(fs, dir);
@ -158,7 +160,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
}
private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
private static SequenceFile.CompressionType getCompressionType(
final ExportProtos.ExportRequest request) {
if (request.hasCompressType()) {
return SequenceFile.CompressionType.valueOf(request.getCompressType());
} else {
@ -166,11 +169,13 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
}
private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
private static CompressionCodec getCompressionCodec(final Configuration conf,
final ExportProtos.ExportRequest request) {
try {
Class<? extends CompressionCodec> codecClass;
if (request.hasCompressCodec()) {
codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
codecClass = conf.getClassByName(request.getCompressCodec())
.asSubclass(CompressionCodec.class);
} else {
codecClass = DEFAULT_CODEC;
}
@ -198,16 +203,17 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
rval.add(SequenceFile.Writer.valueClass(Result.class));
rval.add(getOutputPath(conf, info, request));
if (getCompression(request)) {
rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request)));
rval.add(SequenceFile.Writer.compression(getCompressionType(request),
getCompressionCodec(conf, request)));
} else {
rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
}
return rval;
}
private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf,
final UserProvider userProvider, final Scan scan, final Token userToken,
final List<SequenceFile.Writer.Option> opts) throws IOException {
private static ExportProtos.ExportResponse processData(final Region region,
final Configuration conf, final UserProvider userProvider, final Scan scan,
final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
ScanCoprocessor cp = new ScanCoprocessor(region);
RegionScanner scanner = null;
try (RegionOp regionOp = new RegionOp(region);
@ -230,11 +236,15 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
Cell firstCell = cells.get(0);
for (Cell cell : cells) {
if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??"
+ " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength())
+ ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength()) != 0) {
throw new IOException("Why the RegionScanner#nextRaw returns the data of different"
+ " rows?? first row="
+ Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength())
+ ", current row="
+ Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
}
results.add(Result.create(cells));
@ -298,7 +308,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
return builder.build();
}
@Override
public void start(CoprocessorEnvironment environment) throws IOException {
if (environment instanceof RegionCoprocessorEnvironment) {
@ -323,7 +332,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
RpcCallback<ExportProtos.ExportResponse> done) {
Region region = env.getRegion();
Configuration conf = HBaseConfiguration.create(env.getConfiguration());
conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
conf.setStrings("io.serializations", conf.get("io.serializations"),
ResultSerialization.class.getName());
try {
Scan scan = validateKey(region.getRegionInfo(), request);
Token userToken = null;
@ -344,7 +354,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
}
private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request)
throws IOException {
Scan scan = ProtobufUtil.toScan(request.getScan());
byte[] regionStartKey = region.getStartKey();
byte[] originStartKey = scan.getStartRow();
@ -362,7 +373,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
private static class RegionOp implements Closeable {
private final Region region;
RegionOp(final Region region) throws IOException {
@ -377,7 +387,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
private static class ScanCoprocessor {
private final HRegion region;
ScanCoprocessor(final Region region) {
@ -439,17 +448,20 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
private static class SecureWriter implements Closeable {
private final PrivilegedWriter privilegedWriter;
SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken,
final List<SequenceFile.Writer.Option> opts) throws IOException {
SecureWriter(final Configuration conf, final UserProvider userProvider,
final Token userToken, final List<SequenceFile.Writer.Option> opts)
throws IOException {
privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken),
SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
SequenceFile.createWriter(conf,
opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
}
void append(final Object key, final Object value) throws IOException {
privilegedWriter.append(key, value);
}
private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException {
private static User getActiveUser(final UserProvider userProvider, final Token userToken)
throws IOException {
User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
if (user == null && userToken != null) {
LOG.warn("No found of user credentials, but a token was got from user request");
@ -465,7 +477,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
}
private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, Closeable {
private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>,
Closeable {
private final User user;
private final SequenceFile.Writer out;
private Object key;
@ -502,8 +515,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
}
}
public static class Response {
public final static class Response {
private final long rowCount;
private final long cellCount;

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.security.access;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -45,9 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,7 +61,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
@Deprecated
public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor {
public static final long VERSION = 0L;
private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadEndpoint.class);
@ -82,7 +82,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
@Override
public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
RpcCallback<PrepareBulkLoadResponse> done) {
RpcCallback<PrepareBulkLoadResponse> done) {
try {
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
@ -100,7 +100,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
*/
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
convert(PrepareBulkLoadRequest request)
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
byte [] bytes = request.toByteArray();
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder
builder =
@ -112,7 +112,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
@Override
public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
RpcCallback<CleanupBulkLoadResponse> done) {
RpcCallback<CleanupBulkLoadResponse> done) {
try {
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
secureBulkLoadManager.cleanupBulkLoad((HRegion) this.env.getRegion(), convert(request));
@ -127,7 +127,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
*/
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
convert(CleanupBulkLoadRequest request)
convert(CleanupBulkLoadRequest request)
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
byte [] bytes = request.toByteArray();
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder
@ -140,7 +140,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
@Override
public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
RpcCallback<SecureBulkLoadHFilesResponse> done) {
RpcCallback<SecureBulkLoadHFilesResponse> done) {
boolean loaded = false;
Map<byte[], List<Path>> map = null;
try {
@ -159,7 +159,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
*/
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
convert(BulkLoadHFileRequest request)
convert(BulkLoadHFileRequest request)
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
byte [] bytes = request.toByteArray();
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder
@ -171,7 +171,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
}
private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
SecureBulkLoadHFilesRequest request) {
SecureBulkLoadHFilesRequest request) {
BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
RegionSpecifier region =
ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,6 +17,10 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -36,15 +39,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* The aggregation implementation at a region.
*/
public class ColumnAggregationEndpoint extends ColumnAggregationService
implements RegionCoprocessor {
implements RegionCoprocessor {
private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpoint.class);
private RegionCoprocessorEnvironment env = null;

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -27,31 +31,28 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Test coprocessor endpoint that always returns {@code null} for requests to the last region
* in the table. This allows tests to provide assurance of correct {@code null} handling for
* response values.
*/
public class ColumnAggregationEndpointNullResponse
extends ColumnAggregationServiceNullResponse implements RegionCoprocessor {
public class ColumnAggregationEndpointNullResponse extends ColumnAggregationServiceNullResponse
implements RegionCoprocessor {
private static final Logger LOG =
LoggerFactory.getLogger(ColumnAggregationEndpointNullResponse.class);
private RegionCoprocessorEnvironment env = null;
@Override
public Iterable<Service> getServices() {
return Collections.singleton(this);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -38,18 +42,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on
* the last region in the table. This allows tests to ensure correct error handling of
* coprocessor endpoints throwing exceptions.
*/
public class ColumnAggregationEndpointWithErrors
extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
implements RegionCoprocessor {
extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
implements RegionCoprocessor {
private static final Logger LOG =
LoggerFactory.getLogger(ColumnAggregationEndpointWithErrors.class);
@ -76,7 +76,7 @@ public class ColumnAggregationEndpointWithErrors
@Override
public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request,
RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
// aggregate at each region
Scan scan = new Scan();
// Family is required in pb. Qualifier is not.

View File

@ -15,12 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
@ -32,16 +35,13 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestPro
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Threads;
import java.io.IOException;
import java.util.Collections;
/**
* Test implementation of a coprocessor endpoint exposing the
* {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests
* only.
*/
public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
implements MasterCoprocessor, RegionCoprocessor {
implements MasterCoprocessor, RegionCoprocessor {
public ProtobufCoprocessorService() {}
@Override
@ -51,34 +51,34 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu
@Override
public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
RpcCallback<TestProtos.EmptyResponseProto> done) {
RpcCallback<TestProtos.EmptyResponseProto> done) {
done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
}
@Override
public void echo(RpcController controller, TestProtos.EchoRequestProto request,
RpcCallback<TestProtos.EchoResponseProto> done) {
RpcCallback<TestProtos.EchoResponseProto> done) {
String message = request.getMessage();
done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
}
@Override
public void error(RpcController controller, TestProtos.EmptyRequestProto request,
RpcCallback<TestProtos.EmptyResponseProto> done) {
RpcCallback<TestProtos.EmptyResponseProto> done) {
CoprocessorRpcUtils.setControllerException(controller, new IOException("Test exception"));
done.run(null);
}
@Override
public void pause(RpcController controller, PauseRequestProto request,
RpcCallback<EmptyResponseProto> done) {
RpcCallback<EmptyResponseProto> done) {
Threads.sleepWithoutInterrupt(request.getMs());
done.run(EmptyResponseProto.getDefaultInstance());
}
@Override
public void addr(RpcController controller, EmptyRequestProto request,
RpcCallback<AddrResponseProto> done) {
RpcCallback<AddrResponseProto> done) {
done.run(AddrResponseProto.newBuilder()
.setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build());
}
@ -92,5 +92,4 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu
public void stop(CoprocessorEnvironment env) throws IOException {
// To change body of implemented methods use File | Settings | File Templates.
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -53,7 +53,6 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class);
@ -80,8 +79,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
TestProtos.EchoResponseProto response =
admin
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> coprocessorService(
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto>
coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub,
(s, c, done) -> s.echo(c, request, done)).get();
assertEquals("hello", response.getMessage());
}
@ -91,8 +90,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
try {
admin
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> coprocessorService(
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto>
coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub,
(s, c, done) -> s.error(c, emptyRequest, done)).get();
fail("Should have thrown an exception");
} catch (Exception e) {
@ -106,7 +105,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
DummyRegionServerEndpointProtos.DummyResponse response =
admin
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
.<DummyRegionServerEndpointProtos.DummyService.Stub,
DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
DummyRegionServerEndpointProtos.DummyService::newStub,
(s, c, done) -> s.dummyCall(c, request, done), serverName).get();
assertEquals(DUMMY_VALUE, response.getValue());
@ -119,7 +119,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
try {
admin
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
.<DummyRegionServerEndpointProtos.DummyService.Stub,
DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
DummyRegionServerEndpointProtos.DummyService::newStub,
(s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
fail("Should have thrown an exception");
@ -130,8 +131,7 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
}
public static class DummyRegionServerEndpoint extends DummyService
implements RegionServerCoprocessor {
implements RegionServerCoprocessor {
public DummyRegionServerEndpoint() {}
@Override

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
*/
@Category({CoprocessorTests.class, MediumTests.class})
public class TestClassLoading {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestClassLoading.class);
@ -98,9 +97,7 @@ public class TestClassLoading {
private static Class<?> masterCoprocessor = TestMasterCoprocessor.class;
private static final String[] regionServerSystemCoprocessors =
new String[]{
regionServerCoprocessor.getSimpleName()
};
new String[]{ regionServerCoprocessor.getSimpleName() };
private static final String[] masterRegionServerSystemCoprocessors = new String[] {
regionCoprocessor1.getSimpleName(), MultiRowMutationEndpoint.class.getSimpleName(),
@ -211,7 +208,9 @@ public class TestClassLoading {
found2_k2 = found2_k2 && (conf.get("k2") != null);
found2_k3 = found2_k3 && (conf.get("k3") != null);
} else {
found2_k1 = found2_k2 = found2_k3 = false;
found2_k1 = false;
found2_k2 = false;
found2_k3 = false;
}
regionsActiveClassLoaders
.put(region, ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders());
@ -571,6 +570,4 @@ public class TestClassLoading {
// Now wait a bit longer for the coprocessor hosts to load the CPs
Thread.sleep(1000);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -67,7 +67,6 @@ import org.slf4j.LoggerFactory;
*/
@Category({CoprocessorTests.class, MediumTests.class})
public class TestCoprocessorEndpoint {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorEndpoint.class);
@ -119,14 +118,14 @@ public class TestCoprocessorEndpoint {
}
private Map<byte [], Long> sum(final Table table, final byte [] family,
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
start, end,
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
throws IOException {
CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
ColumnAggregationProtos.SumRequest.Builder builder =
@ -191,26 +190,28 @@ public class TestCoprocessorEndpoint {
// scan: for all regions
final RpcController controller = new ServerRpcController();
table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
ROWS[0], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
throws IOException {
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
instance.echo(controller, request, callback);
TestProtos.EchoResponseProto response = callback.get();
LOG.debug("Batch.Call returning result " + response);
return response;
}
},
new Batch.Callback<TestProtos.EchoResponseProto>() {
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
assertNotNull(result);
assertEquals("hello", result.getMessage());
results.put(region, result.getMessage());
}
ROWS[0], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
@Override
public TestProtos.EchoResponseProto call(
TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
instance.echo(controller, request, callback);
TestProtos.EchoResponseProto response = callback.get();
LOG.debug("Batch.Call returning result " + response);
return response;
}
},
new Batch.Callback<TestProtos.EchoResponseProto>() {
@Override
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
assertNotNull(result);
assertEquals("hello", result.getMessage());
results.put(region, result.getMessage());
}
}
);
for (Map.Entry<byte[], String> e : results.entrySet()) {
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
@ -224,26 +225,28 @@ public class TestCoprocessorEndpoint {
// scan: for region 2 and region 3
table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
ROWS[rowSeperator1], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
throws IOException {
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
instance.echo(controller, request, callback);
TestProtos.EchoResponseProto response = callback.get();
LOG.debug("Batch.Call returning result " + response);
return response;
}
},
new Batch.Callback<TestProtos.EchoResponseProto>() {
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
assertNotNull(result);
assertEquals("hello", result.getMessage());
results.put(region, result.getMessage());
}
ROWS[rowSeperator1], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
@Override
public TestProtos.EchoResponseProto call(
TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
instance.echo(controller, request, callback);
TestProtos.EchoResponseProto response = callback.get();
LOG.debug("Batch.Call returning result " + response);
return response;
}
},
new Batch.Callback<TestProtos.EchoResponseProto>() {
@Override
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
assertNotNull(result);
assertEquals("hello", result.getMessage());
results.put(region, result.getMessage());
}
}
);
for (Map.Entry<byte[], String> e : results.entrySet()) {
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
@ -348,6 +351,4 @@ public class TestCoprocessorEndpoint {
}
return ret;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.*;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -38,9 +40,8 @@ import org.junit.experimental.categories.Category;
/**
* Tests to ensure that 2.0 is backward compatible in loading CoprocessorService.
*/
@Category({SmallTests.class})
@Category({MediumTests.class})
public class TestCoprocessorServiceBackwardCompatibility {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorServiceBackwardCompatibility.class);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -47,7 +47,6 @@ import org.junit.rules.TestName;
@Category({CoprocessorTests.class, MediumTests.class})
public class TestCoprocessorTableEndpoint {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorTableEndpoint.class);
@ -81,7 +80,7 @@ public class TestCoprocessorTableEndpoint {
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
desc.addCoprocessor(ColumnAggregationEndpoint.class.getName());
createTable(desc);
verifyTable(tableName);
@ -96,7 +95,7 @@ public class TestCoprocessorTableEndpoint {
createTable(desc);
desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
desc.addCoprocessor(ColumnAggregationEndpoint.class.getName());
updateTable(desc);
verifyTable(tableName);
@ -113,24 +112,24 @@ public class TestCoprocessorTableEndpoint {
private static Map<byte [], Long> sum(final Table table, final byte [] family,
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
start, end,
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
ColumnAggregationProtos.SumRequest.Builder builder =
ColumnAggregationProtos.SumRequest.newBuilder();
builder.setFamily(ByteString.copyFrom(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(ByteString.copyFrom(qualifier));
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
ColumnAggregationProtos.SumRequest.Builder builder =
ColumnAggregationProtos.SumRequest.newBuilder();
builder.setFamily(ByteString.copyFrom(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(ByteString.copyFrom(qualifier));
}
instance.sum(null, builder.build(), rpcCallback);
return rpcCallback.get().getSum();
}
instance.sum(null, builder.build(), rpcCallback);
return rpcCallback.get().getSum();
}
});
});
}
private static final void createTable(HTableDescriptor desc) throws Exception {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -84,7 +84,6 @@ import org.slf4j.LoggerFactory;
*/
@Category({CoprocessorTests.class, MediumTests.class})
public class TestRowProcessorEndpoint {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class);
@ -219,8 +218,7 @@ public class TestRowProcessorEndpoint {
return result;
}
private void concurrentExec(
final Runnable task, final int numThreads) throws Throwable {
private void concurrentExec(final Runnable task, final int numThreads) throws Throwable {
startSignal = new CountDownLatch(numThreads);
doneSignal = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; ++i) {
@ -313,10 +311,10 @@ public class TestRowProcessorEndpoint {
* So they can be loaded with the endpoint on the coprocessor.
*/
public static class RowProcessorEndpoint<S extends Message,T extends Message>
extends BaseRowProcessorEndpoint<S,T> {
extends BaseRowProcessorEndpoint<S,T> {
public static class IncrementCounterProcessor extends
BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
int counter = 0;
byte[] row = new byte[0];
@ -397,7 +395,7 @@ public class TestRowProcessorEndpoint {
}
public static class FriendsOfFriendsProcessor extends
BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
byte[] row = null;
byte[] person = null;
final Set<String> result = new HashSet<>();
@ -482,7 +480,7 @@ public class TestRowProcessorEndpoint {
}
public static class RowSwapProcessor extends
BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
byte[] row1 = new byte[0];
byte[] row2 = new byte[0];
@ -586,8 +584,7 @@ public class TestRowProcessorEndpoint {
}
public static class TimeoutProcessor extends
BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
byte[] row = new byte[0];
/**
@ -643,8 +640,7 @@ public class TestRowProcessorEndpoint {
}
}
public static void doScan(
HRegion region, Scan scan, List<Cell> result) throws IOException {
public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
InternalScanner scanner = null;
try {
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
@ -652,7 +648,9 @@ public class TestRowProcessorEndpoint {
result.clear();
scanner.next(result);
} finally {
if (scanner != null) scanner.close();
if (scanner != null) {
scanner.close();
}
}
}
}
@ -676,5 +674,4 @@ public class TestRowProcessorEndpoint {
out.append("]");
return out.toString();
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -87,7 +87,6 @@ import org.slf4j.LoggerFactory;
@Category({MediumTests.class})
public class TestSecureExport {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSecureExport.class);
@ -146,12 +145,16 @@ public class TestSecureExport {
USER_XO + "/" + LOCALHOST,
USER_NONE + "/" + LOCALHOST);
}
private static User getUserByLogin(final String user) throws IOException {
return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrinciple(user), KEYTAB_FILE.getAbsolutePath()));
return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(
getPrinciple(user), KEYTAB_FILE.getAbsolutePath()));
}
private static String getPrinciple(final String user) {
return user + "/" + LOCALHOST + "@" + KDC.getRealm();
}
private static void setUpClusterKdc() throws Exception {
HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm());
@ -160,30 +163,42 @@ public class TestSecureExport {
// the following key should be changed.
// 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
// 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY,
SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
KEYTAB_FILE.getAbsolutePath());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
KEYTAB_FILE.getAbsolutePath());
// set yarn principal
UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL,
SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL,
SERVER_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
HTTP_PRINCIPAL + "@" + KDC.getRealm());
UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTPS_ONLY.name());
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
keystoresDir.mkdirs();
String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureExport.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false);
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir,
UTIL.getConfiguration(), false);
UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true);
UserGroupInformation.setConfiguration(UTIL.getConfiguration());
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get(
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
UTIL.getConfiguration().get(
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
}
private static void addLabels(final Configuration conf, final List<String> users, final List<String> labels) throws Exception {
private static void addLabels(final Configuration conf, final List<String> users,
final List<String> labels) throws Exception {
PrivilegedExceptionAction<VisibilityLabelsProtos.VisibilityLabelsResponse> action
= () -> {
try (Connection conn = ConnectionFactory.createConnection(conf)) {
@ -207,19 +222,21 @@ public class TestSecureExport {
@After
public void cleanup() throws IOException {
}
private static void clearOutput(Path path) throws IOException {
FileSystem fs = path.getFileSystem(UTIL.getConfiguration());
if (fs.exists(path)) {
assertEquals(true, fs.delete(path, true));
}
}
/**
* Sets the security firstly for getting the correct default realm.
* @throws Exception
*/
@BeforeClass
public static void beforeClass() throws Exception {
UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), HadoopSecurityEnabledUserProviderForTesting.class);
UserProvider.setUserProviderForTesting(UTIL.getConfiguration(),
HadoopSecurityEnabledUserProviderForTesting.class);
setUpKdcServer();
SecureTestUtil.enableSecurity(UTIL.getConfiguration());
UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
@ -252,11 +269,9 @@ public class TestSecureExport {
/**
* Test the ExportEndpoint's access levels. The {@link Export} test is ignored
* since the access exceptions cannot be collected from the mappers.
*
* @throws java.io.IOException
*/
@Test
public void testAccessCase() throws IOException, Throwable {
public void testAccessCase() throws Throwable {
final String exportTable = name.getMethodName();
TableDescriptor exportHtd = TableDescriptorBuilder
.newBuilder(TableName.valueOf(name.getMethodName()))
@ -339,11 +354,13 @@ public class TestSecureExport {
SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
fs.delete(openDir, true);
}
@Test
public void testVisibilityLabels() throws IOException, Throwable {
final String exportTable = name.getMethodName() + "_export";
final String importTable = name.getMethodName() + "_import";
final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable))
final TableDescriptor exportHtd = TableDescriptorBuilder
.newBuilder(TableName.valueOf(exportTable))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
.setOwnerString(USER_OWNER)
.build();
@ -400,7 +417,8 @@ public class TestSecureExport {
}
};
SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER));
final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable))
final TableDescriptor importHtd = TableDescriptorBuilder
.newBuilder(TableName.valueOf(importTable))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB))
.setOwnerString(USER_OWNER)
.build();
@ -411,7 +429,8 @@ public class TestSecureExport {
importTable,
output.toString()
};
assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args));
assertEquals(0, ToolRunner.run(
new Configuration(UTIL.getConfiguration()), new Import(), args));
return null;
};
SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER));

View File

@ -15,17 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@ -41,12 +38,14 @@ import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
* @deprecated Use for backward compatibility testing only. Will be removed when
* SecureBulkLoadEndpoint is not supported.
*/
@Deprecated
@InterfaceAudience.Private
public class SecureBulkLoadEndpointClient {
private Table table;
@ -111,9 +110,8 @@ public class SecureBulkLoadEndpointClient {
}
public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
final Token<?> userToken,
final String bulkToken,
final byte[] startRow) throws IOException {
final Token<?> userToken, final String bulkToken, final byte[] startRow)
throws IOException {
// we never want to send a batch of HFiles to all regions, thus cannot call
// HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
try {
@ -162,5 +160,4 @@ public class SecureBulkLoadEndpointClient {
throw new IOException(throwable);
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi
@Category({RegionServerTests.class, LargeTests.class})
@Ignore // BROKEN. FIX OR REMOVE.
public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
@ -86,8 +85,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
final AtomicLong numCompactions = new AtomicLong();
private TableName tableName;
public AtomicHFileLoader(TableName tableName, TestContext ctx,
byte targetFamilies[][]) throws IOException {
public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
throws IOException {
super(ctx);
this.tableName = tableName;
}
@ -114,19 +113,19 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " +
Bytes.toStringBinary(getRow()));
try (Table table = conn.getTable(getTableName())) {
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
null, bulkToken, getLocation().getRegionInfo().getStartKey());
}
return null;
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " +
Bytes.toStringBinary(getRow()));
try (Table table = conn.getTable(getTableName())) {
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
null, bulkToken, getLocation().getRegionInfo().getStartKey());
}
};
return null;
}
};
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
@ -156,7 +155,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
}
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
throws Exception {
throws Exception {
setupTable(tableName, 10);
TestContext ctx = new TestContext(UTIL.getConfiguration());

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -68,7 +68,6 @@ import org.slf4j.LoggerFactory;
@Category({RegionServerTests.class, MediumTests.class})
public class TestServerCustomProtocol {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestServerCustomProtocol.class);
@ -84,7 +83,9 @@ public class TestServerCustomProtocol {
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) return;
if (env instanceof RegionCoprocessorEnvironment) {
return;
}
throw new CoprocessorException("Must be loaded on a table region!");
}
@ -116,9 +117,13 @@ public class TestServerCustomProtocol {
@Override
public void hello(RpcController controller, HelloRequest request,
RpcCallback<HelloResponse> done) {
if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build());
else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
if (!request.hasName()) {
done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
} else if (request.getName().equals(NOBODY)) {
done.run(HelloResponse.newBuilder().build());
} else {
done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
}
}
@Override
@ -153,19 +158,19 @@ public class TestServerCustomProtocol {
}
@Before
public void before() throws Exception {
public void before() throws Exception {
final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C };
Table table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS);
Put puta = new Put( ROW_A );
Put puta = new Put(ROW_A);
puta.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
table.put(puta);
Put putb = new Put( ROW_B );
Put putb = new Put(ROW_B);
putb.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
table.put(putb);
Put putc = new Put( ROW_C );
Put putc = new Put(ROW_C);
putc.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
table.put(putc);
}
@ -234,7 +239,7 @@ public class TestServerCustomProtocol {
}
private Map<byte [], String> hello(final Table table, final String send, final String response)
throws ServiceException, Throwable {
throws ServiceException, Throwable {
Map<byte [], String> results = hello(table, send);
for (Map.Entry<byte [], String> e: results.entrySet()) {
assertEquals("Invalid custom protocol response", response, e.getValue());
@ -243,13 +248,12 @@ public class TestServerCustomProtocol {
}
private Map<byte [], String> hello(final Table table, final String send)
throws ServiceException, Throwable {
throws ServiceException, Throwable {
return hello(table, send, null, null);
}
private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
final byte [] end) throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@ -258,7 +262,9 @@ public class TestServerCustomProtocol {
CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
if (send != null) builder.setName(send);
if (send != null) {
builder.setName(send);
}
instance.hello(null, builder.build(), rpcCallback);
PingProtos.HelloResponse r = rpcCallback.get();
return r != null && r.hasResponse()? r.getResponse(): null;
@ -267,8 +273,7 @@ public class TestServerCustomProtocol {
}
private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
final byte [] end) throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@ -286,9 +291,8 @@ public class TestServerCustomProtocol {
});
}
private Map<byte [], String> noop(final Table table, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
private Map<byte [], String> noop(final Table table, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class, start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
@ -397,7 +401,7 @@ public class TestServerCustomProtocol {
}
private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class, start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
@ -410,8 +414,8 @@ public class TestServerCustomProtocol {
private static String doPing(PingProtos.PingService instance) throws IOException {
CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
return rpcCallback.get().getPong();
instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
return rpcCallback.get().getPong();
}
@Test
@ -459,18 +463,17 @@ public class TestServerCustomProtocol {
}
}
private void verifyRegionResults(RegionLocator table,
Map<byte[],String> results, byte[] row) throws Exception {
private void verifyRegionResults(RegionLocator table, Map<byte[],String> results, byte[] row)
throws Exception {
verifyRegionResults(table, results, "pong", row);
}
private void verifyRegionResults(RegionLocator regionLocator,
Map<byte[], String> results, String expected, byte[] row)
throws Exception {
private void verifyRegionResults(RegionLocator regionLocator, Map<byte[], String> results,
String expected, byte[] row) throws Exception {
for (Map.Entry<byte [], String> e: results.entrySet()) {
LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
", result key=" + Bytes.toString(e.getKey()) +
", value=" + e.getValue());
", result key=" + Bytes.toString(e.getKey()) +
", value=" + e.getValue());
}
HRegionLocation loc = regionLocator.getRegionLocation(row, true);
byte[] region = loc.getRegionInfo().getRegionName();