Revert "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."

This reverts commit ed87a81b4b.
Author: stack, 2016-08-05 15:18:48 -07:00
Commit: 0206dc67d6 (parent ed87a81b4b)
49 changed files with 1582 additions and 1400 deletions
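For orientation, here is a minimal sketch of the pattern the reverted commit had introduced, reconstructed from the removed hunks below; ContainedRpcCallable and rpcCall are illustrative names, not the actual HBase types. Subclasses put the protobuf stub invocation in one method, and a single call(int) translates anything thrown into IOException so protobuf types stay contained:

import java.io.IOException;

// Hedged sketch of the "contain protobuf references" shape this commit reverts;
// a hypothetical stand-in for the reverted RegionServerCallable/MasterCallable style.
abstract class ContainedRpcCallable<T> {
  // Subclasses invoke the protobuf stub here; pb exceptions may escape this method.
  protected abstract T rpcCall(int callTimeout) throws Exception;

  // Single choke point, mirroring ProtobufUtil.handleRemoteException(e) in the
  // reverted code: callers only ever see IOException.
  public T call(int callTimeout) throws IOException {
    try {
      return rpcCall(callTimeout);
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}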

File: AbstractRegionServerCallable.java

@@ -18,7 +18,8 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@@ -28,15 +29,26 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Added by HBASE-15745 Refactor of RPC classes to better accept async changes.
* Temporary.
* Implementations call a RegionServer.
* Passed to a {@link RpcRetryingCaller} so we retry on fail.
* TODO: this class is actually tied to one region, because most of the paths make use of
* the regioninfo part of location when building requests. The only reason it works for
* multi-region requests (e.g. batch) is that they happen to not use the region parts.
* This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
* RegionCallable and actual RegionServerCallable with ServerName).
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
// Public because used outside of this package over in ipc.
private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class);
protected final Connection connection;
protected final TableName tableName;
protected final byte[] row;
protected HRegionLocation location;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
/**
@@ -115,7 +127,8 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> {
@Override
public void prepare(final boolean reload) throws IOException {
// check table state if this is a retry
if (reload && !tableName.equals(TableName.META_TABLE_NAME) &&
if (reload &&
!tableName.equals(TableName.META_TABLE_NAME) &&
getConnection().isTableDisabled(tableName)) {
throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
}

File: ClientSimpleScanner.java

@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
*/
@InterfaceAudience.Private
public class ClientSimpleScanner extends ClientScanner {
public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,

File: ClientSmallScanner.java

@@ -18,10 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,15 +29,17 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
/**
* Client scanner for small scan. Generally, only one RPC is called to fetch the
@@ -185,7 +185,7 @@ public class ClientSmallScanner extends ClientSimpleScanner {
}
@Override
protected Result[] call(PayloadCarryingRpcController controller) throws Exception {
public Result[] call(int timeout) throws IOException {
if (this.closed) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
@@ -193,8 +193,13 @@ public class ClientSmallScanner extends ClientSimpleScanner {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ScanResponse response = null;
controller = controllerFactory.newController();
try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
response = getStub().scan(controller, request);
Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
Result[] results = ResponseConverter.getResults(controller.cellScanner(),
response);
if (response.hasMoreResultsInRegion()) {
setHasMoreResultsContext(true);
setServerHasMoreResults(response.getMoreResultsInRegion());
@@ -204,6 +209,9 @@ public class ClientSmallScanner extends ClientSimpleScanner {
// We need to update result metrics since we are overriding call()
updateResultsMetrics(results);
return results;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override

File: ConnectionCallable.java (restored from RpcRetryingCallable.java)

@@ -21,13 +21,17 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
/**
* A RetryingCallable for RPC connection operations.
* A RetryingCallable for generic connection operations.
* @param <V> return type
*/
abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable {
abstract class ConnectionCallable<V> implements RetryingCallable<V>, Closeable {
protected Connection connection;
public ConnectionCallable(final Connection connection) {
this.connection = connection;
}
@Override
public void prepare(boolean reload) throws IOException {
}
@@ -49,17 +53,4 @@ abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable
public long sleep(long pause, int tries) {
return ConnectionUtils.getPauseTime(pause, tries);
}
@Override
// Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
// and so we contain references to protobuf.
public V call(int callTimeout) throws IOException {
try {
return rpcCall(callTimeout);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
protected abstract V rpcCall(int callTimeout) throws Exception;
}

File: ConnectionImplementation.java

@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -63,7 +68,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -91,11 +95,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
* Encapsulates connection to zookeeper and regionservers.
@@ -934,13 +933,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.stub = null;
}
boolean isMasterRunning() throws IOException {
MasterProtos.IsMasterRunningResponse response = null;
try {
response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
boolean isMasterRunning() throws ServiceException {
MasterProtos.IsMasterRunningResponse response =
this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
return response != null? response.getIsMasterRunning(): false;
}
}
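This hunk is the crux of the revert for ConnectionImplementation: the reverted code wrapped protobuf exceptions inside isMasterRunning(), while the restored code lets ServiceException escape, so callers (see the catch block restored in a later hunk) must handle a protobuf type themselves. A hedged contrast of the two surfaces, with stand-in types:

import java.io.IOException;

// Illustrative contrast only; MasterStub and ServiceException are stand-ins.
class MasterCheckSketch {
  static class ServiceException extends Exception {}
  interface MasterStub { boolean isMasterRunning() throws ServiceException; }

  // Reverted shape: the pb exception is contained here; callers see IOException.
  static boolean containedCheck(MasterStub stub) throws IOException {
    try {
      return stub.isMasterRunning();
    } catch (ServiceException e) {
      throw new IOException(e);
    }
  }

  // Restored shape: the pb exception escapes; every caller needs a pb catch block.
  static boolean leakyCheck(MasterStub stub) throws ServiceException {
    return stub.isMasterRunning();
  }
}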
@@ -1063,14 +1058,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
/**
* Once setup, check it works by doing isMasterRunning check.
*/
protected abstract void isMasterRunning() throws IOException;
protected abstract void isMasterRunning() throws ServiceException;
/**
* Create a stub. Try once only. It is not typed because there is no common type to
* protobuf services nor their interfaces. Let the caller do appropriate casting.
* @return A stub for master services.
*/
private Object makeStubNoRetries() throws IOException, KeeperException {
private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
ZooKeeperKeepAliveConnection zkw;
try {
zkw = getKeepAliveZooKeeperWatcher();
@@ -1126,7 +1121,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
exceptionCaught = e;
} catch (KeeperException e) {
exceptionCaught = e;
} catch (ServiceException e) {
exceptionCaught = e;
}
throw new MasterNotRunningException(exceptionCaught);
} else {
throw new DoNotRetryIOException("Connection was closed while trying to get master");
@@ -1157,12 +1155,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
protected void isMasterRunning() throws IOException {
try {
protected void isMasterRunning() throws ServiceException {
this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
}
@@ -1707,7 +1701,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// java.net.ConnectException but they're not declared. So we catch it...
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
return false;
} catch (IOException se) {
} catch (ServiceException se) {
LOG.warn("Checking master connection", se);
return false;
}

File: FlushRegionCallable.java

@@ -27,18 +27,23 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
/**
* A Callable for flushRegion() RPC.
*/
@InterfaceAudience.Private
public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
private static final Log LOG = LogFactory.getLog(FlushRegionCallable.class);
private final byte[] regionName;
private final boolean writeFlushWalMarker;
private boolean reload;
@@ -58,15 +63,19 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
regionInfo.getStartKey(), writeFlushWalMarker);
}
@Override
public FlushRegionResponse call(int callTimeout) throws Exception {
return flushRegion();
}
@Override
public void prepare(boolean reload) throws IOException {
super.prepare(reload);
this.reload = reload;
}
@Override
protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception {
// Check whether we should still do the flush to this region. If the regions are changed due
private FlushRegionResponse flushRegion() throws IOException {
// check whether we should still do the flush to this region. If the regions are changed due
// to splits or merges, etc return success
if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
if (!reload) {
@@ -84,6 +93,13 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
return stub.flushRegion(controller, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}
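The restored flushRegion() above shows the per-call idiom that recurs throughout this diff: build a fresh controller, prime it with the table's priority, invoke the stub, and unwrap ServiceException by hand. A hedged sketch of that shape, with simplified stand-ins for the controller, factory, and Admin stub:

import java.io.IOException;

class PerCallControllerSketch {
  static class ServiceException extends Exception {}
  interface RpcController { void setPriority(String tableName); }
  interface ControllerFactory { RpcController newController(); }
  interface AdminStub {
    String flushRegion(RpcController controller, String request) throws ServiceException;
  }

  static String flushRegion(ControllerFactory factory, AdminStub stub,
      String tableName, String request) throws IOException {
    RpcController controller = factory.newController(); // fresh controller per attempt
    controller.setPriority(tableName);                  // priority follows the table
    try {
      return stub.flushRegion(controller, request);
    } catch (ServiceException se) {
      throw new IOException(se); // stand-in for ProtobufUtil.getRemoteException(se)
    }
  }
}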

File: HTable.java

@@ -18,6 +18,12 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -37,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -67,16 +74,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. This below are part of our public API.
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import com.google.protobuf.Service;
// SEE ABOVE NOTE!
/**
* An implementation of {@link Table}. Used to communicate with a single HBase table.
* Lightweight. Get as needed and just close when done.
@@ -414,16 +411,23 @@ public class HTable implements Table {
if (get.getConsistency() == Consistency.STRONG) {
// Good old call.
final Get configuredGet = get;
final Get getReq = get;
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), get.getRow()) {
getName(), get.getRow()) {
@Override
protected Result call(PayloadCarryingRpcController controller) throws Exception {
ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
getLocation().getRegionInfo().getRegionName(), configuredGet);
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
@@ -439,6 +443,7 @@ public class HTable implements Table {
return callable.call(operationTimeout);
}
/**
* {@inheritDoc}
*/
@@ -449,14 +454,16 @@ public class HTable implements Table {
}
try {
Object[] r1 = new Object[gets.size()];
batch((List<? extends Row>)gets, r1);
// Translate.
batch((List) gets, r1);
// translate.
Result [] results = new Result[r1.length];
int i = 0;
for (Object obj: r1) {
// Batch ensures if there is a failure we get an exception instead
results[i++] = (Result)obj;
int i=0;
for (Object o : r1) {
// batch ensures if there is a failure we get an exception instead
results[i++] = (Result) o;
}
return results;
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -504,13 +511,21 @@ public class HTable implements Table {
public void delete(final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
this.rpcControllerFactory, getName(), delete.getRow()) {
tableName, delete.getRow()) {
@Override
protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
@@ -566,11 +581,20 @@ public class HTable implements Table {
*/
@Override
public void mutateRow(final RowMutations rm) throws IOException {
final RetryingTimeTracker tracker = new RetryingTimeTracker();
PayloadCarryingServerCallable<MultiResponse> callable =
new PayloadCarryingServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory) {
@Override
protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception {
public MultiResponse call(int callTimeout) throws IOException {
tracker.start();
controller.setPriority(tableName);
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
}
controller.setCallTimeout(remainingTime);
try {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
getLocation().getRegionInfo().getRegionName(), rm);
regionMutationBuilder.setAtomic(true);
@@ -583,9 +607,13 @@ public class HTable implements Table {
if (ex instanceof IOException) {
throw (IOException) ex;
}
throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
throw new IOException("Failed to mutate row: " +
Bytes.toStringBinary(rm.getRow()), ex);
}
return ResponseConverter.getResults(request, response, controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
@@ -596,31 +624,38 @@ public class HTable implements Table {
}
}
private static void checkHasFamilies(final Mutation mutation) throws IOException {
if (mutation.numFamilies() == 0) {
throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
}
}
/**
* {@inheritDoc}
*/
@Override
public Result append(final Append append) throws IOException {
checkHasFamilies(append);
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), append.getRow()) {
if (append.numFamilies() == 0) {
throw new IOException(
"Invalid arguments to append, no columns specified");
}
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable =
new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
@Override
protected Result call(PayloadCarryingRpcController controller) throws Exception {
public Result call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNewNonce());
getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
MutateResponse response = getStub().mutate(controller, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Result> newCaller(this.rpcTimeout).
callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -628,16 +663,27 @@ public class HTable implements Table {
*/
@Override
public Result increment(final Increment increment) throws IOException {
checkHasFamilies(increment);
if (!increment.hasFamilies()) {
throw new IOException(
"Invalid arguments to increment, no columns specified");
}
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
this.rpcControllerFactory, getName(), increment.getRow()) {
getName(), increment.getRow()) {
@Override
protected Result call(PayloadCarryingRpcController controller) throws Exception {
public Result call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNewNonce());
getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
MutateResponse response = getStub().mutate(controller, request);
// Should this check for null like append does?
return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
@@ -676,20 +722,28 @@ public class HTable implements Table {
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable<Long> callable = new RegionServerCallable<Long>(this.connection,
this.rpcControllerFactory, getName(), row) {
RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getName(), row) {
@Override
protected Long call(PayloadCarryingRpcController controller) throws Exception {
public Long call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildIncrementRequest(
getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability, nonceGroup, nonce);
MutateResponse response = getStub().mutate(controller, request);
Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
Result result =
ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Long> newCaller(rpcTimeout).
callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -700,19 +754,26 @@ public class HTable implements Table {
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
this.rpcControllerFactory, getName(), row) {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, put);
MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -723,42 +784,57 @@ public class HTable implements Table {
final byte [] qualifier, final CompareOp compareOp, final byte [] value,
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
this.rpcControllerFactory, getName(), row) {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, put);
MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete)
public boolean checkAndDelete(final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
this.rpcControllerFactory, getName(), row) {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), CompareType.EQUAL, delete);
MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
callWithRetries(callable, this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -769,16 +845,23 @@ public class HTable implements Table {
final byte [] qualifier, final CompareOp compareOp, final byte [] value,
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection,
this.rpcControllerFactory, getName(), row) {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete);
MutateResponse response = getStub().mutate(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
@@ -792,11 +875,20 @@ public class HTable implements Table {
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
final RetryingTimeTracker tracker = new RetryingTimeTracker();
PayloadCarryingServerCallable<MultiResponse> callable =
new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory) {
@Override
protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception {
public MultiResponse call(int callTimeout) throws IOException {
tracker.start();
controller.setPriority(tableName);
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
}
controller.setCallTimeout(remainingTime);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
MultiRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
@@ -805,15 +897,18 @@ public class HTable implements Table {
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
Throwable ex = ProtobufUtil.toException(res.getException());
if (ex instanceof IOException) {
if(ex instanceof IOException) {
throw (IOException)ex;
}
throw new IOException("Failed to checkAndMutate row: "+ Bytes.toStringBinary(rm.getRow()), ex);
throw new IOException("Failed to checkAndMutate row: "+
Bytes.toStringBinary(rm.getRow()), ex);
}
return ResponseConverter.getResults(request, response, controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
/**
* Currently, we use one array to store 'processed' flag which is returned by server.
* It is excessive to send such a large array, but that is required by the framework right now
@@ -873,6 +968,7 @@ public class HTable implements Table {
}
/**
* {@inheritDoc}
* @throws IOException
*/
void flushCommits() throws IOException {
@@ -1049,7 +1145,8 @@ public class HTable implements Table {
for (final byte[] r : keys) {
final RegionCoprocessorRpcChannel channel =
new RegionCoprocessorRpcChannel(connection, tableName, r);
Future<R> future = pool.submit(new Callable<R>() {
Future<R> future = pool.submit(
new Callable<R>() {
@Override
public R call() throws Exception {
T instance = ProtobufUtil.newServiceStub(service, channel);
@@ -1113,6 +1210,9 @@ public class HTable implements Table {
return tableName + ";" + connection;
}
/**
* {@inheritDoc}
*/
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1121,6 +1221,7 @@ public class HTable implements Table {
Bytes.BYTES_COMPARATOR));
batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
new Callback<R>() {
@Override
public void update(byte[] region, byte[] row, R result) {
if (region != null) {

File: MasterCallable.java

@@ -21,24 +21,16 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
/**
* A RetryingCallable for master operations.
* @param <V> return type
*/
// Like RegionServerCallable
abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
protected ClusterConnection connection;
protected MasterKeepAliveConnection master;
private final PayloadCarryingRpcController rpcController;
MasterCallable(final Connection connection,
final RpcControllerFactory rpcConnectionFactory) {
public MasterCallable(final Connection connection) {
this.connection = (ClusterConnection) connection;
this.rpcController = rpcConnectionFactory.newController();
}
@Override
@@ -67,31 +59,4 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
public long sleep(long pause, int tries) {
return ConnectionUtils.getPauseTime(pause, tries);
}
/**
* Override that changes Exception from {@link Exception} to {@link IOException}. It also does
* setup of an rpcController and calls through to the unimplemented
* call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
*/
@Override
// Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
// and so we contain references to protobuf. We can't set priority on the rpcController as
// we do in RegionServerCallable because we don't always have a Table when we call.
public V call(int callTimeout) throws IOException {
try {
this.rpcController.setCallTimeout(callTimeout);
return call(this.rpcController);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
/**
* Run RPC call.
* @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
* facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
* class.
* @throws Exception
*/
protected abstract V call(PayloadCarryingRpcController rpcController) throws Exception;
}

View File

@@ -33,7 +33,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
* against the master on the MasterProtos.MasterService.BlockingInterface; but not by
* final user code. Hence it's package protected.
*/
interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface {
interface MasterKeepAliveConnection
extends MasterProtos.MasterService.BlockingInterface {
// Do this instead of implement Closeable because closeable returning IOE is PITA.
void close();
}

View File

@@ -30,9 +30,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -42,14 +41,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
/**
* Callable that handles the <code>multi</code> method call going against a single
* regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
* RegionServerCallable that goes against multiple regions).
* regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a
* {@link RegionServerCallable} that goes against multiple regions).
* @param <R>
*/
@InterfaceAudience.Private
class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
@@ -80,7 +79,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
}
@Override
protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception {
public MultiResponse call(int callTimeout) throws IOException {
int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -99,8 +98,10 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
if (this.cellBlock) {
// Pre-size. Presume at least a KV per Action. There are likely more.
// Presize. Presume at least a KV per Action. There are likely more.
if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above. Guess at count of cells
@@ -115,18 +116,18 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
PayloadCarryingRpcController payloadCarryingRpcController = null;
if (cells != null) {
// Cast. Will fail if we have been passed wrong RpcController type.
payloadCarryingRpcController = (PayloadCarryingRpcController)controller;
payloadCarryingRpcController.setCellScanner(CellUtil.createCellScanner(cells));
}
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
try {
responseProto = getStub().multi(controller, requestProto);
} catch (ServiceException e) {
throw ProtobufUtil.getRemoteException(e);
}
if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, responseProto,
payloadCarryingRpcController == null? null: payloadCarryingRpcController.cellScanner());
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}
/**

View File

@@ -16,51 +16,33 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
* This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
* AsyncProcess directly though this class. Also adds global timeout tracking on top of
* RegionServerCallable and implements Cancellable.
* This class is used to unify HTable calls with AsyncProcess Framework.
* HTable can use AsyncProcess directly though this class.
*/
@InterfaceAudience.Private
abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T>
implements Cancellable {
private final RetryingTimeTracker tracker = new RetryingTimeTracker();
public abstract class PayloadCarryingServerCallable<T>
extends RegionServerCallable<T> implements Cancellable {
protected PayloadCarryingRpcController controller;
PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
RpcControllerFactory rpcControllerFactory) {
super(connection, rpcControllerFactory, tableName, row);
}
/* Override so can mess with the callTimeout.
* (non-Javadoc)
* @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
*/
@Override
public T call(int callTimeout) throws IOException {
// It is expected (it seems) that tracker.start can be called multiple times (on each trip
// through the call when retrying). Also, we can call start and no need of a stop.
this.tracker.start();
int remainingTime = tracker.getRemainingTime(callTimeout);
if (remainingTime == 0) {
throw new DoNotRetryIOException("Timeout for mutate row");
}
return super.call(remainingTime);
super(connection, tableName, row);
this.controller = rpcControllerFactory.newController();
}
@Override
public void cancel() {
getRpcController().startCancel();
controller.startCancel();
}
@Override
public boolean isCancelled() {
return getRpcController().isCanceled();
return controller.isCanceled();
}
}
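Both versions of this class implement Cancellable by delegating to the rpc controller, which is what lets replica reads abort calls still in flight. A hedged sketch of that delegation; Controller is a simplified stand-in for PayloadCarryingRpcController:

class CancellableCallableSketch {
  interface Cancellable { void cancel(); boolean isCancelled(); }

  static class Controller {
    private volatile boolean cancelled;
    void startCancel() { cancelled = true; }   // the real controller also aborts the rpc
    boolean isCanceled() { return cancelled; } // single-l spelling follows pb's RpcController
  }

  static class ServerCallableSketch implements Cancellable {
    private final Controller controller = new Controller();
    @Override public void cancel() { controller.startCancel(); }
    @Override public boolean isCancelled() { return controller.isCanceled(); }
  }
}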

File: RegionAdminServiceCallable.java

@@ -27,30 +27,31 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Similar to RegionServerCallable but for the AdminService interface. This service callable
* Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
* assumes a Table and row and thus does region locating similar to RegionServerCallable.
* Works against Admin stub rather than Client stub.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
justification="stub used by ipc")
@InterfaceAudience.Private
public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
protected AdminService.BlockingInterface stub;
protected final RpcControllerFactory rpcControllerFactory;
private PayloadCarryingRpcController controller = null;
protected final ClusterConnection connection;
protected final RpcControllerFactory rpcControllerFactory;
protected AdminService.BlockingInterface stub;
protected HRegionLocation location;
protected final TableName tableName;
protected final byte[] row;
protected final int replicaId;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
public RegionAdminServiceCallable(ClusterConnection connection,
@@ -81,13 +82,16 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (reload || location == null) {
location = getLocation(!reload);
}
if (location == null) {
// With this exception, there will be a retry.
throw new HBaseIOException(getExceptionMessage());
}
this.setStub(connection.getAdmin(location.getServerName()));
}
@@ -163,39 +167,7 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
}
return rl;
}
/**
* Override that changes Exception from {@link Exception} to {@link IOException}. It also does
* setup of an rpcController and calls through to the unimplemented
* call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
*/
@Override
// Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
// and so we contain references to protobuf. We can't set priority on the rpcController as
// we do in RegionServerCallable because we don't always have a Table when we call.
public T call(int callTimeout) throws IOException {
this.controller = rpcControllerFactory.newController();
this.controller.setPriority(this.tableName);
this.controller.setCallTimeout(callTimeout);
try {
return call(this.controller);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
return this.controller;
}
/**
* Run RPC call.
* @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
* facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
* class.
* @throws Exception
*/
protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
}

View File

@@ -1,4 +1,5 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,20 +24,12 @@ import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
/**
* Implementations make an rpc call against a RegionService via a protobuf Service.
* Implement #rpcCall(RpcController) and then call {@link #call(int)} to
* trigger the rpc. The {@link #call(int)} eventually invokes your
* #rpcCall(RpcController) meanwhile saving you having to write a bunch of
* boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are
* retried on fail.
*
* <p>TODO: this class is actually tied to one region, because most of the paths make use of
* Implementations call a RegionServer and implement {@link #call(int)}.
* Passed to a {@link RpcRetryingCaller} so we retry on fail.
* TODO: this class is actually tied to one region, because most of the paths make use of
* the regioninfo part of location when building requests. The only reason it works for
* multi-region requests (e.g. batch) is that they happen to not use the region parts.
* This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
@@ -44,27 +37,18 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> {
public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements
RetryingCallable<T> {
private ClientService.BlockingInterface stub;
private final PayloadCarryingRpcController rpcController;
/**
* @param connection Connection to use.
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
TableName tableName, byte [] row) {
this(connection, rpcControllerFactory.newController(), tableName, row);
}
public RegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController,
TableName tableName, byte [] row) {
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
super(connection, tableName, row);
this.rpcController = rpcController;
if (this.rpcController != null) {
this.rpcController.setPriority(tableName);
}
}
void setClientByServiceName(ServerName service) throws IOException {
@@ -85,42 +69,4 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
void setStub(final ClientService.BlockingInterface stub) {
this.stub = stub;
}
/**
* Override that changes Exception from {@link Exception} to {@link IOException}. It also does
* setup of an rpcController and calls through to the unimplemented
* call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
*/
@Override
public T call(int callTimeout) throws IOException {
if (this.rpcController != null) {
this.rpcController.setCallTimeout(callTimeout);
}
try {
return call(this.rpcController);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
/**
* Run RPC call.
* @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
* facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
* class.
* @throws Exception
*/
protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
public PayloadCarryingRpcController getRpcController() {
return this.rpcController;
}
long getNonceGroup() {
return getConnection().getNonceGenerator().getNonceGroup();
}
long getNewNonce() {
return getConnection().getNonceGenerator().newNonce();
}
}
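Throughout the HTable hunks earlier in this diff, these callables are never invoked directly; they are handed to a retrying caller that owns the retry loop and the two budgets (per-rpc timeout and whole-operation timeout). A hedged sketch of that call site, with stand-ins for RetryingCallable, RpcRetryingCaller, and RpcRetryingCallerFactory:

import java.io.IOException;

class RetryingCallSiteSketch {
  interface RetryingCallable<T> { T call(int callTimeout) throws IOException; }
  interface RpcRetryingCaller<T> {
    T callWithRetries(RetryingCallable<T> callable, int operationTimeout) throws IOException;
  }
  interface CallerFactory { <T> RpcRetryingCaller<T> newCaller(int rpcTimeout); }

  static String get(CallerFactory factory, int rpcTimeout, int operationTimeout)
      throws IOException {
    // One rpc attempt lives in the lambda; retries and relocation live in the caller,
    // mirroring rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(...).
    RetryingCallable<String> callable = callTimeout -> "result";
    return factory.<String>newCaller(rpcTimeout).callWithRetries(callable, operationTimeout);
  }
}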

File: RetryingTimeTracker.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* Tracks the amount of time remaining for an operation.
*/
class RetryingTimeTracker {
private long globalStartTime = -1;
public void start() {
@@ -37,19 +38,16 @@
if (callTimeout == Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime;
long remainingTime = callTimeout - remaining;
int remainingTime = (int) (
callTimeout -
(EnvironmentEdgeManager.currentTime() - this.globalStartTime));
if (remainingTime < 1) {
// If there is no time left, we're trying anyway. It's too late.
// 0 means no timeout, and it's not the intent here. So we secure both cases by
// resetting to the minimum.
remainingTime = 1;
}
if (remainingTime > Integer.MAX_VALUE) {
throw new RuntimeException("remainingTime=" + remainingTime +
" which is > Integer.MAX_VALUE");
}
return (int)remainingTime;
return remainingTime;
}
}
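Both versions of getRemainingTime implement the same bookkeeping: arm the tracker once per operation, then derive each retry's budget from what remains, clamping to 1 ms because a return of 0 would read as 'no timeout'. The reverted version also does the subtraction in long and fails loudly on int overflow. A hedged sketch combining those points; TrackerSketch stands in for RetryingTimeTracker:

class TrackerSketch {
  private long globalStartTime = -1;

  void start() {
    // Safe to call on every retry; only the first call arms the clock.
    if (globalStartTime < 0) {
      globalStartTime = System.currentTimeMillis();
    }
  }

  int getRemainingTime(int callTimeout) {
    if (callTimeout == Integer.MAX_VALUE) {
      return Integer.MAX_VALUE;
    }
    // Compute in long, as the reverted version does, to avoid int overflow.
    long elapsed = System.currentTimeMillis() - globalStartTime;
    long remainingTime = callTimeout - elapsed;
    if (remainingTime < 1) {
      // Too late, but we try anyway; never return 0, which would mean "no timeout".
      remainingTime = 1;
    }
    if (remainingTime > Integer.MAX_VALUE) {
      throw new RuntimeException("remainingTime=" + remainingTime
          + " which is > Integer.MAX_VALUE");
    }
    return (int) remainingTime;
  }
}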

File: ReversedScannerCallable.java

@@ -176,7 +176,7 @@ public class ReversedScannerCallable extends ScannerCallable {
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName,
ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
r.setCaching(this.getCaching());
return r;

File: RpcRetryingCaller.java

@@ -22,6 +22,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.IOException;
/**
*
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface RpcRetryingCaller<T> {

File: RpcRetryingCallerFactory.java

@@ -36,7 +36,6 @@ public class RpcRetryingCallerFactory {
private final int rpcTimeout;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
/* These below data members are UNUSED!!!*/
private final boolean enableBackPressure;
private ServerStatisticTracker stats;

File: RpcRetryingCallerWithReadReplicas.java

@@ -29,6 +29,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@@ -53,6 +57,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*/
@InterfaceAudience.Private
public class RpcRetryingCallerWithReadReplicas {
private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
protected final ExecutorService pool;
protected final ClusterConnection cConnection;
protected final Configuration conf;
@@ -92,7 +98,7 @@ public class RpcRetryingCallerWithReadReplicas {
private final PayloadCarryingRpcController controller;
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
this.id = id;
this.location = location;
@@ -135,20 +141,28 @@ public class RpcRetryingCallerWithReadReplicas {
}
@Override
protected Result call(PayloadCarryingRpcController controller) throws Exception {
public Result call(int callTimeout) throws Exception {
if (controller.isCanceled()) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
byte[] reg = location.getRegionInfo().getRegionName();
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(reg, get);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) {
return null;
}
return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override

File: ScannerCallable.java

@@ -52,6 +52,9 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
/**
* Scanner operations such as create, next, etc.
* Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
@@ -71,6 +74,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected boolean renew = false;
private Scan scan;
private int caching = 1;
protected final ClusterConnection cConnection;
protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
@@ -121,8 +125,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
super(connection, rpcControllerFactory, tableName, scan.getStartRow());
super(connection, tableName, scan.getStartRow());
this.id = id;
this.cConnection = connection;
this.scan = scan;
this.scanMetrics = scanMetrics;
Configuration conf = connection.getConfiguration();
@@ -180,16 +185,25 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
}
}
protected Result [] call(PayloadCarryingRpcController controller) throws Exception {
@Override
public Result [] call(int callTimeout) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (this.closed) {
if (this.scannerId != -1) {
if (controller == null) {
controller = controllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
}
if (closed) {
if (scannerId != -1) {
close();
}
} else {
if (this.scannerId == -1L) {
if (scannerId == -1L) {
this.scannerId = openScanner();
} else {
Result [] rrs = null;
@@ -198,9 +212,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
setHeartbeatMessage(false);
try {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
request =
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null, renew);
ScanResponse response = null;
try {
response = getStub().scan(controller, request);
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
// from client to server will increment this number in both sides. Client passes this
@@ -228,8 +244,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
updateServerSideMetrics(response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults() && !response.getMoreResults()) {
this.scannerId = -1L;
this.closed = true;
scannerId = -1L;
closed = true;
// Implied that no results were returned back, either.
return null;
}
@@ -243,11 +259,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// Server didn't respond whether it has more results or not.
setHasMoreResultsContext(false);
}
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
updateResultsMetrics(rrs);
} catch (IOException e) {
if (logScannerActivity) {
LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
getLocation(), e);
LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
+ " to " + getLocation(), e);
}
IOException ioe = e;
if (e instanceof RemoteException) {
@ -257,8 +276,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
try {
HRegionLocation location =
getConnection().relocateRegion(getTableName(), scan.getStartRow());
LOG.info("Scanner=" + scannerId + " expired, current region location is " +
location.toString());
LOG.info("Scanner=" + scannerId
+ " expired, current region location is " + location.toString());
} catch (Throwable t) {
LOG.info("Failed to relocate region", t);
}
@ -357,8 +376,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
try {
getStub().scan(controller, request);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
} catch (IOException e) {
LOG.warn("Ignore, probably already closed", e);
@ -368,8 +387,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected long openScanner() throws IOException {
incRPCcallsMetrics();
ScanRequest request = RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
ScanRequest request =
RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(),
this.scan, 0, false);
try {
ScanResponse response = getStub().scan(controller, request);
long id = response.getScannerId();
@ -378,8 +399,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
+ " on region " + getLocation().toString());
}
return id;
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
@ -422,6 +443,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return caching;
}
@Override
public ClusterConnection getConnection() {
return cConnection;
}
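
A minimal sketch of driving the restored callable through the retrying caller; connection, tableName, scan, conf and rpcTimeout are assumed to be in scope, and the snippet is illustrative rather than part of this patch:

ScannerCallable s = new ScannerCallable(connection, tableName, scan,
    null /* scanMetrics */, new RpcControllerFactory(conf), 0 /* id */);
s.setCaching(100); // rows fetched per trip to the server
RpcRetryingCaller<Result[]> caller =
    RpcRetryingCallerFactory.instantiate(conf, null).<Result[]> newCaller();
Result[] batch = caller.callWithRetries(s, rpcTimeout);

The first invocation of call(callTimeout) opens the scanner; later invocations fetch batches and bump nextCallSeq so client and server agree on progress.
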
/**
* Set the number of rows that will be fetched on next
* @param caching the number of rows for caching

View File

@ -22,9 +22,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -41,33 +38,39 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ServiceException;
/**
* Client proxy for SecureBulkLoadProtocol
*/
@InterfaceAudience.Private
public class SecureBulkLoadClient {
private Table table;
private final RpcControllerFactory rpcControllerFactory;
public SecureBulkLoadClient(final Configuration conf, Table table) {
public SecureBulkLoadClient(Table table) {
this.table = table;
this.rpcControllerFactory = new RpcControllerFactory(conf);
}
public String prepareBulkLoad(final Connection conn) throws IOException {
try {
RegionServerCallable<String> callable = new RegionServerCallable<String>(conn,
this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
RegionServerCallable<String> callable =
new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
@Override
protected String call(PayloadCarryingRpcController controller) throws Exception {
public String call(int callTimeout) throws IOException {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
RegionSpecifier region =
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
RequestConverter
.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
try {
PrepareBulkLoadRequest request =
PrepareBulkLoadRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(table.getName()))
.setRegion(region).build();
PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
return response.getBulkToken();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
@ -79,16 +82,21 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
try {
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int callTimeout) throws IOException {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
try {
CleanupBulkLoadRequest request =
CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
CleanupBulkLoadRequest.newBuilder().setRegion(region)
.setBulkToken(bulkToken).build();
getStub().cleanupBulkLoad(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return null;
}
};
@ -122,8 +130,8 @@ public class SecureBulkLoadClient {
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
return response.getLoaded();
} catch (Exception se) {
throw ProtobufUtil.handleRemoteException(se);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
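
Taken together, the restored API gives this prepare/load/cleanup lifecycle; conn, table, stub, famPaths, regionName and userToken are assumed to be in scope in this sketch:

SecureBulkLoadClient client = new SecureBulkLoadClient(table);
String bulkToken = client.prepareBulkLoad(conn); // staging token from the RS
try {
  boolean loaded = client.secureBulkLoadHFiles(stub, famPaths, regionName,
      true /* assignSeqIds */, userToken, bulkToken);
} finally {
  client.cleanupBulkLoad(conn, bulkToken); // always release the staging dir
}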

View File

@ -77,4 +77,5 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
}
return response;
}
}

View File

@ -17,39 +17,24 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
/**
* Optionally carries Cells across the proxy/service interface down into ipc. On its
* way out it optionally carries a set of result Cell data. We stick the Cells here when we want
* to avoid having to protobuf them (for performance reasons). This class is used to ferry data
* across the proxy/protobuf service chasm. Also does call timeout. Used by client and server
* ipc'ing.
* to avoid having to protobuf them. This class is used to ferry data across the proxy/protobuf
* service chasm. Used by client and server ipc'ing.
*/
@InterfaceAudience.Private
public class PayloadCarryingRpcController implements RpcController, CellScannable {
/**
* The time, in ms before the call should expire.
*/
protected volatile Integer callTimeout;
protected volatile boolean cancelled = false;
protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
private IOException exception;
public class PayloadCarryingRpcController
extends TimeLimitedRpcController implements CellScannable {
public static final int PRIORITY_UNSET = -1;
/**
@ -107,20 +92,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
(tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS;
}
/**
* @param regionName RegionName. If hbase:meta, we'll set high priority.
*/
public void setPriority(final byte [] regionName) {
if (isMetaRegion(regionName)) {
setPriority(TableName.META_TABLE_NAME);
}
}
private static boolean isMetaRegion(final byte[] regionName) {
return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
|| Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
}
/**
* @return The priority of this request
*/
@ -128,103 +99,9 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
return priority;
}
@Override
public void reset() {
@Override public void reset() {
super.reset();
priority = 0;
cellScanner = null;
exception = null;
cancelled = false;
failureCb.set(null);
cancellationCb.set(null);
callTimeout = null;
}
public int getCallTimeout() {
if (callTimeout != null) {
return callTimeout;
} else {
return 0;
}
}
public void setCallTimeout(int callTimeout) {
this.callTimeout = callTimeout;
}
public boolean hasCallTimeout(){
return callTimeout != null;
}
@Override
public String errorText() {
if (exception != null) {
return exception.getMessage();
} else {
return null;
}
}
/**
* For use in async rpc clients
* @return true if failed
*/
@Override
public boolean failed() {
return this.exception != null;
}
@Override
public boolean isCanceled() {
return cancelled;
}
@Override
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
this.cancellationCb.set(cancellationCb);
if (this.cancelled) {
cancellationCb.run(null);
}
}
/**
* Notify a callback on error.
* For use in async rpc clients
*
* @param failureCb the callback to call on error
*/
public void notifyOnFail(RpcCallback<IOException> failureCb) {
this.failureCb.set(failureCb);
if (this.exception != null) {
failureCb.run(this.exception);
}
}
@Override
public void setFailed(String reason) {
this.exception = new IOException(reason);
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
/**
* Set failed with an exception to pass on.
* For use in async rpc clients
*
* @param e exception to set with
*/
public void setFailed(IOException e) {
this.exception = e;
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
@Override
public void startCancel() {
cancelled = true;
if (cancellationCb.get() != null) {
cancellationCb.get().run(null);
}
}
}
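
A sketch of riding Cells outside the protobuf payload with this controller; rpcControllerFactory, cells, stub and request are assumed to be in scope:

PayloadCarryingRpcController pcrc = rpcControllerFactory.newController(cells);
pcrc.setPriority(tableName); // hbase:meta and system tables get high QoS
pcrc.setCallTimeout(10000);  // ms; the server reads it via getCallTimeout()
try {
  stub.scan(pcrc, request);  // blocking pb call; Cells travel beside the pb
} catch (ServiceException se) {
  throw ProtobufUtil.getRemoteException(se);
}
CellScanner results = pcrc.cellScanner(); // Cells the server shipped back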

View File

@ -76,23 +76,30 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + method.getName() + ", " + request.toString());
LOG.trace("Call: "+method.getName()+", "+request.toString());
}
if (row == null) {
throw new IllegalArgumentException("Missing row property for remote region location");
}
final RpcController rpcController = controller == null
? rpcControllerFactory.newController() : controller;
final ClientProtos.CoprocessorServiceCall call =
CoprocessorRpcUtils.buildServiceCall(row, method, request);
RegionServerCallable<CoprocessorServiceResponse> callable =
new RegionServerCallable<CoprocessorServiceResponse>(connection,
controller == null? this.rpcControllerFactory.newController():
(PayloadCarryingRpcController)controller,
table, row) {
new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
@Override
protected CoprocessorServiceResponse call(PayloadCarryingRpcController controller)
throws Exception {
public CoprocessorServiceResponse call(int callTimeout) throws Exception {
if (rpcController instanceof PayloadCarryingRpcController) {
((PayloadCarryingRpcController) rpcController).setPriority(tableName);
}
if (rpcController instanceof TimeLimitedRpcController) {
((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout);
}
byte[] regionName = getLocation().getRegionInfo().getRegionName();
return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName);
return ProtobufUtil.execService(rpcController, getStub(), call, regionName);
}
};
CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class TimeLimitedRpcController implements RpcController {
/**
* The time, in ms before the call should expire.
*/
protected volatile Integer callTimeout;
protected volatile boolean cancelled = false;
protected final AtomicReference<RpcCallback<Object>> cancellationCb =
new AtomicReference<>(null);
protected final AtomicReference<RpcCallback<IOException>> failureCb =
new AtomicReference<>(null);
private IOException exception;
public int getCallTimeout() {
if (callTimeout != null) {
return callTimeout;
} else {
return 0;
}
}
public void setCallTimeout(int callTimeout) {
this.callTimeout = callTimeout;
}
public boolean hasCallTimeout(){
return callTimeout != null;
}
@Override
public String errorText() {
if (exception != null) {
return exception.getMessage();
} else {
return null;
}
}
/**
* For use in async rpc clients
* @return true if failed
*/
@Override
public boolean failed() {
return this.exception != null;
}
@Override
public boolean isCanceled() {
return cancelled;
}
@Override
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
this.cancellationCb.set(cancellationCb);
if (this.cancelled) {
cancellationCb.run(null);
}
}
/**
* Notify a callback on error.
* For use in async rpc clients
*
* @param failureCb the callback to call on error
*/
public void notifyOnFail(RpcCallback<IOException> failureCb) {
this.failureCb.set(failureCb);
if (this.exception != null) {
failureCb.run(this.exception);
}
}
@Override
public void reset() {
exception = null;
cancelled = false;
failureCb.set(null);
cancellationCb.set(null);
callTimeout = null;
}
@Override
public void setFailed(String reason) {
this.exception = new IOException(reason);
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
/**
* Set failed with an exception to pass on.
* For use in async rpc clients
*
* @param e exception to set with
*/
public void setFailed(IOException e) {
this.exception = e;
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
@Override
public void startCancel() {
cancelled = true;
if (cancellationCb.get() != null) {
cancellationCb.get().run(null);
}
}
}
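
A sketch of the callback semantics, using an assumed LOG; note that notifyOnCancel and notifyOnFail fire immediately when the event has already happened:

TimeLimitedRpcController controller = new TimeLimitedRpcController();
controller.setCallTimeout(5000); // ms; hasCallTimeout() is now true
controller.notifyOnCancel(new RpcCallback<Object>() {
  @Override
  public void run(Object ignored) {
    LOG.info("call was cancelled");
  }
});
controller.startCancel(); // isCanceled() == true and the callback runs
controller.reset();       // clears timeout, exception and both callbacks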

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.protobuf;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
.RegionSpecifierType.REGION_NAME;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -41,11 +38,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
.RegionSpecifierType.REGION_NAME;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -125,8 +124,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@ -172,9 +171,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.ArrayListMultimap;
@ -333,32 +334,17 @@ public final class ProtobufUtil {
* a new IOException that wraps the unexpected ServiceException.
*/
public static IOException getRemoteException(ServiceException se) {
return makeIOExceptionOfException(se);
Throwable e = se.getCause();
if (e == null) {
return new IOException(se);
}
/**
* Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
* just {@link ServiceException}. Prefer this method to
* {@link #getRemoteException(ServiceException)} because we are trying to
* contain direct protobuf references.
* @param e
*/
public static IOException handleRemoteException(Exception e) {
return makeIOExceptionOfException(e);
if (ExceptionUtil.isInterrupt(e)) {
return ExceptionUtil.asInterrupt(e);
}
private static IOException makeIOExceptionOfException(Exception e) {
Throwable t = e;
if (e instanceof ServiceException) {
t = e.getCause();
if (e instanceof RemoteException) {
e = ((RemoteException) e).unwrapRemoteException();
}
if (ExceptionUtil.isInterrupt(t)) {
return ExceptionUtil.asInterrupt(t);
}
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
return t instanceof IOException? (IOException)t: new HBaseIOException(t);
return e instanceof IOException ? (IOException) e : new IOException(se);
}
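
The two call-site idioms side by side, sketched with stub, controller and request assumed to be in scope:

ScanResponse response;
// Restored idiom: protobuf's ServiceException is visible at the call site.
try {
  response = stub.scan(controller, request);
} catch (ServiceException se) {
  throw ProtobufUtil.getRemoteException(se);
}
// HBASE-16308 idiom being reverted: pb stays hidden behind handleRemoteException.
try {
  response = stub.scan(controller, request);
} catch (Exception e) {
  throw ProtobufUtil.handleRemoteException(e);
}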
/**
@ -1266,6 +1252,7 @@ public final class ProtobufUtil {
return toMutation(type, mutation, builder, HConstants.NO_NONCE);
}
@SuppressWarnings("deprecation")
public static MutationProto toMutation(final MutationType type, final Mutation mutation,
MutationProto.Builder builder, long nonce)
throws IOException {
@ -2671,11 +2658,13 @@ public final class ProtobufUtil {
}
}
@SuppressWarnings("deprecation")
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
}
@SuppressWarnings("deprecation")
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
// compaction descriptor contains relative paths.
@ -3674,28 +3663,4 @@ public final class ProtobufUtil {
return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(),
stats.getCompactionPressure());
}
/**
* @param msg
* @return A String version of the passed in <code>msg</code>
*/
public static String toText(Message msg) {
return TextFormat.shortDebugString(msg);
}
public static byte [] toBytes(ByteString bs) {
return bs.toByteArray();
}
/**
* Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it.
* @throws IOException
*/
public static <T> T call(Callable<T> callable) throws IOException {
try {
return callable.call();
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
}
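
For contrast, the call(Callable) helper removed here let a call site avoid naming pb types at all; stub, controller and request are assumed effectively final and in scope (needs import java.util.concurrent.Callable):

long scannerId = ProtobufUtil.call(new Callable<Long>() {
  @Override
  public Long call() throws Exception {
    return stub.scan(controller, request).getScannerId();
  }
});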

View File

@ -65,6 +65,7 @@ public class TestClientScanner {
RpcControllerFactory controllerFactory;
@Before
@SuppressWarnings("deprecation")
public void setup() throws IOException {
clusterConn = Mockito.mock(ClusterConnection.class);
rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);

View File

@ -45,5 +45,4 @@ public class HBaseIOException extends IOException {
public HBaseIOException(Throwable cause) {
super(cause);
}
}
}}

View File

@ -75,17 +75,20 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -96,6 +99,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;

View File

@ -87,8 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@ -104,6 +102,7 @@ import org.apache.hadoop.util.ToolRunner;
/**
* Tool to load the output of HFileOutputFormat into an existing table.
* @see #usage()
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -131,13 +130,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private String bulkToken;
private UserProvider userProvider;
private int nrThreads;
private RpcControllerFactory rpcControllerFactory;
private LoadIncrementalHFiles() {}
public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
this.rpcControllerFactory = new RpcControllerFactory(conf);
initialize();
}
@ -325,7 +322,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table);
try {
/*
@ -476,11 +473,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Used by the replication sink to load the hfiles from the source cluster. It does the following,
* <ol>
* <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
* <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
* </li>
* </ol>
* 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
* {@link
* LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
* @param table Table to which these hfiles should be loaded to
* @param conn Connection to use
* @param queue {@link LoadQueueItem} has hfiles yet to be loaded
@ -781,23 +776,27 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
throws IOException {
final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
final List<Pair<byte[], String>> famPaths =
new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
rpcControllerFactory, tableName, first) {
final RegionServerCallable<Boolean> svrCallable =
new RegionServerCallable<Boolean>(conn, tableName, first) {
@Override
protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
public Boolean call(int callTimeout) throws Exception {
SecureBulkLoadClient secureClient = null;
boolean success = false;
try {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
secureClient = new SecureBulkLoadClient(table);
success =
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
}
return success;
@ -1079,7 +1078,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
* used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
* used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
* property. This directory is used as a temporary directory where all files are initially
* copied/moved from the user-given directory, all the required file permissions are set, and then
* from there it is finally loaded into a table. This should be set only when one would like to manage
@ -1089,4 +1088,5 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public void setBulkToken(String stagingDir) {
this.bulkToken = stagingDir;
}
}
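
A hypothetical driver for the tool, assuming the four-argument doBulkLoad overload of this era; conf, tableName and the staging path are stand-ins, and the constructor is declared to throw Exception:

try (Connection conn = ConnectionFactory.createConnection(conf);
     Admin admin = conn.getAdmin();
     Table table = conn.getTable(tableName);
     RegionLocator locator = conn.getRegionLocator(tableName)) {
  new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/staging/hfof"),
      admin, table, locator);
}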

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import com.google.protobuf.ServiceException;
/**
* The ExpiredMobFileCleanerChore runs the cleaner regularly to remove the expired
* mob files.
@ -84,6 +86,10 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
} catch (LockTimeoutException e) {
LOG.info("Fail to acquire the lock because of timeout, maybe a"
+ " MobCompactor is running", e);
} catch (ServiceException e) {
LOG.error(
"Fail to clean the expired mob files for the column " + hcd.getNameAsString()
+ " in the table " + htd.getNameAsString(), e);
} catch (IOException e) {
LOG.error(
"Fail to clean the expired mob files for the column " + hcd.getNameAsString()

View File

@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@ -454,7 +454,8 @@ public class ServerManager {
/**
* Adds to the onlineServers list. onlineServers should be locked.
* @param serverName The remote server's name.
* @param s
* @param sl
* @return Server load from the removed server, if any.
*/
@VisibleForTesting
void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.protobuf.ServiceException;
/**
* The cleaner to delete the expired MOB files.
*/
@ -58,8 +60,11 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
* directory.
* @param tableName The current table name.
* @param family The current family.
* @throws ServiceException
* @throws IOException
*/
public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException {
public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
throws ServiceException, IOException {
Configuration conf = getConf();
TableName tn = TableName.valueOf(tableName);
FileSystem fs = FileSystem.get(conf);
@ -94,7 +99,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
String tableName = args[0];
String familyName = args[1];
TableName tn = TableName.valueOf(tableName);
HBaseAdmin.available(getConf());
HBaseAdmin.checkHBaseAvailable(getConf());
Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin();
try {
@ -122,4 +127,5 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
}
}
}
}
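
A hypothetical invocation of the cleaner as a Hadoop Tool, with t1 and f1 standing in for a real table and column family:

int exit = ToolRunner.run(HBaseConfiguration.create(),
    new ExpiredMobFileCleaner(), new String[] { "t1", "f1" });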

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.ServiceException;
/**
* The sweep tool. It deletes the mob files that are not used and merges the small mob files to
* bigger ones. Each run of this sweep tool only handles one column family. The runs on
@ -62,10 +64,10 @@ public class Sweeper extends Configured implements Tool {
* @throws ServiceException
*/
int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
ClassNotFoundException, KeeperException {
ClassNotFoundException, KeeperException, ServiceException {
Configuration conf = getConf();
// make sure the target HBase exists.
HBaseAdmin.available(conf);
HBaseAdmin.checkHBaseAvailable(conf);
Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin();
try {

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -2764,15 +2765,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
timeLimitDelta =
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pRpcController =
(PayloadCarryingRpcController)controller;
if (pRpcController.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
if (controller instanceof TimeLimitedRpcController) {
TimeLimitedRpcController timeLimitedRpcController =
(TimeLimitedRpcController)controller;
if (timeLimitedRpcController.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta,
timeLimitedRpcController.getCallTimeout());
}
} else {
throw new UnsupportedOperationException("We only do " +
"PayloadCarryingRpcControllers! FIX IF A PROBLEM");
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@ -58,8 +61,10 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.Private
public class WALEditsReplaySink {
private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
private static final int MAX_BATCH_SIZE = 1024;
private final Configuration conf;
private final ClusterConnection conn;
private final TableName tableName;
@ -161,8 +166,8 @@ public class WALEditsReplaySink {
try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
ReplayServerCallable<ReplicateWALEntryResponse> callable =
new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory,
this.tableName, regionLoc, entries);
new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
regionInfo, entries);
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
@ -179,19 +184,31 @@ public class WALEditsReplaySink {
* @param <R>
*/
class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
private HRegionInfo regionInfo;
private List<Entry> entries;
ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
super(connection, rpcControllerFactory, tableName, null);
ReplayServerCallable(final Connection connection, final TableName tableName,
final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Entry> entries) {
super(connection, tableName, null);
this.entries = entries;
this.regionInfo = regionInfo;
setLocation(regionLoc);
}
@Override
protected ReplicateWALEntryResponse call(PayloadCarryingRpcController controller)
throws Exception {
if (entries.isEmpty()) return null;
public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
try {
replayToServer(this.regionInfo, this.entries);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return null;
}
private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
throws IOException, ServiceException {
if (entries.isEmpty()) return;
Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
@ -199,8 +216,12 @@ public class WALEditsReplaySink {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
controller.setCellScanner(p.getSecond());
return remoteSvr.replay(controller, p.getFirst());
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
try {
remoteSvr.replay(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
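
A sketch of the batching MAX_BATCH_SIZE implies; replayEdits here stands in for the sink's per-batch dispatch shown above, with entries, regionLoc and regionInfo assumed in scope:

for (int i = 0; i < entries.size(); i += MAX_BATCH_SIZE) {
  List<Entry> batch = entries.subList(i, Math.min(entries.size(), i + MAX_BATCH_SIZE));
  replayEdits(regionLoc, regionInfo, batch); // one ReplayServerCallable per batch
}
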
@Override

View File

@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
@ -45,21 +46,27 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@ -67,17 +74,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.util.StringUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
@ -609,8 +611,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
* Calls replay on the passed edits for the given set of entries belonging to the region. It skips
* the entry if the region boundaries have changed or the region is gone.
*/
static class RegionReplicaReplayCallable extends
RegionAdminServiceCallable<ReplicateWALEntryResponse> {
static class RegionReplicaReplayCallable
extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
private final List<Entry> entries;
private final byte[] initialEncodedRegionName;
private final AtomicLong skippedEntries;
@ -625,25 +628,38 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
}
public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception {
// Check whether we should still replay this entry. If the regions are changed, or the
@Override
public ReplicateWALEntryResponse call(int timeout) throws IOException {
return replayToServer(this.entries, timeout);
}
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
throws IOException {
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming from the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
boolean skip = false;
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
}
if (!this.entries.isEmpty() && !skip) {
Entry[] entriesArray = new Entry[this.entries.size()];
entriesArray = this.entries.toArray(entriesArray);
if (!entries.isEmpty() && !skip) {
Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
controller.setCellScanner(p.getSecond());
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
controller.setPriority(tableName);
return stub.replay(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
if (skip) {

View File

@ -23,18 +23,19 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -79,11 +80,13 @@ public class Merge extends Configured implements Tool {
// Verify HBase is down
LOG.info("Verifying that HBase is not running...");
try {
HBaseAdmin.available(getConf());
HBaseAdmin.checkHBaseAvailable(getConf());
LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
return -1;
} catch (ZooKeeperConnectionException zkce) {
// If no zk, presume no master.
} catch (MasterNotRunningException e) {
// Expected. Ignore.
}
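
The restored check follows this idiom, sketched with an assumed conf; for an offline tool, a reachable master is the failure case:

try {
  HBaseAdmin.checkHBaseAvailable(conf);
  // a master answered, so the cluster is up and the offline tool must abort
} catch (MasterNotRunningException e) {
  // expected: no master is running
} catch (ZooKeeperConnectionException e) {
  // no zookeeper, presume no master
} catch (ServiceException | IOException e) {
  // transport failure, also treated as "not available" here
}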
// Initialize MetaUtils and get the root of the HBase installation

View File

@ -60,6 +60,7 @@ public class TestNamespace {
private static ZKNamespaceManager zkNamespaceManager;
private String prefix = "TestNamespace";
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
@ -300,8 +301,7 @@ public class TestNamespace {
runWithExpectedException(new Callable<Void>() {
@Override
public Void call() throws Exception {
HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
htd.addFamily(new HColumnDescriptor("family1"));
admin.createTable(htd);
return null;
@ -387,4 +387,5 @@ public class TestNamespace {
}
fail("Should have thrown exception " + exceptionClass);
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -65,6 +67,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ServiceException;
/**
* Class to test HBaseAdmin.
@ -639,9 +643,11 @@ public class TestAdmin2 {
long start = System.currentTimeMillis();
try {
HBaseAdmin.available(conf);
HBaseAdmin.checkHBaseAvailable(conf);
assertTrue(false);
} catch (MasterNotRunningException ignored) {
} catch (ZooKeeperConnectionException ignored) {
} catch (ServiceException ignored) {
} catch (IOException ignored) {
}
long end = System.currentTimeMillis();

View File

@ -28,10 +28,13 @@ import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
@ -53,6 +56,7 @@ import com.google.protobuf.ServiceException;
@Category({MediumTests.class, ClientTests.class})
public class TestClientTimeouts {
private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static int SLAVES = 1;
@ -83,6 +87,7 @@ public class TestClientTimeouts {
*/
@Test
public void testAdminTimeout() throws Exception {
Connection lastConnection = null;
boolean lastFailed = false;
int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@ -100,7 +105,7 @@ public class TestClientTimeouts {
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
// run some admin commands
HBaseAdmin.available(conf);
HBaseAdmin.checkHBaseAvailable(conf);
admin.setBalancerRunning(false, false);
} catch (ZooKeeperConnectionException ex) {
// Since we are randomly throwing SocketTimeoutExceptions, it is possible to get

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.HMaster;
@ -104,6 +103,8 @@ public class TestHCM {
TableName.valueOf("test2");
private static final TableName TABLE_NAME3 =
TableName.valueOf("test3");
private static final TableName TABLE_NAME4 =
TableName.valueOf("test4");
private static final byte[] FAM_NAM = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("bbb");
private static final byte[] ROW_X = Bytes.toBytes("xxx");
@ -406,11 +407,10 @@ public class TestHCM {
long pauseTime;
long baseTime = 100;
TableName tableName = TableName.valueOf("HCM-testCallableSleep");
Table table = TEST_UTIL.createTable(tableName, FAM_NAM);
RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()),
tableName, ROW) {
@Override
protected Object call(PayloadCarryingRpcController controller) throws Exception {
TEST_UTIL.getConnection(), tableName, ROW) {
public Object call(int timeout) throws IOException {
return null;
}
};
@ -424,10 +424,9 @@ public class TestHCM {
RegionAdminServiceCallable<Object> regionAdminServiceCallable =
new RegionAdminServiceCallable<Object>(
(ClusterConnection) TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
@Override
public Object call(PayloadCarryingRpcController controller) throws Exception {
(ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
TEST_UTIL.getConfiguration()), tableName, ROW) {
public Object call(int timeout) throws IOException {
return null;
}
};
@ -439,22 +438,17 @@ public class TestHCM {
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
}
MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
@Override
protected Object call(PayloadCarryingRpcController rpcController) throws Exception {
MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
public Object call(int timeout) throws IOException {
return null;
}
};
try {
for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
pauseTime = masterCallable.sleep(baseTime, i);
assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
}
} finally {
masterCallable.close();
}
}
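
For reference, a paraphrase (not the exact client source) of the pause computation these assertions exercise, assuming ThreadLocalRandom supplies the jitter:

int idx = Math.min(i, HConstants.RETRY_BACKOFF.length - 1);
long normal = baseTime * HConstants.RETRY_BACKOFF[idx];
long jitter = (long) (normal * ThreadLocalRandom.current().nextFloat() * 0.01f);
long pauseTime = normal + jitter; // hence the 1.01f upper bound asserted above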
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
@ -1155,6 +1149,7 @@ public class TestHCM {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
try {
long timeBase = timeMachine.currentTime();
long largeAmountOfTime = ANY_PAUSE * 1000;
ConnectionImplementation.ServerErrorTracker tracker =
new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);

View File

@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -334,21 +332,20 @@ public class TestReplicaWithCluster {
// bulk load HFiles
LOG.debug("Loading test data");
@SuppressWarnings("deprecation")
final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
table = conn.getTable(hdt.getTableName());
final String bulkToken =
new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(),
TestHRegionServerBulkLoad.rowkey(0)) {
final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int timeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
SecureBulkLoadClient secureClient = null;
byte[] regionName = getLocation().getRegionInfo().getRegionName();
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
secureClient = new SecureBulkLoadClient(table);
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
true, null, bulkToken);
}

View File

@ -62,8 +62,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@ -200,20 +198,19 @@ public class TestHRegionServerBulkLoad {
}
// bulk load HFiles
final ClusterConnection conn = (ClusterConnection)UTIL.getConnection();
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
Table table = conn.getTable(tableName);
final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).
prepareBulkLoad(conn);
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int callTimeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
SecureBulkLoadClient secureClient = null;
byte[] regionName = getLocation().getRegionInfo().getRegionName();
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
secureClient = new SecureBulkLoadClient(table);
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
true, null, bulkToken);
}
@ -227,15 +224,15 @@ public class TestHRegionServerBulkLoad {
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new RegionServerCallable<Void>(conn,
new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int callTimeout) throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
conn.getAdmin(getLocation().getServerName());
CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(
getLocation().getRegionInfo().getRegionName(), true, null);
server.compactRegion(null, request);
numCompactions.incrementAndGet();

View File

@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -91,12 +89,10 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
// bulk load HFiles
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
Bytes.toBytes("aaa")) {
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int callTimeout) throws Exception {
LOG.info("Non-secure old client");
byte[] regionName = getLocation().getRegionInfo().getRegionName();
BulkLoadHFileRequest request =
@@ -113,10 +109,9 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
Bytes.toBytes("aaa")) {
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int callTimeout) throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
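
Such a callable does not run itself; the tests hand it to an RpcRetryingCaller, which invokes prepare() and then call() with retry handling. A sketch of that execution step, assuming the 1.x-era RpcRetryingCallerFactory API and a Configuration conf; the timeout value is illustrative:

    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    RpcRetryingCaller<Void> caller = factory.<Void>newCaller();
    // Runs prepare()/call() under the configured retry policy; the int argument
    // is the overall operation timeout in milliseconds.
    caller.callWithRetries(callable, Integer.MAX_VALUE);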

View File

@@ -33,13 +33,13 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,8 +62,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
super(duration);
}
private static final Log LOG =
LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
@BeforeClass
public static void setUpBeforeClass() throws IOException {
@@ -104,17 +103,16 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
Table table = conn.getTable(tableName);
final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
Bytes.toBytes("aaa")) {
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row " +
Bytes.toStringBinary(getRow()));
public Void call(int callTimeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
try (Table table = conn.getTable(getTableName())) {
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
null, bulkToken, getLocation().getRegionInfo().getStartKey());
boolean loaded =
new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
bulkToken, getLocation().getRegionInfo().getStartKey());
}
return null;
}
@@ -126,10 +124,9 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
Bytes.toBytes("aaa")) {
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
protected Void call(PayloadCarryingRpcController controller) throws Exception {
public Void call(int callTimeout) throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
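
The periodic-compaction callable above talks straight to the hosting region server's admin protobuf stub. A sketch of its core, grounded in the lines shown here, with error handling omitted; conn is the test's ClusterConnection:

    AdminProtos.AdminService.BlockingInterface server =
        conn.getAdmin(getLocation().getServerName());
    CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
        getLocation().getRegionInfo().getRegionName(), true, null); // major=true, all families
    server.compactRegion(null, request); // null RpcController, old-style blocking stub call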

View File

@@ -17,6 +17,8 @@
package org.apache.hadoop.hbase.spark;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
@@ -35,8 +37,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ByteString;
/**
* This filter will push down all qualifier logic given to us