From 3275b964c1c1fbb20ffe646b37317496b265660b Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Wed, 10 Dec 2014 15:08:42 +0100 Subject: [PATCH] HBASE-12668 Adapt PayloadCarryingRpcController so it can also be used in an async way Signed-off-by: stack --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 126 ++++++++++++++++++ .../ipc/PayloadCarryingRpcController.java | 6 + .../hadoop/hbase/ipc/RpcClientImpl.java | 102 +------------- .../hbase/ipc/TimeLimitedRpcController.java | 62 ++++++++- .../hbase/client/TestClientTimeouts.java | 3 +- 5 files changed, 191 insertions(+), 108 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 766ad8fa7e8..df43f6f76d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -19,17 +19,29 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.io.compress.CompressionCodec; +import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; /** @@ -174,4 +186,118 @@ public abstract class AbstractRpcClient implements RpcClient { protected static int getPoolSize(Configuration config) { return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); } + + + /** + * Make a blocking call. Throws exceptions if there are network problems or if the remote code + * threw an exception. + * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. + * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a + * new Connection each time. + * @return A pair with the Message response and the Cell data (if any). + */ + Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, + Message param, Message returnType, final User ticket, final InetSocketAddress isa) + throws ServiceException { + long startTime = 0; + if (LOG.isTraceEnabled()) { + startTime = EnvironmentEdgeManager.currentTime(); + } + int callTimeout = 0; + CellScanner cells = null; + if (pcrc != null) { + callTimeout = pcrc.getCallTimeout(); + cells = pcrc.cellScanner(); + // Clear it here so we don't by mistake try and these cells processing results. + pcrc.setCellScanner(null); + } + Pair val; + try { + val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout, + pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS); + if (pcrc != null) { + // Shove the results into controller so can be carried across the proxy/pb service void. + if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond()); + } else if (val.getSecond() != null) { + throw new ServiceException("Client dropping data on the floor!"); + } + + if (LOG.isTraceEnabled()) { + long callTime = EnvironmentEdgeManager.currentTime() - startTime; + LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); + } + return val.getFirst(); + } catch (Throwable e) { + throw new ServiceException(e); + } + } + + /** + * Make a call, passing param, to the IPC server running at + * address which is servicing the protocol protocol, + * with the ticket credentials, returning the value. + * Throws exceptions if there are network problems or if the remote code + * threw an exception. + * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. + * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a + * new Connection each time. + * @return A pair with the Message response and the Cell data (if any). + * @throws InterruptedException + * @throws java.io.IOException + */ + protected abstract Pair call(PayloadCarryingRpcController pcrc, + Descriptors.MethodDescriptor md, Message param, CellScanner cells, + Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws + IOException, InterruptedException; + + /** + * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up + * protobuf blocking stubs. + * @return A blocking rpc channel that goes via this rpc client instance. + */ + @Override + public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, + final User ticket, int defaultOperationTimeout) { + return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); + } + + /** + * Blocking rpc channel that goes via hbase rpc. + */ + @VisibleForTesting + public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { + private final InetSocketAddress isa; + private final AbstractRpcClient rpcClient; + private final User ticket; + private final int defaultOperationTimeout; + + /** + * @param defaultOperationTimeout - the default timeout when no timeout is given + * by the caller. + */ + protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, + final ServerName sn, final User ticket, int defaultOperationTimeout) { + this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); + this.rpcClient = rpcClient; + this.ticket = ticket; + this.defaultOperationTimeout = defaultOperationTimeout; + } + + @Override + public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, + Message param, Message returnType) throws ServiceException { + PayloadCarryingRpcController pcrc; + if (controller != null) { + pcrc = (PayloadCarryingRpcController) controller; + if (!pcrc.hasCallTimeout()){ + pcrc.setCallTimeout(defaultOperationTimeout); + } + } else { + pcrc = new PayloadCarryingRpcController(); + pcrc.setCallTimeout(defaultOperationTimeout); + } + + return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); + } + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 141f2344eb4..80d6fa00560 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -96,4 +96,10 @@ public class PayloadCarryingRpcController public int getPriority() { return priority; } + + @Override public void reset() { + super.reset(); + priority = 0; + cellScanner = null; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index c5578a1af8f..97fa475a069 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -19,14 +19,10 @@ package org.apache.hadoop.hbase.ipc; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -1144,7 +1140,8 @@ public class RpcClientImpl extends AbstractRpcClient { * @throws InterruptedException * @throws IOException */ - Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, + @Override + protected Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws IOException, InterruptedException { @@ -1285,99 +1282,4 @@ public class RpcClientImpl extends AbstractRpcClient { return connection; } - - /** - * Make a blocking call. Throws exceptions if there are network problems or if the remote code - * threw an exception. - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a - * new Connection each time. - * @return A pair with the Message response and the Cell data (if any). - */ - Message callBlockingMethod(MethodDescriptor md, PayloadCarryingRpcController pcrc, - Message param, Message returnType, final User ticket, final InetSocketAddress isa) - throws ServiceException { - long startTime = 0; - if (LOG.isTraceEnabled()) { - startTime = EnvironmentEdgeManager.currentTime(); - } - int callTimeout = 0; - CellScanner cells = null; - if (pcrc != null) { - callTimeout = pcrc.getCallTimeout(); - cells = pcrc.cellScanner(); - // Clear it here so we don't by mistake try and these cells processing results. - pcrc.setCellScanner(null); - } - Pair val; - try { - val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout, - pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS); - if (pcrc != null) { - // Shove the results into controller so can be carried across the proxy/pb service void. - if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond()); - } else if (val.getSecond() != null) { - throw new ServiceException("Client dropping data on the floor!"); - } - - if (LOG.isTraceEnabled()) { - long callTime = EnvironmentEdgeManager.currentTime() - startTime; - LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); - } - return val.getFirst(); - } catch (Throwable e) { - throw new ServiceException(e); - } - } - - /** - * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up - * protobuf blocking stubs. - * @return A blocking rpc channel that goes via this rpc client instance. - */ - @Override - public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, - final User ticket, int defaultOperationTimeout) { - return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); - } - - /** - * Blocking rpc channel that goes via hbase rpc. - */ - @VisibleForTesting - public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { - private final InetSocketAddress isa; - private final RpcClientImpl rpcClient; - private final User ticket; - private final int defaultOperationTimeout; - - /** - * @param defaultOperationTimeout - the default timeout when no timeout is given - * by the caller. - */ - protected BlockingRpcChannelImplementation(final RpcClientImpl rpcClient, final ServerName sn, - final User ticket, int defaultOperationTimeout) { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - this.rpcClient = rpcClient; - this.ticket = ticket; - this.defaultOperationTimeout = defaultOperationTimeout; - } - - @Override - public Message callBlockingMethod(MethodDescriptor md, RpcController controller, - Message param, Message returnType) throws ServiceException { - PayloadCarryingRpcController pcrc; - if (controller != null) { - pcrc = (PayloadCarryingRpcController) controller; - if (!pcrc.hasCallTimeout()){ - pcrc.setCallTimeout(defaultOperationTimeout); - } - } else { - pcrc = new PayloadCarryingRpcController(); - pcrc.setCallTimeout(defaultOperationTimeout); - } - - return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); - } - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java index 2ab2a5b924b..94b743f307b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java @@ -18,8 +18,7 @@ package org.apache.hadoop.hbase.ipc; - - +import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -38,6 +37,11 @@ public class TimeLimitedRpcController implements RpcController { protected final AtomicReference> cancellationCb = new AtomicReference>(null); + protected final AtomicReference> failureCb = + new AtomicReference>(null); + + private IOException exception; + public Integer getCallTimeout() { return callTimeout; } @@ -52,12 +56,20 @@ public class TimeLimitedRpcController implements RpcController { @Override public String errorText() { - throw new UnsupportedOperationException(); + if (exception != null) { + return exception.getMessage(); + } else { + return null; + } } + /** + * For use in async rpc clients + * @return true if failed + */ @Override public boolean failed() { - throw new UnsupportedOperationException(); + return this.exception != null; } @Override @@ -68,16 +80,52 @@ public class TimeLimitedRpcController implements RpcController { @Override public void notifyOnCancel(RpcCallback cancellationCb) { this.cancellationCb.set(cancellationCb); + if (this.cancelled) { + cancellationCb.run(null); + } + } + + /** + * Notify a callback on error. + * For use in async rpc clients + * + * @param failureCb the callback to call on error + */ + public void notifyOnFail(RpcCallback failureCb) { + this.failureCb.set(failureCb); + if (this.exception != null) { + failureCb.run(this.exception); + } } @Override public void reset() { - throw new UnsupportedOperationException(); + exception = null; + cancelled = false; + failureCb.set(null); + cancellationCb.set(null); + callTimeout = null; } @Override - public void setFailed(String arg0) { - throw new UnsupportedOperationException(); + public void setFailed(String reason) { + this.exception = new IOException(reason); + if (this.failureCb.get() != null) { + this.failureCb.get().run(this.exception); + } + } + + /** + * Set failed with an exception to pass on. + * For use in async rpc clients + * + * @param e exception to set with + */ + public void setFailed(IOException e) { + this.exception = e; + if (this.failureCb.get() != null) { + this.failureCb.get().run(this.exception); + } } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 0fbf21e18a2..c04c4f284df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientImpl; @@ -146,7 +147,7 @@ public class TestClientTimeouts { * Blocking rpc channel that goes via hbase rpc. */ static class RandomTimeoutBlockingRpcChannel - extends RpcClientImpl.BlockingRpcChannelImplementation { + extends AbstractRpcClient.BlockingRpcChannelImplementation { private static final Random RANDOM = new Random(System.currentTimeMillis()); public static final double CHANCE_OF_TIMEOUT = 0.3; private static AtomicInteger invokations = new AtomicInteger();