HBASE-27210 Clean up error-prone findings in hbase-endpoint (#4646)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Conflicts: hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
This commit is contained in:
parent
ce1455c365
commit
15db425ad9
|
@ -692,10 +692,9 @@ public class AggregationClient implements Closeable {
|
||||||
public <R, S, P extends Message, Q extends Message, T extends Message> double
|
public <R, S, P extends Message, Q extends Message, T extends Message> double
|
||||||
std(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
|
std(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
|
||||||
Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
|
Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
|
||||||
double res = 0d;
|
|
||||||
double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
|
double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
|
||||||
double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
|
double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
|
||||||
res = avgOfSumSq - (avg) * (avg); // variance
|
double res = avgOfSumSq - avg * avg; // variance
|
||||||
res = Math.pow(res, 0.5);
|
res = Math.pow(res, 0.5);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -868,14 +867,6 @@ public class AggregationClient implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] getBytesFromResponse(ByteString response) {
|
byte[] getBytesFromResponse(ByteString response) {
|
||||||
ByteBuffer bb = response.asReadOnlyByteBuffer();
|
return response.toByteArray();
|
||||||
bb.rewind();
|
|
||||||
byte[] bytes;
|
|
||||||
if (bb.hasArray()) {
|
|
||||||
bytes = bb.array();
|
|
||||||
} else {
|
|
||||||
bytes = response.toByteArray();
|
|
||||||
}
|
|
||||||
return bytes;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@ public final class AggregationHelper {
|
||||||
* @return the instance
|
* @return the instance
|
||||||
* @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed.
|
* @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
|
||||||
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
|
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
|
||||||
public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
|
public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
|
||||||
ByteString b) throws IOException {
|
ByteString b) throws IOException {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
@ -50,12 +51,12 @@ import org.slf4j.LoggerFactory;
|
||||||
* aggregate function at a region level. {@link ColumnInterpreter} is used to interpret column
|
* aggregate function at a region level. {@link ColumnInterpreter} is used to interpret column
|
||||||
* value. This class is parameterized with the following (these are the types with which the
|
* value. This class is parameterized with the following (these are the types with which the
|
||||||
* {@link ColumnInterpreter} is parameterized, and for more description on these, refer to
|
* {@link ColumnInterpreter} is parameterized, and for more description on these, refer to
|
||||||
* {@link ColumnInterpreter}):
|
* {@link ColumnInterpreter}):<br>
|
||||||
* @param <T> Cell value data type
|
* <T> Cell value data type<br>
|
||||||
* @param <S> Promoted data type
|
* <S> Promoted data type<br>
|
||||||
* @param <P> PB message that is used to transport initializer specific bytes
|
* <P> PB message that is used to transport initializer specific bytes<br>
|
||||||
* @param <Q> PB message that is used to transport Cell (<T>) instance
|
* <Q> PB message that is used to transport Cell (<T>) instance<br>
|
||||||
* @param <R> PB message that is used to transport Promoted (<S>) instance
|
* <R> PB message that is used to transport Promoted (<S>) instance<br>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
|
public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
|
||||||
|
@ -107,10 +108,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Maximum from this region is "
|
log.info("Maximum from this region is "
|
||||||
|
@ -160,10 +158,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Minimum from this region is "
|
log.info("Minimum from this region is "
|
||||||
|
@ -216,10 +211,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.debug("Sum from this region is " + env.getRegion().getRegionInfo().getRegionNameAsString()
|
log.debug("Sum from this region is " + env.getRegion().getRegionInfo().getRegionNameAsString()
|
||||||
|
@ -267,10 +259,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Row counter from this region is "
|
log.info("Row counter from this region is "
|
||||||
|
@ -331,10 +320,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done.run(response);
|
done.run(response);
|
||||||
|
@ -397,10 +383,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done.run(response);
|
done.run(response);
|
||||||
|
@ -462,10 +445,7 @@ public class AggregateImplementation<T, S, P extends Message, Q extends Message,
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) {
|
if (scanner != null) {
|
||||||
try {
|
IOUtils.closeQuietly(scanner);
|
||||||
scanner.close();
|
|
||||||
} catch (IOException ignored) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done.run(response);
|
done.run(response);
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
@ -109,6 +108,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
||||||
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
|
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("ModifiedButNotUsed")
|
||||||
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan,
|
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan,
|
||||||
Path dir) throws Throwable {
|
Path dir) throws Throwable {
|
||||||
FileSystem fs = dir.getFileSystem(conf);
|
FileSystem fs = dir.getFileSystem(conf);
|
||||||
|
@ -125,7 +125,6 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
||||||
table.coprocessorService(ExportProtos.ExportService.class, scan.getStartRow(),
|
table.coprocessorService(ExportProtos.ExportService.class, scan.getStartRow(),
|
||||||
scan.getStopRow(), (ExportProtos.ExportService service) -> {
|
scan.getStopRow(), (ExportProtos.ExportService service) -> {
|
||||||
ServerRpcController controller = new ServerRpcController();
|
ServerRpcController controller = new ServerRpcController();
|
||||||
Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
|
||||||
CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback =
|
CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback =
|
||||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||||
service.export(controller, request, rpcCallback);
|
service.export(controller, request, rpcCallback);
|
||||||
|
@ -190,7 +189,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoproces
|
||||||
|
|
||||||
private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
|
private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
|
||||||
final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
|
final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
|
||||||
List<SequenceFile.Writer.Option> rval = new LinkedList<>();
|
List<SequenceFile.Writer.Option> rval = new ArrayList<>(5);
|
||||||
rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
|
rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
|
||||||
rval.add(SequenceFile.Writer.valueClass(Result.class));
|
rval.add(SequenceFile.Writer.valueClass(Result.class));
|
||||||
rval.add(getOutputPath(conf, info, request));
|
rval.add(getOutputPath(conf, info, request));
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class TestAsyncAggregationClient {
|
||||||
|
|
||||||
private static byte[] CQ2 = Bytes.toBytes("CQ2");
|
private static byte[] CQ2 = Bytes.toBytes("CQ2");
|
||||||
|
|
||||||
private static int COUNT = 1000;
|
private static long COUNT = 1000;
|
||||||
|
|
||||||
private static AsyncConnection CONN;
|
private static AsyncConnection CONN;
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ public class TestAsyncAggregationClient {
|
||||||
long halfSum = COUNT * (COUNT - 1) / 4;
|
long halfSum = COUNT * (COUNT - 1) / 4;
|
||||||
long median = 0L;
|
long median = 0L;
|
||||||
long sum = 0L;
|
long sum = 0L;
|
||||||
for (int i = 0; i < COUNT; i++) {
|
for (long i = 0; i < COUNT; i++) {
|
||||||
sum += i;
|
sum += i;
|
||||||
if (sum > halfSum) {
|
if (sum > halfSum) {
|
||||||
median = i - 1;
|
median = i - 1;
|
||||||
|
@ -158,7 +158,7 @@ public class TestAsyncAggregationClient {
|
||||||
LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2;
|
LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2;
|
||||||
long median = 0L;
|
long median = 0L;
|
||||||
long sum = 0L;
|
long sum = 0L;
|
||||||
for (int i = 0; i < COUNT; i++) {
|
for (long i = 0; i < COUNT; i++) {
|
||||||
sum += i * i;
|
sum += i * i;
|
||||||
if (sum > halfSum) {
|
if (sum > halfSum) {
|
||||||
median = i - 1;
|
median = i - 1;
|
||||||
|
|
|
@ -155,8 +155,8 @@ public class TestRpcControllerFactory {
|
||||||
// change one of the connection properties so we get a new Connection with our configuration
|
// change one of the connection properties so we get a new Connection with our configuration
|
||||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
|
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
|
||||||
|
|
||||||
Connection connection = ConnectionFactory.createConnection(conf);
|
try (Connection connection = ConnectionFactory.createConnection(conf)) {
|
||||||
Table table = connection.getTable(tableName);
|
try (Table table = connection.getTable(tableName)) {
|
||||||
byte[] row = Bytes.toBytes("row");
|
byte[] row = Bytes.toBytes("row");
|
||||||
Put p = new Put(row);
|
Put p = new Put(row);
|
||||||
p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
|
p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
|
||||||
|
@ -208,7 +208,7 @@ public class TestRpcControllerFactory {
|
||||||
|
|
||||||
// reversed, regular
|
// reversed, regular
|
||||||
scanInfo.setSmall(false);
|
scanInfo.setSmall(false);
|
||||||
counter = doScan(table, scanInfo, counter + 1);
|
doScan(table, scanInfo, counter + 1);
|
||||||
|
|
||||||
// make sure we have no priority count
|
// make sure we have no priority count
|
||||||
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
|
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
|
||||||
|
@ -217,9 +217,8 @@ public class TestRpcControllerFactory {
|
||||||
get.setPriority(HConstants.ADMIN_QOS);
|
get.setPriority(HConstants.ADMIN_QOS);
|
||||||
table.get(get);
|
table.get(get);
|
||||||
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
|
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
|
||||||
|
}
|
||||||
table.close();
|
}
|
||||||
connection.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int doScan(Table table, Scan scan, int expectedCount) throws IOException {
|
int doScan(Table table, Scan scan, int expectedCount) throws IOException {
|
||||||
|
|
|
@ -83,8 +83,7 @@ public class TestAggregationClient {
|
||||||
try {
|
try {
|
||||||
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
|
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
|
||||||
fail("Expected IOException");
|
fail("Expected IOException");
|
||||||
} catch (Throwable e) {
|
} catch (IOException e) {
|
||||||
assertTrue(e instanceof IOException);
|
|
||||||
assertTrue(e.getMessage().contains("Connection not initialized"));
|
assertTrue(e.getMessage().contains("Connection not initialized"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,14 +126,14 @@ public class TestBatchCoprocessorEndpoint {
|
||||||
builder.build(), ROWS[0], ROWS[ROWS.length - 1],
|
builder.build(), ROWS[0], ROWS[ROWS.length - 1],
|
||||||
ColumnAggregationNullResponseSumResponse.getDefaultInstance());
|
ColumnAggregationNullResponseSumResponse.getDefaultInstance());
|
||||||
|
|
||||||
int sumResult = 0;
|
long sumResult = 0;
|
||||||
int expectedResult = 0;
|
long expectedResult = 0;
|
||||||
for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e : results.entrySet()) {
|
for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e : results.entrySet()) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue().getSum();
|
sumResult += e.getValue().getSum();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < rowSeperator2; i++) {
|
for (long i = 0; i < rowSeperator2; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
@ -167,14 +167,14 @@ public class TestBatchCoprocessorEndpoint {
|
||||||
Table table = util.getConnection().getTable(TEST_TABLE);
|
Table table = util.getConnection().getTable(TEST_TABLE);
|
||||||
Map<byte[], SumResponse> results =
|
Map<byte[], SumResponse> results =
|
||||||
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
||||||
int sumResult = 0;
|
long sumResult = 0;
|
||||||
int expectedResult = 0;
|
long expectedResult = 0;
|
||||||
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
|
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue().getSum();
|
sumResult += e.getValue().getSum();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < ROWSIZE; i++) {
|
for (long i = 0; i < ROWSIZE; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
@ -190,7 +190,7 @@ public class TestBatchCoprocessorEndpoint {
|
||||||
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue().getSum();
|
sumResult += e.getValue().getSum();
|
||||||
}
|
}
|
||||||
for (int i = rowSeperator1; i < ROWSIZE; i++) {
|
for (long i = rowSeperator1; i < ROWSIZE; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
@ -202,14 +202,14 @@ public class TestBatchCoprocessorEndpoint {
|
||||||
Table table = util.getConnection().getTable(TEST_TABLE);
|
Table table = util.getConnection().getTable(TEST_TABLE);
|
||||||
Map<byte[], SumResponse> results =
|
Map<byte[], SumResponse> results =
|
||||||
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
||||||
int sumResult = 0;
|
long sumResult = 0;
|
||||||
int expectedResult = 0;
|
long expectedResult = 0;
|
||||||
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
|
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue().getSum();
|
sumResult += e.getValue().getSum();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < ROWSIZE; i++) {
|
for (long i = 0; i < ROWSIZE; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
@ -223,7 +223,7 @@ public class TestBatchCoprocessorEndpoint {
|
||||||
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue().getSum();
|
sumResult += e.getValue().getSum();
|
||||||
}
|
}
|
||||||
for (int i = rowSeperator1; i < ROWSIZE; i++) {
|
for (long i = rowSeperator1; i < ROWSIZE; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
@ -262,14 +262,14 @@ public class TestBatchCoprocessorEndpoint {
|
||||||
hasError = true;
|
hasError = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sumResult = 0;
|
long sumResult = 0;
|
||||||
int expectedResult = 0;
|
long expectedResult = 0;
|
||||||
for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) {
|
for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue().getSum();
|
sumResult += e.getValue().getSum();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < rowSeperator2; i++) {
|
for (long i = 0; i < rowSeperator2; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
|
|
@ -142,13 +142,13 @@ public class TestCoprocessorEndpoint {
|
||||||
Table table = util.getConnection().getTable(TEST_TABLE);
|
Table table = util.getConnection().getTable(TEST_TABLE);
|
||||||
Map<byte[], Long> results =
|
Map<byte[], Long> results =
|
||||||
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
||||||
int sumResult = 0;
|
long sumResult = 0;
|
||||||
int expectedResult = 0;
|
long expectedResult = 0;
|
||||||
for (Map.Entry<byte[], Long> e : results.entrySet()) {
|
for (Map.Entry<byte[], Long> e : results.entrySet()) {
|
||||||
LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
|
LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
|
||||||
sumResult += e.getValue();
|
sumResult += e.getValue();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < ROWSIZE; i++) {
|
for (long i = 0; i < ROWSIZE; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
@ -267,6 +267,7 @@ public class TestCoprocessorEndpoint {
|
||||||
String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
|
String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
|
||||||
ROWS[0], ROWS[ROWS.length - 1],
|
ROWS[0], ROWS[ROWS.length - 1],
|
||||||
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
|
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
|
||||||
|
@Override
|
||||||
public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
|
public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
|
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
|
||||||
|
|
|
@ -57,11 +57,11 @@ public class TestCoprocessorServiceBackwardCompatibility {
|
||||||
implements CoprocessorService, SingletonCoprocessorService {
|
implements CoprocessorService, SingletonCoprocessorService {
|
||||||
// depending on the value passed thru DummyRequest, the following fields would be incremented
|
// depending on the value passed thru DummyRequest, the following fields would be incremented
|
||||||
// value == MASTER
|
// value == MASTER
|
||||||
static int numMaster = 0;
|
static long numMaster = 0;
|
||||||
// value == REGIONSERVER
|
// value == REGIONSERVER
|
||||||
static int numRegionServer = 0;
|
static long numRegionServer = 0;
|
||||||
// value == REGION
|
// value == REGION
|
||||||
static int numRegion = 0;
|
static long numRegion = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Service getService() {
|
public Service getService() {
|
||||||
|
|
|
@ -159,12 +159,12 @@ public class TestCoprocessorTableEndpoint {
|
||||||
try {
|
try {
|
||||||
Map<byte[], Long> results =
|
Map<byte[], Long> results =
|
||||||
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
|
||||||
int sumResult = 0;
|
long sumResult = 0;
|
||||||
int expectedResult = 0;
|
long expectedResult = 0;
|
||||||
for (Map.Entry<byte[], Long> e : results.entrySet()) {
|
for (Map.Entry<byte[], Long> e : results.entrySet()) {
|
||||||
sumResult += e.getValue();
|
sumResult += e.getValue();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < ROWSIZE; i++) {
|
for (long i = 0; i < ROWSIZE; i++) {
|
||||||
expectedResult += i;
|
expectedResult += i;
|
||||||
}
|
}
|
||||||
assertEquals("Invalid result", expectedResult, sumResult);
|
assertEquals("Invalid result", expectedResult, sumResult);
|
||||||
|
|
|
@ -51,11 +51,10 @@ public class TestImportExport extends org.apache.hadoop.hbase.mapreduce.TestImpo
|
||||||
Export.main(args);
|
Export.main(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Skip the test which is unrelated to the coprocessor.Export.
|
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore
|
@Ignore
|
||||||
|
@Override
|
||||||
public void testImport94Table() throws Throwable {
|
public void testImport94Table() throws Throwable {
|
||||||
|
// Skip the test which is unrelated to the coprocessor.Export.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class TestRowProcessorEndpoint {
|
||||||
private final AtomicInteger failures = new AtomicInteger(0);
|
private final AtomicInteger failures = new AtomicInteger(0);
|
||||||
|
|
||||||
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
private static volatile int expectedCounter = 0;
|
private static AtomicInteger expectedCounter = new AtomicInteger();
|
||||||
private static int rowSize, row2Size;
|
private static int rowSize, row2Size;
|
||||||
|
|
||||||
private volatile static Table table = null;
|
private volatile static Table table = null;
|
||||||
|
@ -348,11 +348,11 @@ public class TestRowProcessorEndpoint {
|
||||||
counter = kvs.isEmpty() ? 0 : Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
|
counter = kvs.isEmpty() ? 0 : Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
|
||||||
|
|
||||||
// Assert counter value
|
// Assert counter value
|
||||||
assertEquals(expectedCounter, counter);
|
assertEquals(expectedCounter.get(), counter);
|
||||||
|
|
||||||
// Increment counter and send it to both memstore and wal edit
|
// Increment counter and send it to both memstore and wal edit
|
||||||
counter += 1;
|
counter += 1;
|
||||||
expectedCounter += 1;
|
expectedCounter.incrementAndGet();
|
||||||
|
|
||||||
Put p = new Put(row);
|
Put p = new Put(row);
|
||||||
KeyValue kv = new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
|
KeyValue kv = new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
|
||||||
|
@ -581,6 +581,7 @@ public class TestRowProcessorEndpoint {
|
||||||
this.row = row;
|
this.row = row;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Collection<byte[]> getRowsToLock() {
|
public Collection<byte[]> getRowsToLock() {
|
||||||
return Collections.singleton(row);
|
return Collections.singleton(row);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,8 @@ import com.google.protobuf.ServiceException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
@ -82,6 +81,8 @@ import org.junit.rules.TestName;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||||
|
|
||||||
@Category({ MediumTests.class })
|
@Category({ MediumTests.class })
|
||||||
public class TestSecureExport {
|
public class TestSecureExport {
|
||||||
@ClassRule
|
@ClassRule
|
||||||
|
@ -336,7 +337,7 @@ public class TestSecureExport {
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
SecureTestUtil.verifyAllowed(putAction, getUserByLogin(USER_OWNER));
|
SecureTestUtil.verifyAllowed(putAction, getUserByLogin(USER_OWNER));
|
||||||
List<Pair<List<String>, Integer>> labelsAndRowCounts = new LinkedList<>();
|
List<Pair<List<String>, Integer>> labelsAndRowCounts = new ArrayList<>(5);
|
||||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(SECRET), 1));
|
labelsAndRowCounts.add(new Pair<>(Arrays.asList(SECRET), 1));
|
||||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(PRIVATE, CONFIDENTIAL), 1));
|
labelsAndRowCounts.add(new Pair<>(Arrays.asList(PRIVATE, CONFIDENTIAL), 1));
|
||||||
labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET), 1));
|
labelsAndRowCounts.add(new Pair<>(Arrays.asList(TOPSECRET), 1));
|
||||||
|
@ -388,11 +389,7 @@ public class TestSecureExport {
|
||||||
try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
|
||||||
Table table = conn.getTable(importHtd.getTableName());
|
Table table = conn.getTable(importHtd.getTableName());
|
||||||
ResultScanner scanner = table.getScanner(scan)) {
|
ResultScanner scanner = table.getScanner(scan)) {
|
||||||
int count = 0;
|
assertEquals(rowCount, Iterables.size(scanner));
|
||||||
for (Result r : scanner) {
|
|
||||||
++count;
|
|
||||||
}
|
|
||||||
assertEquals(rowCount, count);
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
|
|
|
@ -91,6 +91,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void doAnAction() throws Exception {
|
public void doAnAction() throws Exception {
|
||||||
long iteration = numBulkLoads.getAndIncrement();
|
long iteration = numBulkLoads.getAndIncrement();
|
||||||
Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
|
Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
|
||||||
|
@ -118,8 +119,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
||||||
LOG.debug("Going to connect to server " + getLocation() + " for row "
|
LOG.debug("Going to connect to server " + getLocation() + " for row "
|
||||||
+ Bytes.toStringBinary(getRow()));
|
+ Bytes.toStringBinary(getRow()));
|
||||||
try (Table table = conn.getTable(getTableName())) {
|
try (Table table = conn.getTable(getTableName())) {
|
||||||
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
|
new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null, bulkToken,
|
||||||
bulkToken, getLocation().getRegionInfo().getStartKey());
|
getLocation().getRegionInfo().getStartKey());
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -150,6 +151,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
|
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
setupTable(tableName, 10);
|
setupTable(tableName, 10);
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
@ -83,14 +84,13 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
|
||||||
/**
|
/**
|
||||||
* Prepare 16 random hfile ranges required for creating hfiles
|
* Prepare 16 random hfile ranges required for creating hfiles
|
||||||
*/
|
*/
|
||||||
Iterator<String> randomHFileRangeListIterator = null;
|
|
||||||
Set<String> randomHFileRanges = new HashSet<>(16);
|
Set<String> randomHFileRanges = new HashSet<>(16);
|
||||||
for (int i = 0; i < 16; i++) {
|
for (int i = 0; i < 16; i++) {
|
||||||
randomHFileRanges.add(UTIL1.getRandomUUID().toString());
|
randomHFileRanges.add(HBaseTestingUtility.getRandomUUID().toString());
|
||||||
}
|
}
|
||||||
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
|
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
|
||||||
Collections.sort(randomHFileRangeList);
|
Collections.sort(randomHFileRangeList);
|
||||||
randomHFileRangeListIterator = randomHFileRangeList.iterator();
|
Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
|
* at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
|
||||||
|
|
Loading…
Reference in New Issue