HBASE-19764 Fix Checkstyle errors in hbase-endpoint
Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
This commit is contained in:
parent
e915024e3c
commit
0309ea6b41
|
@ -69,6 +69,13 @@
|
|||
<groupId>net.revelc.code</groupId>
|
||||
<artifactId>warbucks-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<configuration>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -16,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.coprocessor;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
|
||||
|
@ -42,9 +40,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -58,6 +53,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This client class is for invoking the aggregate functions deployed on the
|
||||
|
@ -135,7 +133,7 @@ public class AggregationClient implements Closeable {
|
|||
|
||||
/**
|
||||
* Constructor with Conf object
|
||||
* @param cfg
|
||||
* @param cfg Configuration to use
|
||||
*/
|
||||
public AggregationClient(Configuration cfg) {
|
||||
try {
|
||||
|
@ -157,17 +155,16 @@ public class AggregationClient implements Closeable {
|
|||
* It gives the maximum value of a column for a given column family for the
|
||||
* given range. In case qualifier is null, a max of all values for the given
|
||||
* family is returned.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return max val <R>
|
||||
* @throws Throwable
|
||||
* The caller is supposed to handle the exception as they are thrown
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message> R max(
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return max(table, ci, scan);
|
||||
}
|
||||
|
@ -177,17 +174,16 @@ public class AggregationClient implements Closeable {
|
|||
* It gives the maximum value of a column for a given column family for the
|
||||
* given range. In case qualifier is null, a max of all values for the given
|
||||
* family is returned.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return max val <>
|
||||
* @throws Throwable
|
||||
* The caller is supposed to handle the exception as they are thrown
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
final Scan scan) throws Throwable {
|
||||
R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
|
||||
class MaxCallBack implements Batch.Callback<R> {
|
||||
R max = null;
|
||||
|
@ -225,21 +221,20 @@ public class AggregationClient implements Closeable {
|
|||
return aMaxCallBack.getMax();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* It gives the minimum value of a column for a given column family for the
|
||||
* given range. In case qualifier is null, a min of all values for the given
|
||||
* family is returned.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return min val <R>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message> R min(
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return min(table, ci, scan);
|
||||
}
|
||||
|
@ -249,18 +244,18 @@ public class AggregationClient implements Closeable {
|
|||
* It gives the minimum value of a column for a given column family for the
|
||||
* given range. In case qualifier is null, a min of all values for the given
|
||||
* family is returned.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return min val <R>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
final Scan scan) throws Throwable {
|
||||
R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
|
||||
class MinCallBack implements Batch.Callback<R> {
|
||||
|
||||
private R min = null;
|
||||
|
||||
public R getMinimum() {
|
||||
|
@ -272,10 +267,10 @@ public class AggregationClient implements Closeable {
|
|||
min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
|
||||
}
|
||||
}
|
||||
|
||||
MinCallBack minCallBack = new MinCallBack();
|
||||
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
|
||||
new Batch.Call<AggregateService, R>() {
|
||||
|
||||
@Override
|
||||
public R call(AggregateService instance) throws IOException {
|
||||
RpcController controller = new AggregationClientRpcController();
|
||||
|
@ -305,17 +300,18 @@ public class AggregationClient implements Closeable {
|
|||
* filter as it may set the flag to skip to next row, but the value read is
|
||||
* not of the given filter: in this case, this particular row will not be
|
||||
* counted ==> an error.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return <R, S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return rowCount(table, ci, scan);
|
||||
return rowCount(table, ci, scan);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -326,15 +322,16 @@ public class AggregationClient implements Closeable {
|
|||
* filter as it may set the flag to skip to next row, but the value read is
|
||||
* not of the given filter: in this case, this particular row will not be
|
||||
* counted ==> an error.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return <R, S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
long rowCount(final Table table,
|
||||
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
|
||||
long rowCount(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
|
||||
class RowNumCallback implements Batch.Callback<Long> {
|
||||
private final AtomicLong rowCountL = new AtomicLong(0);
|
||||
|
@ -348,6 +345,7 @@ public class AggregationClient implements Closeable {
|
|||
rowCountL.addAndGet(result.longValue());
|
||||
}
|
||||
}
|
||||
|
||||
RowNumCallback rowNum = new RowNumCallback();
|
||||
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
|
||||
new Batch.Call<AggregateService, Long>() {
|
||||
|
@ -373,32 +371,34 @@ public class AggregationClient implements Closeable {
|
|||
/**
|
||||
* It sums up the value returned from various regions. In case qualifier is
|
||||
* null, summation of all the column qualifiers in the given family is done.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return sum <S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return sum(table, ci, scan);
|
||||
return sum(table, ci, scan);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* It sums up the value returned from various regions. In case qualifier is
|
||||
* null, summation of all the column qualifiers in the given family is done.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return sum <S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
final Scan scan) throws Throwable {
|
||||
S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
|
||||
|
||||
class SumCallBack implements Batch.Callback<S> {
|
||||
|
@ -443,15 +443,16 @@ public class AggregationClient implements Closeable {
|
|||
* It computes average while fetching sum and row count from all the
|
||||
* corresponding regions. Approach is to compute a global sum of region level
|
||||
* sum and rowcount and then compute the average.
|
||||
* @param tableName
|
||||
* @param scan
|
||||
* @throws Throwable
|
||||
* @param tableName the name of the table to scan
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
|
||||
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
|
||||
throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return getAvgArgs(table, ci, scan);
|
||||
return getAvgArgs(table, ci, scan);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -459,17 +460,18 @@ public class AggregationClient implements Closeable {
|
|||
* It computes average while fetching sum and row count from all the
|
||||
* corresponding regions. Approach is to compute a global sum of region level
|
||||
* sum and rowcount and then compute the average.
|
||||
* @param table
|
||||
* @param scan
|
||||
* @throws Throwable
|
||||
* @param table table to scan.
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
private <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
Pair<S, Long> getAvgArgs(final Table table,
|
||||
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
|
||||
Pair<S, Long> getAvgArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
final Scan scan) throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
|
||||
class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
|
||||
S sum = null;
|
||||
Long rowCount = 0l;
|
||||
Long rowCount = 0L;
|
||||
|
||||
public synchronized Pair<S, Long> getAvgArgs() {
|
||||
return new Pair<>(sum, rowCount);
|
||||
|
@ -481,6 +483,7 @@ public class AggregationClient implements Closeable {
|
|||
rowCount += result.getSecond();
|
||||
}
|
||||
}
|
||||
|
||||
AvgCallBack avgCallBack = new AvgCallBack();
|
||||
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
|
||||
new Batch.Call<AggregateService, Pair<S, Long>>() {
|
||||
|
@ -518,15 +521,16 @@ public class AggregationClient implements Closeable {
|
|||
* its return type should be a decimal value, irrespective of what
|
||||
* columninterpreter says. So, this methods collects the necessary parameters
|
||||
* to compute the average and returs the double value.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return <R, S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
double avg(final TableName tableName,
|
||||
final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
|
||||
double avg(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) throws Throwable {
|
||||
Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
|
||||
return ci.divideForAvg(p.getFirst(), p.getSecond());
|
||||
}
|
||||
|
@ -537,14 +541,16 @@ public class AggregationClient implements Closeable {
|
|||
* its return type should be a decimal value, irrespective of what
|
||||
* columninterpreter says. So, this methods collects the necessary parameters
|
||||
* to compute the average and returs the double value.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return <R, S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
|
||||
final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
|
||||
final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan)
|
||||
throws Throwable {
|
||||
Pair<S, Long> p = getAvgArgs(table, ci, scan);
|
||||
return ci.divideForAvg(p.getFirst(), p.getSecond());
|
||||
}
|
||||
|
@ -555,17 +561,18 @@ public class AggregationClient implements Closeable {
|
|||
* average*average). From individual regions, it obtains sum, square sum and
|
||||
* number of rows. With these, the above values are computed to get the global
|
||||
* std.
|
||||
* @param table
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return standard deviations
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
private <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
Pair<List<S>, Long> getStdArgs(final Table table,
|
||||
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
|
||||
Pair<List<S>, Long> getStdArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
final Scan scan) throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
|
||||
class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
|
||||
long rowCountVal = 0l;
|
||||
long rowCountVal = 0L;
|
||||
S sumVal = null, sumSqVal = null;
|
||||
|
||||
public synchronized Pair<List<S>, Long> getStdParams() {
|
||||
|
@ -585,6 +592,7 @@ public class AggregationClient implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
StdCallback stdCallback = new StdCallback();
|
||||
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
|
||||
new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
|
||||
|
@ -626,17 +634,18 @@ public class AggregationClient implements Closeable {
|
|||
* return type should be a decimal value, irrespective of what
|
||||
* columninterpreter says. So, this methods collects the necessary parameters
|
||||
* to compute the std and returns the double value.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return <R, S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return std(table, ci, scan);
|
||||
return std(table, ci, scan);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -646,11 +655,12 @@ public class AggregationClient implements Closeable {
|
|||
* return type should be a decimal value, irrespective of what
|
||||
* columninterpreter says. So, this methods collects the necessary parameters
|
||||
* to compute the std and returns the double value.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return <R, S>
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message> double std(
|
||||
final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
|
||||
|
@ -667,17 +677,18 @@ public class AggregationClient implements Closeable {
|
|||
* It helps locate the region with median for a given column whose weight
|
||||
* is specified in an optional column.
|
||||
* From individual regions, it obtains sum of values and sum of weights.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return pair whose first element is a map between start row of the region
|
||||
* and (sum of values, sum of weights) for the region, the second element is
|
||||
* (sum of values, sum of weights) for all the regions chosen
|
||||
* @throws Throwable
|
||||
* and (sum of values, sum of weights) for the region, the second element is
|
||||
* (sum of values, sum of weights) for all the regions chosen
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
private <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
Pair<NavigableMap<byte[], List<S>>, List<S>>
|
||||
getMedianArgs(final Table table,
|
||||
Pair<NavigableMap<byte[], List<S>>, List<S>>
|
||||
getMedianArgs(final Table table,
|
||||
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
|
||||
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
|
||||
final NavigableMap<byte[], List<S>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
|
@ -731,17 +742,18 @@ public class AggregationClient implements Closeable {
|
|||
* This is the client side interface/handler for calling the median method for a
|
||||
* given cf-cq combination. This method collects the necessary parameters
|
||||
* to compute the median and returns the median.
|
||||
* @param tableName
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param tableName the name of the table to scan
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return R the median
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
return median(table, ci, scan);
|
||||
return median(table, ci, scan);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -749,15 +761,15 @@ public class AggregationClient implements Closeable {
|
|||
* This is the client side interface/handler for calling the median method for a
|
||||
* given cf-cq combination. This method collects the necessary parameters
|
||||
* to compute the median and returns the median.
|
||||
* @param table
|
||||
* @param ci
|
||||
* @param scan
|
||||
* @param table table to scan.
|
||||
* @param ci the user's ColumnInterpreter implementation
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @return R the median
|
||||
* @throws Throwable
|
||||
* @throws Throwable The caller is supposed to handle the exception as they are thrown
|
||||
* & propagated to it.
|
||||
*/
|
||||
public <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) throws Throwable {
|
||||
R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
|
||||
Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
|
||||
byte[] startRow = null;
|
||||
byte[] colFamily = scan.getFamilies()[0];
|
||||
|
@ -776,14 +788,19 @@ public class AggregationClient implements Closeable {
|
|||
for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
|
||||
S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
|
||||
double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
|
||||
if (newSumVal > halfSumVal) break; // we found the region with the median
|
||||
if (newSumVal > halfSumVal) {
|
||||
// we found the region with the median
|
||||
break;
|
||||
}
|
||||
movingSumVal = newSumVal;
|
||||
startRow = entry.getKey();
|
||||
}
|
||||
// scan the region with median and find it
|
||||
Scan scan2 = new Scan(scan);
|
||||
// inherit stop row from method parameter
|
||||
if (startRow != null) scan2.setStartRow(startRow);
|
||||
if (startRow != null) {
|
||||
scan2.setStartRow(startRow);
|
||||
}
|
||||
ResultScanner scanner = null;
|
||||
try {
|
||||
int cacheSize = scan2.getCaching();
|
||||
|
@ -815,8 +832,8 @@ public class AggregationClient implements Closeable {
|
|||
movingSumVal = newSumVal;
|
||||
kv = r.getColumnLatestCell(colFamily, qualifier);
|
||||
value = ci.getValue(colFamily, qualifier, kv);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (results != null && results.length > 0);
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -27,21 +27,22 @@ import java.lang.reflect.ParameterizedType;
|
|||
import java.lang.reflect.Type;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Helper class for constructing aggregation request and response.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AggregationHelper {
|
||||
public final class AggregationHelper {
|
||||
private AggregationHelper() {}
|
||||
|
||||
/**
|
||||
* @param scan
|
||||
* @param scan the HBase scan object to use to read data from HBase
|
||||
* @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
|
||||
*/
|
||||
private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
|
||||
|
@ -64,8 +65,8 @@ public class AggregationHelper {
|
|||
validateParameters(scan, canFamilyBeAbsent);
|
||||
final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder();
|
||||
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
|
||||
P columnInterpreterSpecificData = null;
|
||||
if ((columnInterpreterSpecificData = ci.getRequestData()) != null) {
|
||||
P columnInterpreterSpecificData = ci.getRequestData();
|
||||
if (columnInterpreterSpecificData != null) {
|
||||
requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
|
||||
}
|
||||
requestBuilder.setScan(ProtobufUtil.toScan(scan));
|
||||
|
@ -80,7 +81,7 @@ public class AggregationHelper {
|
|||
* @param position the position of the argument in the class declaration
|
||||
* @param b the ByteString which should be parsed to get the instance created
|
||||
* @return the instance
|
||||
* @throws IOException
|
||||
* @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -52,11 +52,11 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* summing/processing the individual results obtained from the AggregateService for each region.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class AsyncAggregationClient {
|
||||
public final class AsyncAggregationClient {
|
||||
private AsyncAggregationClient() {}
|
||||
|
||||
private static abstract class AbstractAggregationCallback<T>
|
||||
implements CoprocessorCallback<AggregateResponse> {
|
||||
|
||||
private final CompletableFuture<T> future;
|
||||
|
||||
protected boolean finished = false;
|
||||
|
@ -172,6 +172,7 @@ public class AsyncAggregationClient {
|
|||
future.completeExceptionally(e);
|
||||
return future;
|
||||
}
|
||||
|
||||
AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
|
||||
|
||||
private R min;
|
||||
|
@ -200,8 +201,8 @@ public class AsyncAggregationClient {
|
|||
}
|
||||
|
||||
public static <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
CompletableFuture<Long>
|
||||
rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
||||
CompletableFuture<Long> rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) {
|
||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||
AggregateRequest req;
|
||||
try {
|
||||
|
@ -243,7 +244,6 @@ public class AsyncAggregationClient {
|
|||
return future;
|
||||
}
|
||||
AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) {
|
||||
|
||||
private S sum;
|
||||
|
||||
@Override
|
||||
|
@ -268,8 +268,8 @@ public class AsyncAggregationClient {
|
|||
}
|
||||
|
||||
public static <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
CompletableFuture<Double>
|
||||
avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
||||
CompletableFuture<Double> avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) {
|
||||
CompletableFuture<Double> future = new CompletableFuture<>();
|
||||
AggregateRequest req;
|
||||
try {
|
||||
|
@ -279,7 +279,6 @@ public class AsyncAggregationClient {
|
|||
return future;
|
||||
}
|
||||
AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
|
||||
|
||||
private S sum;
|
||||
|
||||
long count = 0L;
|
||||
|
@ -306,8 +305,8 @@ public class AsyncAggregationClient {
|
|||
}
|
||||
|
||||
public static <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
CompletableFuture<Double>
|
||||
std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
||||
CompletableFuture<Double> std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
|
||||
Scan scan) {
|
||||
CompletableFuture<Double> future = new CompletableFuture<>();
|
||||
AggregateRequest req;
|
||||
try {
|
||||
|
@ -365,20 +364,20 @@ public class AsyncAggregationClient {
|
|||
AbstractAggregationCallback<NavigableMap<byte[], S>> callback =
|
||||
new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) {
|
||||
|
||||
private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
@Override
|
||||
protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
|
||||
if (resp.getFirstPartCount() > 0) {
|
||||
map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
|
||||
}
|
||||
@Override
|
||||
protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
|
||||
if (resp.getFirstPartCount() > 0) {
|
||||
map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NavigableMap<byte[], S> getFinalResult() {
|
||||
return map;
|
||||
}
|
||||
};
|
||||
@Override
|
||||
protected NavigableMap<byte[], S> getFinalResult() {
|
||||
return map;
|
||||
}
|
||||
};
|
||||
table
|
||||
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||
(stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback)
|
||||
|
@ -388,8 +387,8 @@ public class AsyncAggregationClient {
|
|||
}
|
||||
|
||||
private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
|
||||
CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
|
||||
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
|
||||
CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
|
||||
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
|
||||
double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
|
||||
S movingSum = null;
|
||||
byte[] startRow = null;
|
||||
|
@ -411,7 +410,6 @@ public class AsyncAggregationClient {
|
|||
byte[] weightQualifier = qualifiers.last();
|
||||
byte[] valueQualifier = qualifiers.first();
|
||||
table.scan(scan, new AdvancedScanResultConsumer() {
|
||||
|
||||
private S sum = baseSum;
|
||||
|
||||
private R value = null;
|
||||
|
@ -457,8 +455,8 @@ public class AsyncAggregationClient {
|
|||
}
|
||||
|
||||
public static <R, S, P extends Message, Q extends Message, T extends Message>
|
||||
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
|
||||
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
||||
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
|
||||
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
||||
CompletableFuture<R> future = new CompletableFuture<>();
|
||||
sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
|
||||
if (error != null) {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -36,9 +35,6 @@ import java.util.NavigableSet;
|
|||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
|
@ -47,6 +43,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A concrete AggregateProtocol implementation. Its system level coprocessor
|
||||
|
@ -62,7 +61,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
|
||||
extends AggregateService implements RegionCoprocessor {
|
||||
extends AggregateService implements RegionCoprocessor {
|
||||
protected static final Logger log = LoggerFactory.getLogger(AggregateImplementation.class);
|
||||
private RegionCoprocessorEnvironment env;
|
||||
|
||||
|
@ -75,7 +74,7 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getMax(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
InternalScanner scanner = null;
|
||||
AggregateResponse response = null;
|
||||
T max = null;
|
||||
|
@ -130,7 +129,7 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getMin(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
AggregateResponse response = null;
|
||||
InternalScanner scanner = null;
|
||||
T min = null;
|
||||
|
@ -183,10 +182,10 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getSum(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
AggregateResponse response = null;
|
||||
InternalScanner scanner = null;
|
||||
long sum = 0l;
|
||||
long sum = 0L;
|
||||
try {
|
||||
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
|
||||
S sumVal = null;
|
||||
|
@ -206,8 +205,9 @@ extends AggregateService implements RegionCoprocessor {
|
|||
int listSize = results.size();
|
||||
for (int i = 0; i < listSize; i++) {
|
||||
temp = ci.getValue(colFamily, qualifier, results.get(i));
|
||||
if (temp != null)
|
||||
if (temp != null) {
|
||||
sumVal = ci.add(sumVal, ci.castToReturnType(temp));
|
||||
}
|
||||
}
|
||||
results.clear();
|
||||
} while (hasMoreRows);
|
||||
|
@ -235,9 +235,9 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getRowNum(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
AggregateResponse response = null;
|
||||
long counter = 0l;
|
||||
long counter = 0L;
|
||||
List<Cell> results = new ArrayList<>();
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
|
@ -250,8 +250,9 @@ extends AggregateService implements RegionCoprocessor {
|
|||
if (qualifiers != null && !qualifiers.isEmpty()) {
|
||||
qualifier = qualifiers.pollFirst();
|
||||
}
|
||||
if (scan.getFilter() == null && qualifier == null)
|
||||
if (scan.getFilter() == null && qualifier == null) {
|
||||
scan.setFilter(new FirstKeyOnlyFilter());
|
||||
}
|
||||
scanner = env.getRegion().getScanner(scan);
|
||||
boolean hasMoreRows = false;
|
||||
do {
|
||||
|
@ -294,13 +295,13 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getAvg(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
AggregateResponse response = null;
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
|
||||
S sumVal = null;
|
||||
Long rowCountVal = 0l;
|
||||
Long rowCountVal = 0L;
|
||||
Scan scan = ProtobufUtil.toScan(request.getScan());
|
||||
scanner = env.getRegion().getScanner(scan);
|
||||
byte[] colFamily = scan.getFamilies()[0];
|
||||
|
@ -354,13 +355,13 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getStd(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
InternalScanner scanner = null;
|
||||
AggregateResponse response = null;
|
||||
try {
|
||||
ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
|
||||
S sumVal = null, sumSqVal = null, tempVal = null;
|
||||
long rowCountVal = 0l;
|
||||
long rowCountVal = 0L;
|
||||
Scan scan = ProtobufUtil.toScan(request.getScan());
|
||||
scanner = env.getRegion().getScanner(scan);
|
||||
byte[] colFamily = scan.getFamilies()[0];
|
||||
|
@ -419,7 +420,7 @@ extends AggregateService implements RegionCoprocessor {
|
|||
*/
|
||||
@Override
|
||||
public void getMedian(RpcController controller, AggregateRequest request,
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
RpcCallback<AggregateResponse> done) {
|
||||
AggregateResponse response = null;
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
|
@ -526,5 +527,4 @@ extends AggregateService implements RegionCoprocessor {
|
|||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -77,10 +81,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* Export an HBase table. Writes content to sequence files up in HDFS. Use
|
||||
* {@link Import} to read it back in again. It is implemented by the endpoint
|
||||
|
@ -91,10 +91,10 @@ import com.google.protobuf.Service;
|
|||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
@InterfaceStability.Evolving
|
||||
public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Export.class);
|
||||
private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
|
||||
private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD;
|
||||
private static final SequenceFile.CompressionType DEFAULT_TYPE =
|
||||
SequenceFile.CompressionType.RECORD;
|
||||
private RegionCoprocessorEnvironment env = null;
|
||||
private UserProvider userProvider;
|
||||
|
||||
|
@ -110,11 +110,13 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.getLength(otherArgs));
|
||||
return null;
|
||||
}
|
||||
Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
|
||||
Triple<TableName, Scan, Path> arguments =
|
||||
ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
|
||||
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
|
||||
}
|
||||
|
||||
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable {
|
||||
public static Map<byte[], Response> run(final Configuration conf, TableName tableName,
|
||||
Scan scan, Path dir) throws Throwable {
|
||||
FileSystem fs = dir.getFileSystem(conf);
|
||||
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||
checkDir(fs, dir);
|
||||
|
@ -158,7 +160,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
}
|
||||
|
||||
private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
|
||||
private static SequenceFile.CompressionType getCompressionType(
|
||||
final ExportProtos.ExportRequest request) {
|
||||
if (request.hasCompressType()) {
|
||||
return SequenceFile.CompressionType.valueOf(request.getCompressType());
|
||||
} else {
|
||||
|
@ -166,11 +169,13 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
}
|
||||
|
||||
private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
|
||||
private static CompressionCodec getCompressionCodec(final Configuration conf,
|
||||
final ExportProtos.ExportRequest request) {
|
||||
try {
|
||||
Class<? extends CompressionCodec> codecClass;
|
||||
if (request.hasCompressCodec()) {
|
||||
codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
|
||||
codecClass = conf.getClassByName(request.getCompressCodec())
|
||||
.asSubclass(CompressionCodec.class);
|
||||
} else {
|
||||
codecClass = DEFAULT_CODEC;
|
||||
}
|
||||
|
@ -198,16 +203,17 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
rval.add(SequenceFile.Writer.valueClass(Result.class));
|
||||
rval.add(getOutputPath(conf, info, request));
|
||||
if (getCompression(request)) {
|
||||
rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request)));
|
||||
rval.add(SequenceFile.Writer.compression(getCompressionType(request),
|
||||
getCompressionCodec(conf, request)));
|
||||
} else {
|
||||
rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf,
|
||||
final UserProvider userProvider, final Scan scan, final Token userToken,
|
||||
final List<SequenceFile.Writer.Option> opts) throws IOException {
|
||||
private static ExportProtos.ExportResponse processData(final Region region,
|
||||
final Configuration conf, final UserProvider userProvider, final Scan scan,
|
||||
final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
|
||||
ScanCoprocessor cp = new ScanCoprocessor(region);
|
||||
RegionScanner scanner = null;
|
||||
try (RegionOp regionOp = new RegionOp(region);
|
||||
|
@ -230,11 +236,15 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
Cell firstCell = cells.get(0);
|
||||
for (Cell cell : cells) {
|
||||
if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
|
||||
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
|
||||
throw new IOException("Why the RegionScanner#nextRaw returns the data of different rows??"
|
||||
+ " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength())
|
||||
+ ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
|
||||
if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
|
||||
firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(),
|
||||
cell.getRowLength()) != 0) {
|
||||
throw new IOException("Why the RegionScanner#nextRaw returns the data of different"
|
||||
+ " rows?? first row="
|
||||
+ Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(),
|
||||
firstCell.getRowLength())
|
||||
+ ", current row="
|
||||
+ Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
|
||||
}
|
||||
}
|
||||
results.add(Result.create(cells));
|
||||
|
@ -298,7 +308,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment environment) throws IOException {
|
||||
if (environment instanceof RegionCoprocessorEnvironment) {
|
||||
|
@ -323,7 +332,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
RpcCallback<ExportProtos.ExportResponse> done) {
|
||||
Region region = env.getRegion();
|
||||
Configuration conf = HBaseConfiguration.create(env.getConfiguration());
|
||||
conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
|
||||
conf.setStrings("io.serializations", conf.get("io.serializations"),
|
||||
ResultSerialization.class.getName());
|
||||
try {
|
||||
Scan scan = validateKey(region.getRegionInfo(), request);
|
||||
Token userToken = null;
|
||||
|
@ -344,7 +354,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
}
|
||||
|
||||
private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
|
||||
private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request)
|
||||
throws IOException {
|
||||
Scan scan = ProtobufUtil.toScan(request.getScan());
|
||||
byte[] regionStartKey = region.getStartKey();
|
||||
byte[] originStartKey = scan.getStartRow();
|
||||
|
@ -362,7 +373,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
|
||||
private static class RegionOp implements Closeable {
|
||||
|
||||
private final Region region;
|
||||
|
||||
RegionOp(final Region region) throws IOException {
|
||||
|
@ -377,7 +387,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
|
||||
private static class ScanCoprocessor {
|
||||
|
||||
private final HRegion region;
|
||||
|
||||
ScanCoprocessor(final Region region) {
|
||||
|
@ -439,17 +448,20 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
private static class SecureWriter implements Closeable {
|
||||
private final PrivilegedWriter privilegedWriter;
|
||||
|
||||
SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken,
|
||||
final List<SequenceFile.Writer.Option> opts) throws IOException {
|
||||
SecureWriter(final Configuration conf, final UserProvider userProvider,
|
||||
final Token userToken, final List<SequenceFile.Writer.Option> opts)
|
||||
throws IOException {
|
||||
privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken),
|
||||
SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
|
||||
SequenceFile.createWriter(conf,
|
||||
opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
|
||||
}
|
||||
|
||||
void append(final Object key, final Object value) throws IOException {
|
||||
privilegedWriter.append(key, value);
|
||||
}
|
||||
|
||||
private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException {
|
||||
private static User getActiveUser(final UserProvider userProvider, final Token userToken)
|
||||
throws IOException {
|
||||
User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
|
||||
if (user == null && userToken != null) {
|
||||
LOG.warn("No found of user credentials, but a token was got from user request");
|
||||
|
@ -465,7 +477,8 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
}
|
||||
|
||||
private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, Closeable {
|
||||
private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>,
|
||||
Closeable {
|
||||
private final User user;
|
||||
private final SequenceFile.Writer out;
|
||||
private Object key;
|
||||
|
@ -502,8 +515,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
|||
}
|
||||
}
|
||||
|
||||
public static class Response {
|
||||
|
||||
public final static class Response {
|
||||
private final long rowCount;
|
||||
private final long cellCount;
|
||||
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.hbase.security.access;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -45,9 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -60,7 +61,6 @@ import org.slf4j.LoggerFactory;
|
|||
@InterfaceAudience.Private
|
||||
@Deprecated
|
||||
public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor {
|
||||
|
||||
public static final long VERSION = 0L;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadEndpoint.class);
|
||||
|
@ -82,7 +82,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
|
||||
@Override
|
||||
public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
|
||||
RpcCallback<PrepareBulkLoadResponse> done) {
|
||||
RpcCallback<PrepareBulkLoadResponse> done) {
|
||||
try {
|
||||
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
|
||||
|
||||
|
@ -100,7 +100,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
*/
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
|
||||
convert(PrepareBulkLoadRequest request)
|
||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
||||
byte [] bytes = request.toByteArray();
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder
|
||||
builder =
|
||||
|
@ -112,7 +112,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
|
||||
@Override
|
||||
public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
|
||||
RpcCallback<CleanupBulkLoadResponse> done) {
|
||||
RpcCallback<CleanupBulkLoadResponse> done) {
|
||||
try {
|
||||
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
|
||||
secureBulkLoadManager.cleanupBulkLoad((HRegion) this.env.getRegion(), convert(request));
|
||||
|
@ -127,7 +127,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
|
||||
*/
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
|
||||
convert(CleanupBulkLoadRequest request)
|
||||
convert(CleanupBulkLoadRequest request)
|
||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
||||
byte [] bytes = request.toByteArray();
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder
|
||||
|
@ -140,7 +140,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
|
||||
@Override
|
||||
public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
|
||||
RpcCallback<SecureBulkLoadHFilesResponse> done) {
|
||||
RpcCallback<SecureBulkLoadHFilesResponse> done) {
|
||||
boolean loaded = false;
|
||||
Map<byte[], List<Path>> map = null;
|
||||
try {
|
||||
|
@ -159,7 +159,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
|
||||
*/
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
|
||||
convert(BulkLoadHFileRequest request)
|
||||
convert(BulkLoadHFileRequest request)
|
||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
||||
byte [] bytes = request.toByteArray();
|
||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder
|
||||
|
@ -171,7 +171,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg
|
|||
}
|
||||
|
||||
private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
|
||||
SecureBulkLoadHFilesRequest request) {
|
||||
SecureBulkLoadHFilesRequest request) {
|
||||
BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
|
||||
RegionSpecifier region =
|
||||
ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -36,15 +39,11 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* The aggregation implementation at a region.
|
||||
*/
|
||||
public class ColumnAggregationEndpoint extends ColumnAggregationService
|
||||
implements RegionCoprocessor {
|
||||
implements RegionCoprocessor {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpoint.class);
|
||||
private RegionCoprocessorEnvironment env = null;
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -27,31 +31,28 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* Test coprocessor endpoint that always returns {@code null} for requests to the last region
|
||||
* in the table. This allows tests to provide assurance of correct {@code null} handling for
|
||||
* response values.
|
||||
*/
|
||||
public class ColumnAggregationEndpointNullResponse
|
||||
extends ColumnAggregationServiceNullResponse implements RegionCoprocessor {
|
||||
public class ColumnAggregationEndpointNullResponse extends ColumnAggregationServiceNullResponse
|
||||
implements RegionCoprocessor {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ColumnAggregationEndpointNullResponse.class);
|
||||
|
||||
private RegionCoprocessorEnvironment env = null;
|
||||
|
||||
@Override
|
||||
public Iterable<Service> getServices() {
|
||||
return Collections.singleton(this);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -38,18 +42,14 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on
|
||||
* the last region in the table. This allows tests to ensure correct error handling of
|
||||
* coprocessor endpoints throwing exceptions.
|
||||
*/
|
||||
public class ColumnAggregationEndpointWithErrors
|
||||
extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
|
||||
implements RegionCoprocessor {
|
||||
extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
|
||||
implements RegionCoprocessor {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ColumnAggregationEndpointWithErrors.class);
|
||||
|
||||
|
@ -76,7 +76,7 @@ public class ColumnAggregationEndpointWithErrors
|
|||
|
||||
@Override
|
||||
public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request,
|
||||
RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
|
||||
RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
|
||||
// aggregate at each region
|
||||
Scan scan = new Scan();
|
||||
// Family is required in pb. Qualifier is not.
|
||||
|
|
|
@ -15,12 +15,15 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
|
@ -32,16 +35,13 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestPro
|
|||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* Test implementation of a coprocessor endpoint exposing the
|
||||
* {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests
|
||||
* only.
|
||||
*/
|
||||
public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
|
||||
implements MasterCoprocessor, RegionCoprocessor {
|
||||
implements MasterCoprocessor, RegionCoprocessor {
|
||||
public ProtobufCoprocessorService() {}
|
||||
|
||||
@Override
|
||||
|
@ -51,34 +51,34 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu
|
|||
|
||||
@Override
|
||||
public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
|
||||
RpcCallback<TestProtos.EmptyResponseProto> done) {
|
||||
RpcCallback<TestProtos.EmptyResponseProto> done) {
|
||||
done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void echo(RpcController controller, TestProtos.EchoRequestProto request,
|
||||
RpcCallback<TestProtos.EchoResponseProto> done) {
|
||||
RpcCallback<TestProtos.EchoResponseProto> done) {
|
||||
String message = request.getMessage();
|
||||
done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(RpcController controller, TestProtos.EmptyRequestProto request,
|
||||
RpcCallback<TestProtos.EmptyResponseProto> done) {
|
||||
RpcCallback<TestProtos.EmptyResponseProto> done) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, new IOException("Test exception"));
|
||||
done.run(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(RpcController controller, PauseRequestProto request,
|
||||
RpcCallback<EmptyResponseProto> done) {
|
||||
RpcCallback<EmptyResponseProto> done) {
|
||||
Threads.sleepWithoutInterrupt(request.getMs());
|
||||
done.run(EmptyResponseProto.getDefaultInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addr(RpcController controller, EmptyRequestProto request,
|
||||
RpcCallback<AddrResponseProto> done) {
|
||||
RpcCallback<AddrResponseProto> done) {
|
||||
done.run(AddrResponseProto.newBuilder()
|
||||
.setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build());
|
||||
}
|
||||
|
@ -92,5 +92,4 @@ public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobu
|
|||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
// To change body of implemented methods use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -53,7 +53,6 @@ import org.junit.runners.Parameterized;
|
|||
@RunWith(Parameterized.class)
|
||||
@Category({ ClientTests.class, MediumTests.class })
|
||||
public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class);
|
||||
|
@ -80,8 +79,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
|||
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
TestProtos.EchoResponseProto response =
|
||||
admin
|
||||
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> coprocessorService(
|
||||
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
|
||||
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto>
|
||||
coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub,
|
||||
(s, c, done) -> s.echo(c, request, done)).get();
|
||||
assertEquals("hello", response.getMessage());
|
||||
}
|
||||
|
@ -91,8 +90,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
|||
TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
|
||||
try {
|
||||
admin
|
||||
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> coprocessorService(
|
||||
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
|
||||
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto>
|
||||
coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto::newStub,
|
||||
(s, c, done) -> s.error(c, emptyRequest, done)).get();
|
||||
fail("Should have thrown an exception");
|
||||
} catch (Exception e) {
|
||||
|
@ -106,7 +105,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
|||
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
|
||||
DummyRegionServerEndpointProtos.DummyResponse response =
|
||||
admin
|
||||
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
|
||||
.<DummyRegionServerEndpointProtos.DummyService.Stub,
|
||||
DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
|
||||
DummyRegionServerEndpointProtos.DummyService::newStub,
|
||||
(s, c, done) -> s.dummyCall(c, request, done), serverName).get();
|
||||
assertEquals(DUMMY_VALUE, response.getValue());
|
||||
|
@ -119,7 +119,8 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
|||
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
|
||||
try {
|
||||
admin
|
||||
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
|
||||
.<DummyRegionServerEndpointProtos.DummyService.Stub,
|
||||
DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
|
||||
DummyRegionServerEndpointProtos.DummyService::newStub,
|
||||
(s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
|
||||
fail("Should have thrown an exception");
|
||||
|
@ -130,8 +131,7 @@ public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
|||
}
|
||||
|
||||
public static class DummyRegionServerEndpoint extends DummyService
|
||||
implements RegionServerCoprocessor {
|
||||
|
||||
implements RegionServerCoprocessor {
|
||||
public DummyRegionServerEndpoint() {}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@Category({CoprocessorTests.class, MediumTests.class})
|
||||
public class TestClassLoading {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestClassLoading.class);
|
||||
|
@ -98,9 +97,7 @@ public class TestClassLoading {
|
|||
private static Class<?> masterCoprocessor = TestMasterCoprocessor.class;
|
||||
|
||||
private static final String[] regionServerSystemCoprocessors =
|
||||
new String[]{
|
||||
regionServerCoprocessor.getSimpleName()
|
||||
};
|
||||
new String[]{ regionServerCoprocessor.getSimpleName() };
|
||||
|
||||
private static final String[] masterRegionServerSystemCoprocessors = new String[] {
|
||||
regionCoprocessor1.getSimpleName(), MultiRowMutationEndpoint.class.getSimpleName(),
|
||||
|
@ -211,7 +208,9 @@ public class TestClassLoading {
|
|||
found2_k2 = found2_k2 && (conf.get("k2") != null);
|
||||
found2_k3 = found2_k3 && (conf.get("k3") != null);
|
||||
} else {
|
||||
found2_k1 = found2_k2 = found2_k3 = false;
|
||||
found2_k1 = false;
|
||||
found2_k2 = false;
|
||||
found2_k3 = false;
|
||||
}
|
||||
regionsActiveClassLoaders
|
||||
.put(region, ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders());
|
||||
|
@ -571,6 +570,4 @@ public class TestClassLoading {
|
|||
// Now wait a bit longer for the coprocessor hosts to load the CPs
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -67,7 +67,6 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@Category({CoprocessorTests.class, MediumTests.class})
|
||||
public class TestCoprocessorEndpoint {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCoprocessorEndpoint.class);
|
||||
|
@ -119,14 +118,14 @@ public class TestCoprocessorEndpoint {
|
|||
}
|
||||
|
||||
private Map<byte [], Long> sum(final Table table, final byte [] family,
|
||||
final byte [] qualifier, final byte [] start, final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
final byte [] qualifier, final byte [] start, final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
|
||||
start, end,
|
||||
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
|
||||
@Override
|
||||
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
ColumnAggregationProtos.SumRequest.Builder builder =
|
||||
|
@ -191,26 +190,28 @@ public class TestCoprocessorEndpoint {
|
|||
// scan: for all regions
|
||||
final RpcController controller = new ServerRpcController();
|
||||
table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
|
||||
ROWS[0], ROWS[ROWS.length - 1],
|
||||
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
|
||||
public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
|
||||
throws IOException {
|
||||
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
instance.echo(controller, request, callback);
|
||||
TestProtos.EchoResponseProto response = callback.get();
|
||||
LOG.debug("Batch.Call returning result " + response);
|
||||
return response;
|
||||
}
|
||||
},
|
||||
new Batch.Callback<TestProtos.EchoResponseProto>() {
|
||||
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
|
||||
assertNotNull(result);
|
||||
assertEquals("hello", result.getMessage());
|
||||
results.put(region, result.getMessage());
|
||||
}
|
||||
ROWS[0], ROWS[ROWS.length - 1],
|
||||
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
|
||||
@Override
|
||||
public TestProtos.EchoResponseProto call(
|
||||
TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
|
||||
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
instance.echo(controller, request, callback);
|
||||
TestProtos.EchoResponseProto response = callback.get();
|
||||
LOG.debug("Batch.Call returning result " + response);
|
||||
return response;
|
||||
}
|
||||
},
|
||||
new Batch.Callback<TestProtos.EchoResponseProto>() {
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
|
||||
assertNotNull(result);
|
||||
assertEquals("hello", result.getMessage());
|
||||
results.put(region, result.getMessage());
|
||||
}
|
||||
}
|
||||
);
|
||||
for (Map.Entry<byte[], String> e : results.entrySet()) {
|
||||
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
|
||||
|
@ -224,26 +225,28 @@ public class TestCoprocessorEndpoint {
|
|||
|
||||
// scan: for region 2 and region 3
|
||||
table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
|
||||
ROWS[rowSeperator1], ROWS[ROWS.length - 1],
|
||||
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
|
||||
public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
|
||||
throws IOException {
|
||||
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
instance.echo(controller, request, callback);
|
||||
TestProtos.EchoResponseProto response = callback.get();
|
||||
LOG.debug("Batch.Call returning result " + response);
|
||||
return response;
|
||||
}
|
||||
},
|
||||
new Batch.Callback<TestProtos.EchoResponseProto>() {
|
||||
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
|
||||
assertNotNull(result);
|
||||
assertEquals("hello", result.getMessage());
|
||||
results.put(region, result.getMessage());
|
||||
}
|
||||
ROWS[rowSeperator1], ROWS[ROWS.length - 1],
|
||||
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
|
||||
@Override
|
||||
public TestProtos.EchoResponseProto call(
|
||||
TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
|
||||
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
instance.echo(controller, request, callback);
|
||||
TestProtos.EchoResponseProto response = callback.get();
|
||||
LOG.debug("Batch.Call returning result " + response);
|
||||
return response;
|
||||
}
|
||||
},
|
||||
new Batch.Callback<TestProtos.EchoResponseProto>() {
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
|
||||
assertNotNull(result);
|
||||
assertEquals("hello", result.getMessage());
|
||||
results.put(region, result.getMessage());
|
||||
}
|
||||
}
|
||||
);
|
||||
for (Map.Entry<byte[], String> e : results.entrySet()) {
|
||||
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
|
||||
|
@ -348,6 +351,4 @@ public class TestCoprocessorEndpoint {
|
|||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.*;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -38,9 +40,8 @@ import org.junit.experimental.categories.Category;
|
|||
/**
|
||||
* Tests to ensure that 2.0 is backward compatible in loading CoprocessorService.
|
||||
*/
|
||||
@Category({SmallTests.class})
|
||||
@Category({MediumTests.class})
|
||||
public class TestCoprocessorServiceBackwardCompatibility {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCoprocessorServiceBackwardCompatibility.class);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -47,7 +47,6 @@ import org.junit.rules.TestName;
|
|||
|
||||
@Category({CoprocessorTests.class, MediumTests.class})
|
||||
public class TestCoprocessorTableEndpoint {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCoprocessorTableEndpoint.class);
|
||||
|
@ -81,7 +80,7 @@ public class TestCoprocessorTableEndpoint {
|
|||
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
|
||||
desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
|
||||
desc.addCoprocessor(ColumnAggregationEndpoint.class.getName());
|
||||
|
||||
createTable(desc);
|
||||
verifyTable(tableName);
|
||||
|
@ -96,7 +95,7 @@ public class TestCoprocessorTableEndpoint {
|
|||
|
||||
createTable(desc);
|
||||
|
||||
desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
|
||||
desc.addCoprocessor(ColumnAggregationEndpoint.class.getName());
|
||||
updateTable(desc);
|
||||
|
||||
verifyTable(tableName);
|
||||
|
@ -113,24 +112,24 @@ public class TestCoprocessorTableEndpoint {
|
|||
private static Map<byte [], Long> sum(final Table table, final byte [] family,
|
||||
final byte [] qualifier, final byte [] start, final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
|
||||
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
|
||||
start, end,
|
||||
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
|
||||
@Override
|
||||
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
|
||||
throws IOException {
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
ColumnAggregationProtos.SumRequest.Builder builder =
|
||||
ColumnAggregationProtos.SumRequest.newBuilder();
|
||||
builder.setFamily(ByteString.copyFrom(family));
|
||||
if (qualifier != null && qualifier.length > 0) {
|
||||
builder.setQualifier(ByteString.copyFrom(qualifier));
|
||||
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
|
||||
@Override
|
||||
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
|
||||
throws IOException {
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
ColumnAggregationProtos.SumRequest.Builder builder =
|
||||
ColumnAggregationProtos.SumRequest.newBuilder();
|
||||
builder.setFamily(ByteString.copyFrom(family));
|
||||
if (qualifier != null && qualifier.length > 0) {
|
||||
builder.setQualifier(ByteString.copyFrom(qualifier));
|
||||
}
|
||||
instance.sum(null, builder.build(), rpcCallback);
|
||||
return rpcCallback.get().getSum();
|
||||
}
|
||||
instance.sum(null, builder.build(), rpcCallback);
|
||||
return rpcCallback.get().getSum();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private static final void createTable(HTableDescriptor desc) throws Exception {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -84,7 +84,6 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@Category({CoprocessorTests.class, MediumTests.class})
|
||||
public class TestRowProcessorEndpoint {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class);
|
||||
|
@ -219,8 +218,7 @@ public class TestRowProcessorEndpoint {
|
|||
return result;
|
||||
}
|
||||
|
||||
private void concurrentExec(
|
||||
final Runnable task, final int numThreads) throws Throwable {
|
||||
private void concurrentExec(final Runnable task, final int numThreads) throws Throwable {
|
||||
startSignal = new CountDownLatch(numThreads);
|
||||
doneSignal = new CountDownLatch(numThreads);
|
||||
for (int i = 0; i < numThreads; ++i) {
|
||||
|
@ -313,10 +311,10 @@ public class TestRowProcessorEndpoint {
|
|||
* So they can be loaded with the endpoint on the coprocessor.
|
||||
*/
|
||||
public static class RowProcessorEndpoint<S extends Message,T extends Message>
|
||||
extends BaseRowProcessorEndpoint<S,T> {
|
||||
extends BaseRowProcessorEndpoint<S,T> {
|
||||
public static class IncrementCounterProcessor extends
|
||||
BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
|
||||
IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
|
||||
BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
|
||||
IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
|
||||
int counter = 0;
|
||||
byte[] row = new byte[0];
|
||||
|
||||
|
@ -397,7 +395,7 @@ public class TestRowProcessorEndpoint {
|
|||
}
|
||||
|
||||
public static class FriendsOfFriendsProcessor extends
|
||||
BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
|
||||
BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
|
||||
byte[] row = null;
|
||||
byte[] person = null;
|
||||
final Set<String> result = new HashSet<>();
|
||||
|
@ -482,7 +480,7 @@ public class TestRowProcessorEndpoint {
|
|||
}
|
||||
|
||||
public static class RowSwapProcessor extends
|
||||
BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
|
||||
BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
|
||||
byte[] row1 = new byte[0];
|
||||
byte[] row2 = new byte[0];
|
||||
|
||||
|
@ -586,8 +584,7 @@ public class TestRowProcessorEndpoint {
|
|||
}
|
||||
|
||||
public static class TimeoutProcessor extends
|
||||
BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
|
||||
|
||||
BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
|
||||
byte[] row = new byte[0];
|
||||
|
||||
/**
|
||||
|
@ -643,8 +640,7 @@ public class TestRowProcessorEndpoint {
|
|||
}
|
||||
}
|
||||
|
||||
public static void doScan(
|
||||
HRegion region, Scan scan, List<Cell> result) throws IOException {
|
||||
public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||
|
@ -652,7 +648,9 @@ public class TestRowProcessorEndpoint {
|
|||
result.clear();
|
||||
scanner.next(result);
|
||||
} finally {
|
||||
if (scanner != null) scanner.close();
|
||||
if (scanner != null) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -676,5 +674,4 @@ public class TestRowProcessorEndpoint {
|
|||
out.append("]");
|
||||
return out.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -87,7 +87,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MediumTests.class})
|
||||
public class TestSecureExport {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSecureExport.class);
|
||||
|
@ -146,12 +145,16 @@ public class TestSecureExport {
|
|||
USER_XO + "/" + LOCALHOST,
|
||||
USER_NONE + "/" + LOCALHOST);
|
||||
}
|
||||
|
||||
private static User getUserByLogin(final String user) throws IOException {
|
||||
return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrinciple(user), KEYTAB_FILE.getAbsolutePath()));
|
||||
return User.create(UserGroupInformation.loginUserFromKeytabAndReturnUGI(
|
||||
getPrinciple(user), KEYTAB_FILE.getAbsolutePath()));
|
||||
}
|
||||
|
||||
private static String getPrinciple(final String user) {
|
||||
return user + "/" + LOCALHOST + "@" + KDC.getRealm();
|
||||
}
|
||||
|
||||
private static void setUpClusterKdc() throws Exception {
|
||||
HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
|
||||
HBaseKerberosUtils.setPrincipalForTesting(SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
|
@ -160,30 +163,42 @@ public class TestSecureExport {
|
|||
// the following key should be changed.
|
||||
// 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
|
||||
// 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
|
||||
SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY,
|
||||
SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
|
||||
KEYTAB_FILE.getAbsolutePath());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
|
||||
KEYTAB_FILE.getAbsolutePath());
|
||||
// set yarn principal
|
||||
UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL, SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(YarnConfiguration.RM_PRINCIPAL,
|
||||
SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(YarnConfiguration.NM_PRINCIPAL,
|
||||
SERVER_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
|
||||
HTTP_PRINCIPAL + "@" + KDC.getRealm());
|
||||
UTIL.getConfiguration().setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_HTTP_POLICY_KEY,
|
||||
HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
|
||||
UTIL.getConfiguration().set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, LOCALHOST + ":0");
|
||||
|
||||
File keystoresDir = new File(UTIL.getDataTestDir("keystore").toUri().getPath());
|
||||
keystoresDir.mkdirs();
|
||||
String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSecureExport.class);
|
||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, UTIL.getConfiguration(), false);
|
||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir,
|
||||
UTIL.getConfiguration(), false);
|
||||
|
||||
UTIL.getConfiguration().setBoolean("ignore.secure.ports.for.testing", true);
|
||||
UserGroupInformation.setConfiguration(UTIL.getConfiguration());
|
||||
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, UTIL.getConfiguration().get(
|
||||
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
|
||||
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
UTIL.getConfiguration().get(
|
||||
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY) + "," + Export.class.getName());
|
||||
}
|
||||
private static void addLabels(final Configuration conf, final List<String> users, final List<String> labels) throws Exception {
|
||||
|
||||
private static void addLabels(final Configuration conf, final List<String> users,
|
||||
final List<String> labels) throws Exception {
|
||||
PrivilegedExceptionAction<VisibilityLabelsProtos.VisibilityLabelsResponse> action
|
||||
= () -> {
|
||||
try (Connection conn = ConnectionFactory.createConnection(conf)) {
|
||||
|
@ -207,19 +222,21 @@ public class TestSecureExport {
|
|||
@After
|
||||
public void cleanup() throws IOException {
|
||||
}
|
||||
|
||||
private static void clearOutput(Path path) throws IOException {
|
||||
FileSystem fs = path.getFileSystem(UTIL.getConfiguration());
|
||||
if (fs.exists(path)) {
|
||||
assertEquals(true, fs.delete(path, true));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the security firstly for getting the correct default realm.
|
||||
* @throws Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
UserProvider.setUserProviderForTesting(UTIL.getConfiguration(), HadoopSecurityEnabledUserProviderForTesting.class);
|
||||
UserProvider.setUserProviderForTesting(UTIL.getConfiguration(),
|
||||
HadoopSecurityEnabledUserProviderForTesting.class);
|
||||
setUpKdcServer();
|
||||
SecureTestUtil.enableSecurity(UTIL.getConfiguration());
|
||||
UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
|
||||
|
@ -252,11 +269,9 @@ public class TestSecureExport {
|
|||
/**
|
||||
* Test the ExportEndpoint's access levels. The {@link Export} test is ignored
|
||||
* since the access exceptions cannot be collected from the mappers.
|
||||
*
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
@Test
|
||||
public void testAccessCase() throws IOException, Throwable {
|
||||
public void testAccessCase() throws Throwable {
|
||||
final String exportTable = name.getMethodName();
|
||||
TableDescriptor exportHtd = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
|
@ -339,11 +354,13 @@ public class TestSecureExport {
|
|||
SecureTestUtil.verifyAllowed(deleteAction, getUserByLogin(USER_OWNER));
|
||||
fs.delete(openDir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVisibilityLabels() throws IOException, Throwable {
|
||||
final String exportTable = name.getMethodName() + "_export";
|
||||
final String importTable = name.getMethodName() + "_import";
|
||||
final TableDescriptor exportHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(exportTable))
|
||||
final TableDescriptor exportHtd = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(exportTable))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYA))
|
||||
.setOwnerString(USER_OWNER)
|
||||
.build();
|
||||
|
@ -400,7 +417,8 @@ public class TestSecureExport {
|
|||
}
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(exportAction, getUserByLogin(USER_OWNER));
|
||||
final TableDescriptor importHtd = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable))
|
||||
final TableDescriptor importHtd = TableDescriptorBuilder
|
||||
.newBuilder(TableName.valueOf(importTable))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYB))
|
||||
.setOwnerString(USER_OWNER)
|
||||
.build();
|
||||
|
@ -411,7 +429,8 @@ public class TestSecureExport {
|
|||
importTable,
|
||||
output.toString()
|
||||
};
|
||||
assertEquals(0, ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args));
|
||||
assertEquals(0, ToolRunner.run(
|
||||
new Configuration(UTIL.getConfiguration()), new Import(), args));
|
||||
return null;
|
||||
};
|
||||
SecureTestUtil.verifyAllowed(importAction, getUserByLogin(USER_OWNER));
|
||||
|
|
|
@ -15,17 +15,14 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
|
@ -41,12 +38,14 @@ import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
|
|||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
|
||||
* @deprecated Use for backward compatibility testing only. Will be removed when
|
||||
* SecureBulkLoadEndpoint is not supported.
|
||||
*/
|
||||
@Deprecated
|
||||
@InterfaceAudience.Private
|
||||
public class SecureBulkLoadEndpointClient {
|
||||
private Table table;
|
||||
|
@ -111,9 +110,8 @@ public class SecureBulkLoadEndpointClient {
|
|||
}
|
||||
|
||||
public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
|
||||
final Token<?> userToken,
|
||||
final String bulkToken,
|
||||
final byte[] startRow) throws IOException {
|
||||
final Token<?> userToken, final String bulkToken, final byte[] startRow)
|
||||
throws IOException {
|
||||
// we never want to send a batch of HFiles to all regions, thus cannot call
|
||||
// HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
|
||||
try {
|
||||
|
@ -162,5 +160,4 @@ public class SecureBulkLoadEndpointClient {
|
|||
throw new IOException(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi
|
|||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
@Ignore // BROKEN. FIX OR REMOVE.
|
||||
public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
|
||||
|
@ -86,8 +85,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
|||
final AtomicLong numCompactions = new AtomicLong();
|
||||
private TableName tableName;
|
||||
|
||||
public AtomicHFileLoader(TableName tableName, TestContext ctx,
|
||||
byte targetFamilies[][]) throws IOException {
|
||||
public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
|
||||
throws IOException {
|
||||
super(ctx);
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
@ -114,19 +113,19 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
|||
final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
|
||||
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
|
||||
ClientServiceCallable<Void> callable =
|
||||
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
|
||||
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
|
||||
@Override
|
||||
protected Void rpcCall() throws Exception {
|
||||
LOG.debug("Going to connect to server " + getLocation() + " for row " +
|
||||
Bytes.toStringBinary(getRow()));
|
||||
try (Table table = conn.getTable(getTableName())) {
|
||||
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
|
||||
null, bulkToken, getLocation().getRegionInfo().getStartKey());
|
||||
}
|
||||
return null;
|
||||
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
|
||||
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
|
||||
@Override
|
||||
protected Void rpcCall() throws Exception {
|
||||
LOG.debug("Going to connect to server " + getLocation() + " for row " +
|
||||
Bytes.toStringBinary(getRow()));
|
||||
try (Table table = conn.getTable(getTableName())) {
|
||||
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
|
||||
null, bulkToken, getLocation().getRegionInfo().getStartKey());
|
||||
}
|
||||
};
|
||||
return null;
|
||||
}
|
||||
};
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
@ -156,7 +155,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
|||
}
|
||||
|
||||
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
|
||||
throws Exception {
|
||||
throws Exception {
|
||||
setupTable(tableName, 10);
|
||||
|
||||
TestContext ctx = new TestContext(UTIL.getConfiguration());
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -68,7 +68,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({RegionServerTests.class, MediumTests.class})
|
||||
public class TestServerCustomProtocol {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestServerCustomProtocol.class);
|
||||
|
@ -84,7 +83,9 @@ public class TestServerCustomProtocol {
|
|||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException {
|
||||
if (env instanceof RegionCoprocessorEnvironment) return;
|
||||
if (env instanceof RegionCoprocessorEnvironment) {
|
||||
return;
|
||||
}
|
||||
throw new CoprocessorException("Must be loaded on a table region!");
|
||||
}
|
||||
|
||||
|
@ -116,9 +117,13 @@ public class TestServerCustomProtocol {
|
|||
@Override
|
||||
public void hello(RpcController controller, HelloRequest request,
|
||||
RpcCallback<HelloResponse> done) {
|
||||
if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
|
||||
else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build());
|
||||
else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
|
||||
if (!request.hasName()) {
|
||||
done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
|
||||
} else if (request.getName().equals(NOBODY)) {
|
||||
done.run(HelloResponse.newBuilder().build());
|
||||
} else {
|
||||
done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -153,19 +158,19 @@ public class TestServerCustomProtocol {
|
|||
}
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
public void before() throws Exception {
|
||||
final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C };
|
||||
Table table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS);
|
||||
|
||||
Put puta = new Put( ROW_A );
|
||||
Put puta = new Put(ROW_A);
|
||||
puta.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
|
||||
table.put(puta);
|
||||
|
||||
Put putb = new Put( ROW_B );
|
||||
Put putb = new Put(ROW_B);
|
||||
putb.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
|
||||
table.put(putb);
|
||||
|
||||
Put putc = new Put( ROW_C );
|
||||
Put putc = new Put(ROW_C);
|
||||
putc.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
|
||||
table.put(putc);
|
||||
}
|
||||
|
@ -234,7 +239,7 @@ public class TestServerCustomProtocol {
|
|||
}
|
||||
|
||||
private Map<byte [], String> hello(final Table table, final String send, final String response)
|
||||
throws ServiceException, Throwable {
|
||||
throws ServiceException, Throwable {
|
||||
Map<byte [], String> results = hello(table, send);
|
||||
for (Map.Entry<byte [], String> e: results.entrySet()) {
|
||||
assertEquals("Invalid custom protocol response", response, e.getValue());
|
||||
|
@ -243,13 +248,12 @@ public class TestServerCustomProtocol {
|
|||
}
|
||||
|
||||
private Map<byte [], String> hello(final Table table, final String send)
|
||||
throws ServiceException, Throwable {
|
||||
throws ServiceException, Throwable {
|
||||
return hello(table, send, null, null);
|
||||
}
|
||||
|
||||
private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
|
||||
final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
final byte [] end) throws ServiceException, Throwable {
|
||||
return table.coprocessorService(PingProtos.PingService.class,
|
||||
start, end,
|
||||
new Batch.Call<PingProtos.PingService, String>() {
|
||||
|
@ -258,7 +262,9 @@ public class TestServerCustomProtocol {
|
|||
CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
|
||||
if (send != null) builder.setName(send);
|
||||
if (send != null) {
|
||||
builder.setName(send);
|
||||
}
|
||||
instance.hello(null, builder.build(), rpcCallback);
|
||||
PingProtos.HelloResponse r = rpcCallback.get();
|
||||
return r != null && r.hasResponse()? r.getResponse(): null;
|
||||
|
@ -267,8 +273,7 @@ public class TestServerCustomProtocol {
|
|||
}
|
||||
|
||||
private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
|
||||
final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
final byte [] end) throws ServiceException, Throwable {
|
||||
return table.coprocessorService(PingProtos.PingService.class,
|
||||
start, end,
|
||||
new Batch.Call<PingProtos.PingService, String>() {
|
||||
|
@ -286,9 +291,8 @@ public class TestServerCustomProtocol {
|
|||
});
|
||||
}
|
||||
|
||||
private Map<byte [], String> noop(final Table table, final byte [] start,
|
||||
final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
private Map<byte [], String> noop(final Table table, final byte [] start, final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
return table.coprocessorService(PingProtos.PingService.class, start, end,
|
||||
new Batch.Call<PingProtos.PingService, String>() {
|
||||
@Override
|
||||
|
@ -397,7 +401,7 @@ public class TestServerCustomProtocol {
|
|||
}
|
||||
|
||||
private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
|
||||
throws ServiceException, Throwable {
|
||||
throws ServiceException, Throwable {
|
||||
return table.coprocessorService(PingProtos.PingService.class, start, end,
|
||||
new Batch.Call<PingProtos.PingService, String>() {
|
||||
@Override
|
||||
|
@ -410,8 +414,8 @@ public class TestServerCustomProtocol {
|
|||
private static String doPing(PingProtos.PingService instance) throws IOException {
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
|
||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
|
||||
return rpcCallback.get().getPong();
|
||||
instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
|
||||
return rpcCallback.get().getPong();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -459,18 +463,17 @@ public class TestServerCustomProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
private void verifyRegionResults(RegionLocator table,
|
||||
Map<byte[],String> results, byte[] row) throws Exception {
|
||||
private void verifyRegionResults(RegionLocator table, Map<byte[],String> results, byte[] row)
|
||||
throws Exception {
|
||||
verifyRegionResults(table, results, "pong", row);
|
||||
}
|
||||
|
||||
private void verifyRegionResults(RegionLocator regionLocator,
|
||||
Map<byte[], String> results, String expected, byte[] row)
|
||||
throws Exception {
|
||||
private void verifyRegionResults(RegionLocator regionLocator, Map<byte[], String> results,
|
||||
String expected, byte[] row) throws Exception {
|
||||
for (Map.Entry<byte [], String> e: results.entrySet()) {
|
||||
LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
|
||||
", result key=" + Bytes.toString(e.getKey()) +
|
||||
", value=" + e.getValue());
|
||||
", result key=" + Bytes.toString(e.getKey()) +
|
||||
", value=" + e.getValue());
|
||||
}
|
||||
HRegionLocation loc = regionLocator.getRegionLocation(row, true);
|
||||
byte[] region = loc.getRegionInfo().getRegionName();
|
||||
|
|
Loading…
Reference in New Issue