HBASE-12668 Adapt PayloadCarryingRpcController so it can also be used in an async way

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Jurriaan Mous 2014-12-10 15:08:42 +01:00 committed by stack
parent f72c3ef34d
commit 3275b964c1
5 changed files with 191 additions and 108 deletions

View File

@ -19,17 +19,29 @@
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
@ -174,4 +186,118 @@ public abstract class AbstractRpcClient implements RpcClient {
protected static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
*/
Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException {
long startTime = 0;
if (LOG.isTraceEnabled()) {
startTime = EnvironmentEdgeManager.currentTime();
}
int callTimeout = 0;
CellScanner cells = null;
if (pcrc != null) {
callTimeout = pcrc.getCallTimeout();
cells = pcrc.cellScanner();
// Clear it here so we don't by mistake try and these cells processing results.
pcrc.setCellScanner(null);
}
Pair<Message, CellScanner> val;
try {
val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
if (pcrc != null) {
// Shove the results into controller so can be carried across the proxy/pb service void.
if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
} else if (val.getSecond() != null) {
throw new ServiceException("Client dropping data on the floor!");
}
if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
}
return val.getFirst();
} catch (Throwable e) {
throw new ServiceException(e);
}
}
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException
* @throws java.io.IOException
*/
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws
IOException, InterruptedException;
/**
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
* protobuf blocking stubs.
* @return A blocking rpc channel that goes via this rpc client instance.
*/
@Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
final User ticket, int defaultOperationTimeout) {
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
}
/**
* Blocking rpc channel that goes via hbase rpc.
*/
@VisibleForTesting
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
private final InetSocketAddress isa;
private final AbstractRpcClient rpcClient;
private final User ticket;
private final int defaultOperationTimeout;
/**
* @param defaultOperationTimeout - the default timeout when no timeout is given
* by the caller.
*/
protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
final ServerName sn, final User ticket, int defaultOperationTimeout) {
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.rpcClient = rpcClient;
this.ticket = ticket;
this.defaultOperationTimeout = defaultOperationTimeout;
}
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
PayloadCarryingRpcController pcrc;
if (controller != null) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()){
pcrc.setCallTimeout(defaultOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(defaultOperationTimeout);
}
return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
}
}
}

View File

@ -96,4 +96,10 @@ public class PayloadCarryingRpcController
public int getPriority() {
return priority;
}
@Override public void reset() {
super.reset();
priority = 0;
cellScanner = null;
}
}

View File

@ -19,14 +19,10 @@
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -1144,7 +1140,8 @@ public class RpcClientImpl extends AbstractRpcClient {
* @throws InterruptedException
* @throws IOException
*/
Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
@Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
throws IOException, InterruptedException {
@ -1285,99 +1282,4 @@ public class RpcClientImpl extends AbstractRpcClient {
return connection;
}
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
*/
Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException {
long startTime = 0;
if (LOG.isTraceEnabled()) {
startTime = EnvironmentEdgeManager.currentTime();
}
int callTimeout = 0;
CellScanner cells = null;
if (pcrc != null) {
callTimeout = pcrc.getCallTimeout();
cells = pcrc.cellScanner();
// Clear it here so we don't by mistake try and these cells processing results.
pcrc.setCellScanner(null);
}
Pair<Message, CellScanner> val;
try {
val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
if (pcrc != null) {
// Shove the results into controller so can be carried across the proxy/pb service void.
if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
} else if (val.getSecond() != null) {
throw new ServiceException("Client dropping data on the floor!");
}
if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
}
return val.getFirst();
} catch (Throwable e) {
throw new ServiceException(e);
}
}
/**
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
* protobuf blocking stubs.
* @return A blocking rpc channel that goes via this rpc client instance.
*/
@Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
final User ticket, int defaultOperationTimeout) {
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
}
/**
* Blocking rpc channel that goes via hbase rpc.
*/
@VisibleForTesting
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
private final InetSocketAddress isa;
private final RpcClientImpl rpcClient;
private final User ticket;
private final int defaultOperationTimeout;
/**
* @param defaultOperationTimeout - the default timeout when no timeout is given
* by the caller.
*/
protected BlockingRpcChannelImplementation(final RpcClientImpl rpcClient, final ServerName sn,
final User ticket, int defaultOperationTimeout) {
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.rpcClient = rpcClient;
this.ticket = ticket;
this.defaultOperationTimeout = defaultOperationTimeout;
}
@Override
public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
PayloadCarryingRpcController pcrc;
if (controller != null) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()){
pcrc.setCallTimeout(defaultOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(defaultOperationTimeout);
}
return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
}
}
}

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -38,6 +37,11 @@ public class TimeLimitedRpcController implements RpcController {
protected final AtomicReference<RpcCallback<Object>> cancellationCb =
new AtomicReference<RpcCallback<Object>>(null);
protected final AtomicReference<RpcCallback<IOException>> failureCb =
new AtomicReference<RpcCallback<IOException>>(null);
private IOException exception;
public Integer getCallTimeout() {
return callTimeout;
}
@ -52,12 +56,20 @@ public class TimeLimitedRpcController implements RpcController {
@Override
public String errorText() {
throw new UnsupportedOperationException();
if (exception != null) {
return exception.getMessage();
} else {
return null;
}
}
/**
* For use in async rpc clients
* @return true if failed
*/
@Override
public boolean failed() {
throw new UnsupportedOperationException();
return this.exception != null;
}
@Override
@ -68,16 +80,52 @@ public class TimeLimitedRpcController implements RpcController {
@Override
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
this.cancellationCb.set(cancellationCb);
if (this.cancelled) {
cancellationCb.run(null);
}
}
/**
* Notify a callback on error.
* For use in async rpc clients
*
* @param failureCb the callback to call on error
*/
public void notifyOnFail(RpcCallback<IOException> failureCb) {
this.failureCb.set(failureCb);
if (this.exception != null) {
failureCb.run(this.exception);
}
}
@Override
public void reset() {
throw new UnsupportedOperationException();
exception = null;
cancelled = false;
failureCb.set(null);
cancellationCb.set(null);
callTimeout = null;
}
@Override
public void setFailed(String arg0) {
throw new UnsupportedOperationException();
public void setFailed(String reason) {
this.exception = new IOException(reason);
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
/**
* Set failed with an exception to pass on.
* For use in async rpc clients
*
* @param e exception to set with
*/
public void setFailed(IOException e) {
this.exception = e;
if (this.failureCb.get() != null) {
this.failureCb.get().run(this.exception);
}
}
@Override

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
@ -146,7 +147,7 @@ public class TestClientTimeouts {
* Blocking rpc channel that goes via hbase rpc.
*/
static class RandomTimeoutBlockingRpcChannel
extends RpcClientImpl.BlockingRpcChannelImplementation {
extends AbstractRpcClient.BlockingRpcChannelImplementation {
private static final Random RANDOM = new Random(System.currentTimeMillis());
public static final double CHANCE_OF_TIMEOUT = 0.3;
private static AtomicInteger invokations = new AtomicInteger();