HBASE-10169 Batch coprocessor (Jingcheng Du and Gary Helmling)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1576791 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2014-03-12 15:56:57 +00:00
parent b22548ff06
commit d54525ca90
20 changed files with 4632 additions and 51 deletions

View File

@@ -36,12 +36,14 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -70,10 +73,13 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* <p>Used to communicate with a single HBase table. An implementation of
* {@link HTableInterface}. Instances of this class can be constructed directly but it is
@@ -1495,8 +1501,7 @@ public class HTable implements HTableInterface {
public static void setRegionCachePrefetch(
final TableName tableName,
final boolean enable) throws IOException {
HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration.create()) {
@Override
public Void connect(HConnection connection) throws IOException {
connection.setRegionCachePrefetch(tableName, enable);
@@ -1693,4 +1698,104 @@ public class HTable implements HTableInterface {
t.close();
}
}
/**
* {@inheritDoc}
*/
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request,
byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
Bytes.BYTES_COMPARATOR));
batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
new Callback<R>() {
@Override
public void update(byte[] region, byte[] row, R result) {
if (region != null) {
results.put(region, result);
}
}
});
return results;
}
/**
* {@inheritDoc}
*/
@Override
public <R extends Message> void batchCoprocessorService(
final Descriptors.MethodDescriptor methodDescriptor, final Message request,
byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
throws ServiceException, Throwable {
// get regions covered by the row range
Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
getKeysAndRegionsInRange(startKey, endKey, true);
List<byte[]> keys = keysAndRegions.getFirst();
List<HRegionLocation> regions = keysAndRegions.getSecond();
// check if we have any calls to make
if (keys.isEmpty()) {
LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
", end=" + Bytes.toStringBinary(endKey));
return;
}
List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < keys.size(); i++) {
final byte[] rowKey = keys.get(i);
final byte[] region = regions.get(i).getRegionInfo().getRegionName();
RegionCoprocessorServiceExec exec =
new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
execs.add(exec);
execsByRow.put(rowKey, exec);
}
// tracking for any possible deserialization errors on success callback
// TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
final List<Row> callbackErrorActions = new ArrayList<Row>();
final List<String> callbackErrorServers = new ArrayList<String>();
Object[] results = new Object[execs.size()];
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool,
RpcRetryingCallerFactory.instantiate(configuration), true);
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() {
@Override
public void update(byte[] region, byte[] row,
ClientProtos.CoprocessorServiceResult serviceResult) {
if (LOG.isTraceEnabled()) {
LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
": region=" + Bytes.toStringBinary(region) +
", row=" + Bytes.toStringBinary(row) +
", value=" + serviceResult.getValue().getValue());
}
try {
callback.update(region, row,
(R) responsePrototype.newBuilderForType().mergeFrom(
serviceResult.getValue().getValue()).build());
} catch (InvalidProtocolBufferException e) {
LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
e);
callbackErrorExceptions.add(e);
callbackErrorActions.add(execsByRow.get(row));
callbackErrorServers.add("null");
}
}
}, results);
future.waitUntilDone();
if (future.hasError()) {
throw future.getErrors();
} else if (!callbackErrorExceptions.isEmpty()) {
throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
callbackErrorServers);
}
}
}

View File

@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@@ -591,4 +593,69 @@ public interface HTableInterface extends Closeable {
* @throws IOException if a remote or network exception occurs.
*/
void setWriteBufferSize(long writeBufferSize) throws IOException;
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to the {@code endKey} row (inclusive).
* All invocations to the same region server are batched into a single call. The coprocessor
* service is invoked according to the service instance, method name and parameters.
*
* @param methodDescriptor
* the descriptor for the protobuf service method to call.
* @param request
* the method call parameters
* @param startKey
* start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
* @param endKey
* select regions up to and including the region containing this row. If {@code null},
* selection will continue through the last table region.
* @param responsePrototype
* the prototype of the method's protobuf response message.
* @param <R>
* the response type for the coprocessor Service method
* @throws ServiceException
* @throws Throwable
* @return a map of result values keyed by region name
*/
@InterfaceAudience.Private
<R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request,
byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable;
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to the {@code endKey} row (inclusive).
* All invocations to the same region server are batched into a single call. The coprocessor
* service is invoked according to the service instance, method name and parameters.
*
* <p>
* The given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)}
* method will be called with the return value from each region's invocation.
* </p>
*
* @param methodDescriptor
* the descriptor for the protobuf service method to call.
* @param request
* the method call parameters
* @param startKey
* start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
* @param endKey
* select regions up to and including the region containing this row. If {@code null},
* selection will continue through the last table region.
* @param responsePrototype
* the prototype of the method's protobuf response message.
* @param callback
* callback to invoke with the response for each region
* @param <R>
* the response type for the coprocessor Service method
* @throws ServiceException
* @throws Throwable
*/
@InterfaceAudience.Private
<R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
Message request, byte[] startKey, byte[] endKey, R responsePrototype,
Batch.Callback<R> callback) throws ServiceException, Throwable;
}
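
For orientation, here is a minimal client-side usage sketch of the new interface method. This is illustrative only: it assumes the ColumnAggregationService test endpoint shipped with this patch is loaded on the target table, and the table and family arguments are placeholders supplied by the caller.

import java.util.Map;
import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;

public class BatchCoprocessorSketch {
  static long sumAllRegions(Configuration conf, TableName tableName, byte[] family)
      throws Throwable {
    HTable table = new HTable(conf, tableName);
    try {
      ColumnAggregationProtos.SumRequest request =
          ColumnAggregationProtos.SumRequest.newBuilder()
              .setFamily(HBaseZeroCopyByteString.wrap(family))
              .build();
      // Invocations are grouped into one RPC per region server; the returned
      // map is keyed by region name.
      Map<byte[], ColumnAggregationProtos.SumResponse> results =
          table.batchCoprocessorService(
              ColumnAggregationProtos.ColumnAggregationService.getDescriptor()
                  .findMethodByName("sum"),
              request,
              HConstants.EMPTY_START_ROW, // start with the first table region
              HConstants.EMPTY_END_ROW,   // continue through the last table region
              ColumnAggregationProtos.SumResponse.getDefaultInstance());
      long total = 0;
      for (ColumnAggregationProtos.SumResponse r : results.values()) {
        total += r.getSum();
      }
      return total;
    } finally {
      table.close();
    }
  }
}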

View File

@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@@ -638,5 +640,23 @@ public class HTablePool implements Closeable {
byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,
byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
checkState();
return table.batchCoprocessorService(method, request, startKey, endKey,
responsePrototype);
}
@Override
public <R extends Message> void batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,
byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable {
checkState();
table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
}
}
}

View File

@@ -0,0 +1,108 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Objects;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Represents a coprocessor service method execution against a single region. While coprocessor
* service calls are performed against a region, this class implements {@link Row} in order to
* make use of the {@link AsyncProcess} framework for batching multi-region calls per region server.
*
* <p><b>Note:</b> This class should not be instantiated directly. Use either
* {@link HTable#batchCoprocessorService(MethodDescriptor, Message, byte[], byte[],
* Message, Batch.Callback)}
* or {@link HTable#batchCoprocessorService(MethodDescriptor, Message, byte[], byte[], Message)}
* instead.</p>
*/
@InterfaceAudience.Private
public class RegionCoprocessorServiceExec implements Row {
/*
* This duplicates region name in MultiAction, but allows us to easily access the region name in
* the AsyncProcessCallback context.
*/
private final byte[] region;
private final byte[] startKey;
private final MethodDescriptor method;
private final Message request;
public RegionCoprocessorServiceExec(byte[] region, byte[] startKey,
MethodDescriptor method, Message request) {
this.region = region;
this.startKey = startKey;
this.method = method;
this.request = request;
}
@Override
public byte[] getRow() {
return startKey;
}
public byte[] getRegion() {
return region;
}
public MethodDescriptor getMethod() {
return method;
}
public Message getRequest() {
return request;
}
@Override
public int compareTo(Row o) {
int res = Bytes.compareTo(this.getRow(), o.getRow());
if ((o instanceof RegionCoprocessorServiceExec) && res == 0) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) o;
res = method.getFullName().compareTo(exec.getMethod().getFullName());
if (res == 0) {
res = Bytes.compareTo(request.toByteArray(), exec.getRequest().toByteArray());
}
}
return res;
}
@Override
public int hashCode() {
return Objects.hashCode(Bytes.hashCode(this.getRow()), method.getFullName(), request);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Row other = (Row) obj;
return compareTo(other) == 0;
}
}

View File

@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
@@ -533,6 +534,14 @@ public final class RequestConverter {
} else if (row instanceof Increment) {
regionActionBuilder.addAction(actionBuilder.setMutation(
ProtobufUtil.toMutation((Increment)row, mutationBuilder, action.getNonce())));
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
regionActionBuilder.addAction(actionBuilder.setServiceCall(
ClientProtos.CoprocessorServiceCall.newBuilder()
.setRow(HBaseZeroCopyByteString.wrap(exec.getRow()))
.setServiceName(exec.getMethod().getService().getFullName())
.setMethodName(exec.getMethod().getName())
.setRequest(exec.getRequest().toByteString())));
} else if (row instanceof RowMutations) {
throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
} else {

View File

@@ -120,6 +120,8 @@ public final class ResponseConverter {
results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException()));
} else if (roe.hasResult()) {
results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells));
} else if (roe.hasServiceResult()) {
results.add(regionName, roe.getIndex(), roe.getServiceResult());
} else {
// no result & no exception. Unexpected.
throw new IllegalStateException("No result & no exception roe=" + roe +

View File

@@ -303,6 +303,10 @@ message CoprocessorServiceCall {
required bytes request = 4;
}
message CoprocessorServiceResult {
optional NameBytesPair value = 1;
}
message CoprocessorServiceRequest {
required RegionSpecifier region = 1;
required CoprocessorServiceCall call = 2;
@@ -320,6 +324,7 @@ message Action {
optional uint32 index = 1;
optional MutationProto mutation = 2;
optional Get get = 3;
optional CoprocessorServiceCall service_call = 4;
}
/**
@@ -343,6 +348,8 @@ message ResultOrException {
optional uint32 index = 1;
optional Result result = 2;
optional NameBytesPair exception = 3;
// result if this was a coprocessor service call
optional CoprocessorServiceResult service_result = 4;
}
/**

View File

@@ -42,11 +42,14 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.io.MultipleIOException;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@@ -328,4 +331,20 @@ public class HTableWrapper implements HTableInterface {
byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
R responsePrototype) throws ServiceException, Throwable {
return table.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
responsePrototype);
}
@Override
public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable {
table.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
callback);
}
}

View File

@@ -44,8 +44,6 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CoprocessorHConnection;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTableWrapper;
@@ -54,7 +52,6 @@ import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
import org.apache.hadoop.hbase.util.VersionInfo;
/**
* Provides the common setup framework and runtime services for coprocessor
* invocation from HBase services.

View File

@@ -3309,12 +3309,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
checkOpen();
requestCount.increment();
HRegion region = getRegion(request.getRegion());
Message result = execServiceOnRegion(region, request.getCall());
CoprocessorServiceResponse.Builder builder =
CoprocessorServiceResponse.newBuilder();
builder.setRegion(RequestConverter.buildRegionSpecifier(
@@ -3328,6 +3323,17 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
}
}
private Message execServiceOnRegion(HRegion region,
final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
// ignore the passed in controller (from the serialized call)
ServerRpcController execController = new ServerRpcController();
Message result = region.execService(execController, serviceCall);
if (execController.getFailedOn() != null) {
throw execController.getFailedOn();
}
return result;
}
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*
@@ -3418,6 +3424,20 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
if (action.hasGet()) {
Get get = ProtobufUtil.toGet(action.getGet());
r = region.get(get);
} else if (action.hasServiceCall()) {
resultOrExceptionBuilder = ResultOrException.newBuilder();
try {
Message result = execServiceOnRegion(region, action.getServiceCall());
ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
ClientProtos.CoprocessorServiceResult.newBuilder();
resultOrExceptionBuilder.setServiceResult(
serviceResultBuilder.setValue(
serviceResultBuilder.getValueBuilder()
.setName(result.getClass().getName())
.setValue(result.toByteString())));
} catch (IOException ioe) {
resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
}
} else if (action.hasMutation()) {
MutationType type = action.getMutation().getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
@@ -4365,7 +4385,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}

View File

@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -66,6 +67,8 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@@ -825,4 +828,19 @@ public class RemoteHTable implements HTableInterface {
long amount, boolean writeToWAL) throws IOException {
throw new IOException("incrementColumnValue not supported");
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,
byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
throw new UnsupportedOperationException("batchCoprocessorService not implemented");
}
@Override
public <R extends Message> void batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,
byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable {
throw new UnsupportedOperationException("batchCoprocessorService not implemented");
}
}

View File

@@ -112,6 +112,7 @@ implements Coprocessor, CoprocessorService {
}
}
}
LOG.info("Returning result " + sumResult);
done.run(SumResponse.newBuilder().setSum(sumResult).build());
}
}

View File

@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumResponse;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Test coprocessor endpoint that always returns {@code null} for requests to the last region
* in the table. This allows tests to provide assurance of correct {@code null} handling for
* response values.
*/
public class ColumnAggregationEndpointNullResponse
extends
ColumnAggregationServiceNullResponse
implements Coprocessor, CoprocessorService {
static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
private RegionCoprocessorEnvironment env = null;
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
return;
}
throw new CoprocessorException("Must be loaded on a table region!");
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// Nothing to do.
}
@Override
public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
// aggregate at each region
Scan scan = new Scan();
// Family is required in pb. Qualifier is not.
byte[] family = request.getFamily().toByteArray();
byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
if (request.hasQualifier()) {
scan.addColumn(family, qualifier);
} else {
scan.addFamily(family);
}
int sumResult = 0;
InternalScanner scanner = null;
try {
HRegion region = this.env.getRegion();
// for the last region in the table, return null to test null handling
if (Bytes.equals(region.getEndKey(), HConstants.EMPTY_END_ROW)) {
done.run(null);
return;
}
scanner = region.getScanner(scan);
List<Cell> curVals = new ArrayList<Cell>();
boolean hasMore = false;
do {
curVals.clear();
hasMore = scanner.next(curVals);
for (Cell kv : curVals) {
if (CellUtil.matchingQualifier(kv, qualifier)) {
sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
}
}
} while (hasMore);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
// Set result to -1 to indicate error.
sumResult = -1;
LOG.info("Setting sum result to -1 to indicate error", e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
sumResult = -1;
LOG.info("Setting sum result to -1 to indicate error", e);
}
}
}
done.run(SumResponse.newBuilder().setSum(sumResult).build());
LOG.info("Returning sum " + sumResult + " for region " +
Bytes.toStringBinary(env.getRegion().getRegionName()));
}
}

View File

@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumResponse;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on
* the last region in the table. This allows tests to ensure correct error handling of
* coprocessor endpoints throwing exceptions.
*/
public class ColumnAggregationEndpointWithErrors
extends
ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
implements Coprocessor, CoprocessorService {
static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class);
private RegionCoprocessorEnvironment env = null;
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
return;
}
throw new CoprocessorException("Must be loaded on a table region!");
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// Nothing to do.
}
@Override
public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
// aggregate at each region
Scan scan = new Scan();
// Family is required in pb. Qualifier is not.
byte[] family = request.getFamily().toByteArray();
byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
if (request.hasQualifier()) {
scan.addColumn(family, qualifier);
} else {
scan.addFamily(family);
}
int sumResult = 0;
InternalScanner scanner = null;
try {
HRegion region = this.env.getRegion();
// throw an exception for requests to the last region in the table, to test error handling
if (Bytes.equals(region.getEndKey(), HConstants.EMPTY_END_ROW)) {
throw new DoNotRetryIOException("An expected exception");
}
scanner = region.getScanner(scan);
List<Cell> curVals = new ArrayList<Cell>();
boolean hasMore = false;
do {
curVals.clear();
hasMore = scanner.next(curVals);
for (Cell kv : curVals) {
if (CellUtil.matchingQualifier(kv, qualifier)) {
sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
}
}
} while (hasMore);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
// Set result to -1 to indicate error.
sumResult = -1;
LOG.info("Setting sum result to -1 to indicate error", e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
sumResult = -1;
LOG.info("Setting sum result to -1 to indicate error", e);
}
}
}
done.run(SumResponse.newBuilder().setSum(sumResult).build());
}
}

View File

@@ -0,0 +1,280 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ServiceException;
/**
* Test cases verifying batch execution of coprocessor endpoints.
*/
@Category(MediumTests.class)
public class TestBatchCoprocessorEndpoint {
private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class);
private static final TableName TEST_TABLE =
TableName.valueOf("TestTable");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
private static byte[] ROW = Bytes.toBytes("testRow");
private static final int ROWSIZE = 20;
private static final int rowSeperator1 = 5;
private static final int rowSeperator2 = 12;
private static byte[][] ROWS = makeN(ROW, ROWSIZE);
private static HBaseTestingUtility util = new HBaseTestingUtility();
@BeforeClass
public static void setupBeforeClass() throws Exception {
// configure which coprocessors should be loaded
Configuration conf = util.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
ProtobufCoprocessorService.class.getName(),
ColumnAggregationEndpointWithErrors.class.getName(),
ColumnAggregationEndpointNullResponse.class.getName());
conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
util.startMiniCluster(2);
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
util.waitUntilAllRegionsAssigned(TEST_TABLE);
admin.close();
HTable table = new HTable(conf, TEST_TABLE);
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
table.close();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
@Test
public void testAggregationNullResponse() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
ColumnAggregationWithNullResponseProtos.SumRequest.Builder builder =
ColumnAggregationWithNullResponseProtos.SumRequest
.newBuilder();
builder.setFamily(HBaseZeroCopyByteString.wrap(TEST_FAMILY));
if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
builder.setQualifier(HBaseZeroCopyByteString.wrap(TEST_QUALIFIER));
}
Map<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> results =
table.batchCoprocessorService(
ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
builder.build(), ROWS[0], ROWS[ROWS.length - 1],
ColumnAggregationWithNullResponseProtos.SumResponse.getDefaultInstance());
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> e :
results.entrySet()) {
LOG.info("Got value " + e.getValue().getSum() + " for region "
+ Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < rowSeperator2; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
table.close();
}
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
}
return ret;
}
private Map<byte[], SumResponse> sum(final HTable table, final byte[] family,
final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
Throwable {
ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest
.newBuilder();
builder.setFamily(HBaseZeroCopyByteString.wrap(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(HBaseZeroCopyByteString.wrap(qualifier));
}
return table.batchCoprocessorService(
ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
}
@Test
public void testAggregationWithReturnValue() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
ROWS[ROWS.length - 1]);
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
LOG.info("Got value " + e.getValue().getSum() + " for region "
+ Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
results.clear();
// scan: for region 2 and region 3
results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1],
ROWS[ROWS.length - 1]);
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
LOG.info("Got value " + e.getValue().getSum() + " for region "
+ Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
table.close();
}
@Test
public void testAggregation() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
ROWS[0], ROWS[ROWS.length - 1]);
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
LOG.info("Got value " + e.getValue().getSum() + " for region "
+ Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
// scan: for region 2 and region 3
results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
LOG.info("Got value " + e.getValue().getSum() + " for region "
+ Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
table.close();
}
@Test
public void testAggregationWithErrors() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
final Map<byte[], ColumnAggregationWithErrorsProtos.SumResponse> results =
Collections.synchronizedMap(
new TreeMap<byte[], ColumnAggregationWithErrorsProtos.SumResponse>(
Bytes.BYTES_COMPARATOR
));
ColumnAggregationWithErrorsProtos.SumRequest.Builder builder =
ColumnAggregationWithErrorsProtos.SumRequest
.newBuilder();
builder.setFamily(HBaseZeroCopyByteString.wrap(TEST_FAMILY));
if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
builder.setQualifier(HBaseZeroCopyByteString.wrap(TEST_QUALIFIER));
}
boolean hasError = false;
try {
table.batchCoprocessorService(
ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
.findMethodByName("sum"),
builder.build(), ROWS[0], ROWS[ROWS.length - 1],
ColumnAggregationWithErrorsProtos.SumResponse.getDefaultInstance(),
new Batch.Callback<ColumnAggregationWithErrorsProtos.SumResponse>() {
@Override
public void update(byte[] region, byte[] row,
ColumnAggregationWithErrorsProtos.SumResponse result) {
results.put(region, result);
}
});
} catch (Throwable t) {
LOG.info("Exceptions in coprocessor service", t);
hasError = true;
}
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], ColumnAggregationWithErrorsProtos.SumResponse> e : results.entrySet()) {
LOG.info("Got value " + e.getValue().getSum() + " for region "
+ Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < rowSeperator2; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
assertTrue(hasError);
table.close();
}
}

View File

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Coprocessor test
option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
option java_outer_classname = "ColumnAggregationWithNullResponseProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
message SumRequest {
required bytes family = 1;
optional bytes qualifier = 2;
}
message SumResponse {
optional int64 sum = 1;
}
service ColumnAggregationServiceNullResponse {
rpc sum(SumRequest) returns(SumResponse);
}

View File

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Coprocessor test
option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
option java_outer_classname = "ColumnAggregationWithErrorsProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
message SumRequest {
required bytes family = 1;
optional bytes qualifier = 2;
}
message SumResponse {
required int64 sum = 1;
}
service ColumnAggregationServiceWithErrors {
rpc sum(SumRequest) returns(SumResponse);
}