HBASE-12668 Adapt PayloadCarryingRpcController so it can also be used in an async way
Signed-off-by: stack <stack@apache.org> Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
This commit is contained in:
parent
f0541fceed
commit
54381aab11
|
@ -19,17 +19,29 @@
|
|||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.PoolMap;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
/**
|
||||
|
@ -174,4 +186,118 @@ public abstract class AbstractRpcClient implements RpcClient {
|
|||
protected static int getPoolSize(Configuration config) {
|
||||
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
|
||||
* threw an exception.
|
||||
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
|
||||
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
|
||||
* new Connection each time.
|
||||
* @return A pair with the Message response and the Cell data (if any).
|
||||
*/
|
||||
Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
|
||||
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
|
||||
throws ServiceException {
|
||||
long startTime = 0;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
int callTimeout = 0;
|
||||
CellScanner cells = null;
|
||||
if (pcrc != null) {
|
||||
callTimeout = pcrc.getCallTimeout();
|
||||
cells = pcrc.cellScanner();
|
||||
// Clear it here so we don't by mistake try and these cells processing results.
|
||||
pcrc.setCellScanner(null);
|
||||
}
|
||||
Pair<Message, CellScanner> val;
|
||||
try {
|
||||
val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
|
||||
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
|
||||
if (pcrc != null) {
|
||||
// Shove the results into controller so can be carried across the proxy/pb service void.
|
||||
if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
|
||||
} else if (val.getSecond() != null) {
|
||||
throw new ServiceException("Client dropping data on the floor!");
|
||||
}
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
|
||||
LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
|
||||
}
|
||||
return val.getFirst();
|
||||
} catch (Throwable e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a call, passing <code>param</code>, to the IPC server running at
|
||||
* <code>address</code> which is servicing the <code>protocol</code> protocol,
|
||||
* with the <code>ticket</code> credentials, returning the value.
|
||||
* Throws exceptions if there are network problems or if the remote code
|
||||
* threw an exception.
|
||||
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
|
||||
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
|
||||
* new Connection each time.
|
||||
* @return A pair with the Message response and the Cell data (if any).
|
||||
* @throws InterruptedException
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
|
||||
Descriptors.MethodDescriptor md, Message param, CellScanner cells,
|
||||
Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws
|
||||
IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
|
||||
* protobuf blocking stubs.
|
||||
* @return A blocking rpc channel that goes via this rpc client instance.
|
||||
*/
|
||||
@Override
|
||||
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
|
||||
final User ticket, int defaultOperationTimeout) {
|
||||
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking rpc channel that goes via hbase rpc.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
|
||||
private final InetSocketAddress isa;
|
||||
private final AbstractRpcClient rpcClient;
|
||||
private final User ticket;
|
||||
private final int defaultOperationTimeout;
|
||||
|
||||
/**
|
||||
* @param defaultOperationTimeout - the default timeout when no timeout is given
|
||||
* by the caller.
|
||||
*/
|
||||
protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
|
||||
final ServerName sn, final User ticket, int defaultOperationTimeout) {
|
||||
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
|
||||
this.rpcClient = rpcClient;
|
||||
this.ticket = ticket;
|
||||
this.defaultOperationTimeout = defaultOperationTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
|
||||
Message param, Message returnType) throws ServiceException {
|
||||
PayloadCarryingRpcController pcrc;
|
||||
if (controller != null) {
|
||||
pcrc = (PayloadCarryingRpcController) controller;
|
||||
if (!pcrc.hasCallTimeout()){
|
||||
pcrc.setCallTimeout(defaultOperationTimeout);
|
||||
}
|
||||
} else {
|
||||
pcrc = new PayloadCarryingRpcController();
|
||||
pcrc.setCallTimeout(defaultOperationTimeout);
|
||||
}
|
||||
|
||||
return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -96,4 +96,10 @@ public class PayloadCarryingRpcController
|
|||
public int getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
@Override public void reset() {
|
||||
super.reset();
|
||||
priority = 0;
|
||||
cellScanner = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,14 +19,10 @@
|
|||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.Message.Builder;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
|
@ -1144,7 +1140,8 @@ public class RpcClientImpl extends AbstractRpcClient {
|
|||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
*/
|
||||
Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
|
||||
@Override
|
||||
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
|
||||
Message param, CellScanner cells,
|
||||
Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
|
||||
throws IOException, InterruptedException {
|
||||
|
@ -1285,99 +1282,4 @@ public class RpcClientImpl extends AbstractRpcClient {
|
|||
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
|
||||
* threw an exception.
|
||||
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
|
||||
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
|
||||
* new Connection each time.
|
||||
* @return A pair with the Message response and the Cell data (if any).
|
||||
*/
|
||||
Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc,
|
||||
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
|
||||
throws ServiceException {
|
||||
long startTime = 0;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
int callTimeout = 0;
|
||||
CellScanner cells = null;
|
||||
if (pcrc != null) {
|
||||
callTimeout = pcrc.getCallTimeout();
|
||||
cells = pcrc.cellScanner();
|
||||
// Clear it here so we don't by mistake try and these cells processing results.
|
||||
pcrc.setCellScanner(null);
|
||||
}
|
||||
Pair<Message, CellScanner> val;
|
||||
try {
|
||||
val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
|
||||
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
|
||||
if (pcrc != null) {
|
||||
// Shove the results into controller so can be carried across the proxy/pb service void.
|
||||
if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
|
||||
} else if (val.getSecond() != null) {
|
||||
throw new ServiceException("Client dropping data on the floor!");
|
||||
}
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
|
||||
LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
|
||||
}
|
||||
return val.getFirst();
|
||||
} catch (Throwable e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
|
||||
* protobuf blocking stubs.
|
||||
* @return A blocking rpc channel that goes via this rpc client instance.
|
||||
*/
|
||||
@Override
|
||||
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
|
||||
final User ticket, int defaultOperationTimeout) {
|
||||
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking rpc channel that goes via hbase rpc.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
|
||||
private final InetSocketAddress isa;
|
||||
private final RpcClientImpl rpcClient;
|
||||
private final User ticket;
|
||||
private final int defaultOperationTimeout;
|
||||
|
||||
/**
|
||||
* @param defaultOperationTimeout - the default timeout when no timeout is given
|
||||
* by the caller.
|
||||
*/
|
||||
protected BlockingRpcChannelImplementation(final RpcClientImpl rpcClient, final ServerName sn,
|
||||
final User ticket, int defaultOperationTimeout) {
|
||||
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
|
||||
this.rpcClient = rpcClient;
|
||||
this.ticket = ticket;
|
||||
this.defaultOperationTimeout = defaultOperationTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
|
||||
Message param, Message returnType) throws ServiceException {
|
||||
PayloadCarryingRpcController pcrc;
|
||||
if (controller != null) {
|
||||
pcrc = (PayloadCarryingRpcController) controller;
|
||||
if (!pcrc.hasCallTimeout()){
|
||||
pcrc.setCallTimeout(defaultOperationTimeout);
|
||||
}
|
||||
} else {
|
||||
pcrc = new PayloadCarryingRpcController();
|
||||
pcrc.setCallTimeout(defaultOperationTimeout);
|
||||
}
|
||||
|
||||
return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -36,6 +37,11 @@ public class TimeLimitedRpcController implements RpcController {
|
|||
protected final AtomicReference<RpcCallback<Object>> cancellationCb =
|
||||
new AtomicReference<RpcCallback<Object>>(null);
|
||||
|
||||
protected final AtomicReference<RpcCallback<IOException>> failureCb =
|
||||
new AtomicReference<RpcCallback<IOException>>(null);
|
||||
|
||||
private IOException exception;
|
||||
|
||||
public Integer getCallTimeout() {
|
||||
return callTimeout;
|
||||
}
|
||||
|
@ -50,12 +56,20 @@ public class TimeLimitedRpcController implements RpcController {
|
|||
|
||||
@Override
|
||||
public String errorText() {
|
||||
throw new UnsupportedOperationException();
|
||||
if (exception != null) {
|
||||
return exception.getMessage();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For use in async rpc clients
|
||||
* @return true if failed
|
||||
*/
|
||||
@Override
|
||||
public boolean failed() {
|
||||
throw new UnsupportedOperationException();
|
||||
return this.exception != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,16 +80,52 @@ public class TimeLimitedRpcController implements RpcController {
|
|||
@Override
|
||||
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
|
||||
this.cancellationCb.set(cancellationCb);
|
||||
if (this.cancelled) {
|
||||
cancellationCb.run(null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify a callback on error.
|
||||
* For use in async rpc clients
|
||||
*
|
||||
* @param failureCb the callback to call on error
|
||||
*/
|
||||
public void notifyOnFail(RpcCallback<IOException> failureCb) {
|
||||
this.failureCb.set(failureCb);
|
||||
if (this.exception != null) {
|
||||
failureCb.run(this.exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
throw new UnsupportedOperationException();
|
||||
exception = null;
|
||||
cancelled = false;
|
||||
failureCb.set(null);
|
||||
cancellationCb.set(null);
|
||||
callTimeout = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFailed(String arg0) {
|
||||
throw new UnsupportedOperationException();
|
||||
public void setFailed(String reason) {
|
||||
this.exception = new IOException(reason);
|
||||
if (this.failureCb.get() != null) {
|
||||
this.failureCb.get().run(this.exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set failed with an exception to pass on.
|
||||
* For use in async rpc clients
|
||||
*
|
||||
* @param e exception to set with
|
||||
*/
|
||||
public void setFailed(IOException e) {
|
||||
this.exception = e;
|
||||
if (this.failureCb.get() != null) {
|
||||
this.failureCb.get().run(this.exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
|
||||
|
@ -145,7 +146,7 @@ public class TestClientTimeouts {
|
|||
* Blocking rpc channel that goes via hbase rpc.
|
||||
*/
|
||||
static class RandomTimeoutBlockingRpcChannel
|
||||
extends RpcClientImpl.BlockingRpcChannelImplementation {
|
||||
extends AbstractRpcClient.BlockingRpcChannelImplementation {
|
||||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||
public static final double CHANCE_OF_TIMEOUT = 0.3;
|
||||
private static AtomicInteger invokations = new AtomicInteger();
|
||||
|
|
Loading…
Reference in New Issue