HBASE-15745 Refactor RPC classes to better accept async changes
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
86ca09e0e5
commit
56358a0fd3
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketTimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* Implementations call a RegionServer.
|
||||
* Passed to a {@link RpcRetryingCaller} so we retry on fail.
|
||||
* TODO: this class is actually tied to one region, because most of the paths make use of
|
||||
* the regioninfo part of location when building requests. The only reason it works for
|
||||
* multi-region requests (e.g. batch) is that they happen to not use the region parts.
|
||||
* This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
|
||||
* RegionCallable and actual RegionServerCallable with ServerName.
|
||||
* @param <T> the class that the ServerCallable handles
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class AbstractRegionServerCallable<T> implements RetryingCallableBase {
|
||||
protected final Connection connection;
|
||||
protected final TableName tableName;
|
||||
protected final byte[] row;
|
||||
protected HRegionLocation location;
|
||||
|
||||
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
|
||||
|
||||
/**
|
||||
* @param connection Connection to use.
|
||||
* @param tableName Table name to which <code>row</code> belongs.
|
||||
* @param row The row we want in <code>tableName</code>.
|
||||
*/
|
||||
public AbstractRegionServerCallable(Connection connection, TableName tableName, byte[] row) {
|
||||
this.connection = connection;
|
||||
this.tableName = tableName;
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@link ClusterConnection} instance used by this Callable.
|
||||
*/
|
||||
ClusterConnection getConnection() {
|
||||
return (ClusterConnection) this.connection;
|
||||
}
|
||||
|
||||
protected HRegionLocation getLocation() {
|
||||
return this.location;
|
||||
}
|
||||
|
||||
protected void setLocation(final HRegionLocation location) {
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
public TableName getTableName() {
|
||||
return this.tableName;
|
||||
}
|
||||
|
||||
public byte [] getRow() {
|
||||
return this.row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwable(Throwable t, boolean retrying) {
|
||||
if (t instanceof SocketTimeoutException ||
|
||||
t instanceof ConnectException ||
|
||||
t instanceof RetriesExhaustedException ||
|
||||
(location != null && getConnection().isDeadServer(location.getServerName()))) {
|
||||
// if thrown these exceptions, we clear all the cache entries that
|
||||
// map to that slow/dead server; otherwise, let cache miss and ask
|
||||
// hbase:meta again to find the new location
|
||||
if (this.location != null) {
|
||||
getConnection().clearCaches(location.getServerName());
|
||||
}
|
||||
} else if (t instanceof RegionMovedException) {
|
||||
getConnection().updateCachedLocations(tableName, row, t, location);
|
||||
} else if (t instanceof NotServingRegionException && !retrying) {
|
||||
// Purge cache entries for this specific region from hbase:meta cache
|
||||
// since we don't call connect(true) when number of retries is 1.
|
||||
getConnection().deleteCachedRegionLocation(location);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExceptionMessageAdditionalDetail() {
|
||||
return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sleep(long pause, int tries) {
|
||||
// Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
|
||||
long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
|
||||
if (sleep < MIN_WAIT_DEAD_SERVER
|
||||
&& (location == null || getConnection().isDeadServer(location.getServerName()))) {
|
||||
sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
|
||||
}
|
||||
return sleep;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the HRegionInfo for the current region
|
||||
*/
|
||||
public HRegionInfo getHRegionInfo() {
|
||||
if (this.location == null) {
|
||||
return null;
|
||||
}
|
||||
return this.location.getRegionInfo();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare for connection to the server hosting region with row from tablename. Does lookup
|
||||
* to find region location and hosting server.
|
||||
* @param reload Set this to true if connection should re-find the region
|
||||
* @throws IOException e
|
||||
*/
|
||||
@Override
|
||||
public void prepare(final boolean reload) throws IOException {
|
||||
try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
|
||||
this.location = regionLocator.getRegionLocation(row, reload);
|
||||
}
|
||||
if (this.location == null) {
|
||||
throw new IOException("Failed to find location, tableName=" + tableName +
|
||||
", row=" + Bytes.toString(row) + ", reload=" + reload);
|
||||
}
|
||||
setClientByServiceName(this.location.getServerName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the Rpc client for Client services
|
||||
* @param serviceName to get client for
|
||||
* @throws IOException When client could not be created
|
||||
*/
|
||||
abstract void setClientByServiceName(ServerName serviceName) throws IOException;
|
||||
}
|
|
@ -118,12 +118,11 @@ class FastFailInterceptorContext extends
|
|||
tries = 0;
|
||||
}
|
||||
|
||||
public FastFailInterceptorContext prepare(RetryingCallable<?> callable) {
|
||||
public FastFailInterceptorContext prepare(RetryingCallableBase callable) {
|
||||
return prepare(callable, 0);
|
||||
}
|
||||
|
||||
public FastFailInterceptorContext prepare(RetryingCallable<?> callable,
|
||||
int tries) {
|
||||
public FastFailInterceptorContext prepare(RetryingCallableBase callable, int tries) {
|
||||
if (callable instanceof RegionServerCallable) {
|
||||
RegionServerCallable<?> retryingCallable = (RegionServerCallable<?>) callable;
|
||||
server = retryingCallable.getLocation().getServerName();
|
||||
|
|
|
@ -29,14 +29,14 @@ class NoOpRetryingInterceptorContext extends RetryingCallerInterceptorContext {
|
|||
|
||||
@Override
|
||||
public RetryingCallerInterceptorContext prepare(
|
||||
RetryingCallable<?> callable) {
|
||||
RetryingCallableBase callable) {
|
||||
// Do Nothing
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RetryingCallerInterceptorContext prepare(
|
||||
RetryingCallable<?> callable, int tries) {
|
||||
RetryingCallableBase callable, int tries) {
|
||||
// Do Nothing
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -21,15 +21,10 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* Implementations call a RegionServer and implement {@link #call(int)}.
|
||||
|
@ -42,16 +37,10 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
* @param <T> the class that the ServerCallable handles
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
|
||||
// Public because used outside of this package over in ipc.
|
||||
private static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
|
||||
protected final Connection connection;
|
||||
protected final TableName tableName;
|
||||
protected final byte[] row;
|
||||
protected HRegionLocation location;
|
||||
private ClientService.BlockingInterface stub;
|
||||
public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements
|
||||
RetryingCallable<T> {
|
||||
|
||||
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
|
||||
private ClientService.BlockingInterface stub;
|
||||
|
||||
/**
|
||||
* @param connection Connection to use.
|
||||
|
@ -59,97 +48,25 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
|
|||
* @param row The row we want in <code>tableName</code>.
|
||||
*/
|
||||
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
|
||||
this.connection = connection;
|
||||
this.tableName = tableName;
|
||||
this.row = row;
|
||||
super(connection, tableName, row);
|
||||
}
|
||||
|
||||
void setClientByServiceName(ServerName service) throws IOException {
|
||||
this.setStub(getConnection().getClient(service));
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare for connection to the server hosting region with row from tablename. Does lookup
|
||||
* to find region location and hosting server.
|
||||
* @param reload Set to true to re-check the table state
|
||||
* @throws IOException e
|
||||
* @return Client Rpc protobuf communication stub
|
||||
*/
|
||||
@Override
|
||||
public void prepare(final boolean reload) throws IOException {
|
||||
// check table state if this is a retry
|
||||
if (reload &&
|
||||
!tableName.equals(TableName.META_TABLE_NAME) &&
|
||||
getConnection().isTableDisabled(tableName)) {
|
||||
throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
|
||||
}
|
||||
try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
|
||||
this.location = regionLocator.getRegionLocation(row);
|
||||
}
|
||||
if (this.location == null) {
|
||||
throw new IOException("Failed to find location, tableName=" + tableName +
|
||||
", row=" + Bytes.toString(row) + ", reload=" + reload);
|
||||
}
|
||||
setStub(getConnection().getClient(this.location.getServerName()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@link HConnection} instance used by this Callable.
|
||||
*/
|
||||
HConnection getConnection() {
|
||||
return (HConnection) this.connection;
|
||||
}
|
||||
|
||||
protected ClientService.BlockingInterface getStub() {
|
||||
return this.stub;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the client protobuf communication stub
|
||||
* @param stub to set
|
||||
*/
|
||||
void setStub(final ClientService.BlockingInterface stub) {
|
||||
this.stub = stub;
|
||||
}
|
||||
|
||||
protected HRegionLocation getLocation() {
|
||||
return this.location;
|
||||
}
|
||||
|
||||
protected void setLocation(final HRegionLocation location) {
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
public TableName getTableName() {
|
||||
return this.tableName;
|
||||
}
|
||||
|
||||
public byte [] getRow() {
|
||||
return this.row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwable(Throwable t, boolean retrying) {
|
||||
if (location != null) {
|
||||
getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),
|
||||
row, t, location.getServerName());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExceptionMessageAdditionalDetail() {
|
||||
return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sleep(long pause, int tries) {
|
||||
// Tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
|
||||
long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
|
||||
if (sleep < MIN_WAIT_DEAD_SERVER
|
||||
&& (location == null || getConnection().isDeadServer(location.getServerName()))) {
|
||||
sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
|
||||
}
|
||||
return sleep;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the HRegionInfo for the current region
|
||||
*/
|
||||
public HRegionInfo getHRegionInfo() {
|
||||
if (this.location == null) {
|
||||
return null;
|
||||
}
|
||||
return this.location.getRegionInfo();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -29,23 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
* @param <T> result class from executing <tt>this</tt>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RetryingCallable<T> {
|
||||
/**
|
||||
* Prepare by setting up any connections to servers, etc., ahead of {@link #call(int)} invocation.
|
||||
* @param reload Set this to true if need to requery locations
|
||||
* @throws IOException e
|
||||
*/
|
||||
void prepare(final boolean reload) throws IOException;
|
||||
|
||||
/**
|
||||
* Called when {@link #call(int)} throws an exception and we are going to retry; take action to
|
||||
* make it so we succeed on next call (clear caches, do relookup of locations, etc.).
|
||||
* @param t
|
||||
* @param retrying True if we are in retrying mode (we are not in retrying mode when max
|
||||
* retries == 1; we ARE in retrying mode if retries > 1 even when we are the last attempt)
|
||||
*/
|
||||
void throwable(final Throwable t, boolean retrying);
|
||||
|
||||
public interface RetryingCallable<T> extends RetryingCallableBase {
|
||||
/**
|
||||
* Computes a result, or throws an exception if unable to do so.
|
||||
*
|
||||
|
@ -54,18 +36,4 @@ public interface RetryingCallable<T> {
|
|||
* @throws Exception if unable to compute a result
|
||||
*/
|
||||
T call(int callTimeout) throws Exception;
|
||||
|
||||
/**
|
||||
* @return Some details from the implementation that we would like to add to a terminating
|
||||
* exception; i.e. a fatal exception is being thrown ending retries and we might like to add
|
||||
* more implementation-specific detail on to the exception being thrown.
|
||||
*/
|
||||
String getExceptionMessageAdditionalDetail();
|
||||
|
||||
/**
|
||||
* @param pause
|
||||
* @param tries
|
||||
* @return Suggestion on how much to sleep between retries
|
||||
*/
|
||||
long sleep(final long pause, final int tries);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* All generic methods for a Callable that can be retried. It is extended with Sync and
|
||||
* Async versions.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface RetryingCallableBase {
|
||||
/**
|
||||
* Prepare by setting up any connections to servers, etc., ahead of call invocation.
|
||||
* @param reload Set this to true if need to requery locations
|
||||
* @throws IOException e
|
||||
*/
|
||||
void prepare(final boolean reload) throws IOException;
|
||||
|
||||
/**
|
||||
* Called when call throws an exception and we are going to retry; take action to
|
||||
* make it so we succeed on next call (clear caches, do relookup of locations, etc.).
|
||||
* @param t throwable which was thrown
|
||||
* @param retrying True if we are in retrying mode (we are not in retrying mode when max
|
||||
* retries == 1; we ARE in retrying mode if retries > 1 even when we are the
|
||||
* last attempt)
|
||||
*/
|
||||
void throwable(final Throwable t, boolean retrying);
|
||||
|
||||
/**
|
||||
* @return Some details from the implementation that we would like to add to a terminating
|
||||
* exception; i.e. a fatal exception is being thrown ending retries and we might like to
|
||||
* add more implementation-specific detail on to the exception being thrown.
|
||||
*/
|
||||
String getExceptionMessageAdditionalDetail();
|
||||
|
||||
/**
|
||||
* @param pause time to pause
|
||||
* @param tries amount of tries until till sleep
|
||||
* @return Suggestion on how much to sleep between retries
|
||||
*/
|
||||
long sleep(final long pause, final int tries);
|
||||
}
|
|
@ -50,7 +50,7 @@ abstract class RetryingCallerInterceptorContext {
|
|||
* used for use in the current retrying call
|
||||
*/
|
||||
public abstract RetryingCallerInterceptorContext prepare(
|
||||
RetryingCallable<?> callable);
|
||||
RetryingCallableBase callable);
|
||||
|
||||
/**
|
||||
* Telescopic extension that takes which of the many retries we are currently
|
||||
|
@ -65,5 +65,5 @@ abstract class RetryingCallerInterceptorContext {
|
|||
* retrying call
|
||||
*/
|
||||
public abstract RetryingCallerInterceptorContext prepare(
|
||||
RetryingCallable<?> callable, int tries);
|
||||
RetryingCallableBase callable, int tries);
|
||||
}
|
||||
|
|
|
@ -17,275 +17,22 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
|
||||
import org.apache.hadoop.hbase.security.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.SaslClientHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||
import org.apache.hadoop.hbase.security.SecurityInfo;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.htrace.Span;
|
||||
import org.apache.htrace.Trace;
|
||||
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
|
||||
/**
|
||||
* Netty RPC channel
|
||||
* Interface for Async Rpc Channels
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AsyncRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
|
||||
|
||||
private static final int MAX_SASL_RETRIES = 5;
|
||||
|
||||
protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
|
||||
= new HashMap<>();
|
||||
|
||||
static {
|
||||
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
|
||||
new AuthenticationTokenSelector());
|
||||
}
|
||||
|
||||
final AsyncRpcClient client;
|
||||
|
||||
// Contains the channel to work with.
|
||||
// Only exists when connected
|
||||
private Channel channel;
|
||||
|
||||
String name;
|
||||
final User ticket;
|
||||
final String serviceName;
|
||||
final InetSocketAddress address;
|
||||
|
||||
private int failureCounter = 0;
|
||||
|
||||
boolean useSasl;
|
||||
AuthMethod authMethod;
|
||||
private int reloginMaxBackoff;
|
||||
private Token<? extends TokenIdentifier> token;
|
||||
private String serverPrincipal;
|
||||
|
||||
// NOTE: closed and connected flags below are only changed when a lock on pendingCalls
|
||||
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
|
||||
private boolean connected = false;
|
||||
private boolean closed = false;
|
||||
|
||||
private Timeout cleanupTimer;
|
||||
|
||||
private final TimerTask timeoutTask = new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
cleanupCalls();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructor for netty RPC channel
|
||||
* @param bootstrap to construct channel on
|
||||
* @param client to connect with
|
||||
* @param ticket of user which uses connection
|
||||
* @param serviceName name of service to connect to
|
||||
* @param address to connect to
|
||||
*/
|
||||
public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
|
||||
String serviceName, InetSocketAddress address) {
|
||||
this.client = client;
|
||||
|
||||
this.ticket = ticket;
|
||||
this.serviceName = serviceName;
|
||||
this.address = address;
|
||||
|
||||
this.channel = connect(bootstrap).channel();
|
||||
|
||||
name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
|
||||
+ ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to channel
|
||||
* @param bootstrap to connect to
|
||||
* @return future of connection
|
||||
*/
|
||||
private ChannelFuture connect(final Bootstrap bootstrap) {
|
||||
return bootstrap.remoteAddress(address).connect()
|
||||
.addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(final ChannelFuture f) throws Exception {
|
||||
if (!f.isSuccess()) {
|
||||
retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
|
||||
return;
|
||||
}
|
||||
channel = f.channel();
|
||||
|
||||
setupAuthorization();
|
||||
|
||||
ByteBuf b = channel.alloc().directBuffer(6);
|
||||
createPreamble(b, authMethod);
|
||||
channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
if (useSasl) {
|
||||
UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
if (ticket != null && ticket.getRealUser() != null) {
|
||||
ticket = ticket.getRealUser();
|
||||
}
|
||||
}
|
||||
SaslClientHandler saslHandler;
|
||||
if (ticket == null) {
|
||||
throw new FatalConnectionException("ticket/user is null");
|
||||
}
|
||||
final UserGroupInformation realTicket = ticket;
|
||||
saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
|
||||
@Override
|
||||
public SaslClientHandler run() throws IOException {
|
||||
return getSaslHandler(realTicket, bootstrap);
|
||||
}
|
||||
});
|
||||
if (saslHandler != null) {
|
||||
// Sasl connect is successful. Let's set up Sasl channel handler
|
||||
channel.pipeline().addFirst(saslHandler);
|
||||
} else {
|
||||
// fall back to simple auth because server told us so.
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
useSasl = false;
|
||||
}
|
||||
} else {
|
||||
startHBaseConnection(f.channel());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Start HBase connection
|
||||
* @param ch channel to start connection on
|
||||
*/
|
||||
private void startHBaseConnection(Channel ch) {
|
||||
ch.pipeline().addLast("frameDecoder",
|
||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
||||
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
||||
try {
|
||||
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
close(future.cause());
|
||||
return;
|
||||
}
|
||||
List<AsyncCall> callsToWrite;
|
||||
synchronized (pendingCalls) {
|
||||
connected = true;
|
||||
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||
}
|
||||
for (AsyncCall call : callsToWrite) {
|
||||
writeRequest(call);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get SASL handler
|
||||
* @param bootstrap to reconnect to
|
||||
* @return new SASL handler
|
||||
* @throws java.io.IOException if handler failed to create
|
||||
*/
|
||||
private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
|
||||
final Bootstrap bootstrap) throws IOException {
|
||||
return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
|
||||
client.fallbackAllowed,
|
||||
client.conf.get("hbase.rpc.protection",
|
||||
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
|
||||
new SaslClientHandler.SaslExceptionHandler() {
|
||||
@Override
|
||||
public void handle(int retryCount, Random random, Throwable cause) {
|
||||
try {
|
||||
// Handle Sasl failure. Try to potentially get new credentials
|
||||
handleSaslConnectionFailure(retryCount, cause, realTicket);
|
||||
|
||||
retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
|
||||
cause);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
}, new SaslClientHandler.SaslSuccessfulConnectHandler() {
|
||||
@Override
|
||||
public void onSuccess(Channel channel) {
|
||||
startHBaseConnection(channel);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry to connect or close
|
||||
* @param bootstrap to connect with
|
||||
* @param failureCount failure count
|
||||
* @param e exception of fail
|
||||
*/
|
||||
private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
|
||||
Throwable e) {
|
||||
if (failureCount < client.maxRetries) {
|
||||
client.newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
connect(bootstrap);
|
||||
}
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
client.failedServers.addToFailedServers(address);
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
public interface AsyncRpcChannel {
|
||||
|
||||
/**
|
||||
* Calls method on channel
|
||||
|
@ -294,439 +41,32 @@ public class AsyncRpcChannel {
|
|||
* @param request to send
|
||||
* @param responsePrototype to construct response with
|
||||
*/
|
||||
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
|
||||
Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
|
||||
final PayloadCarryingRpcController controller, final Message request,
|
||||
final Message responsePrototype, MetricsConnection.CallStats callStats) {
|
||||
final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
|
||||
method, request, controller, responsePrototype, callStats);
|
||||
controller.notifyOnCancel(new RpcCallback<Object>() {
|
||||
@Override
|
||||
public void run(Object parameter) {
|
||||
// TODO: do not need to call AsyncCall.setFailed?
|
||||
synchronized (pendingCalls) {
|
||||
pendingCalls.remove(call.id);
|
||||
}
|
||||
}
|
||||
});
|
||||
// TODO: this should be handled by PayloadCarryingRpcController.
|
||||
if (controller.isCanceled()) {
|
||||
// To finish if the call was cancelled before we set the notification (race condition)
|
||||
call.cancel(true);
|
||||
return call;
|
||||
}
|
||||
|
||||
synchronized (pendingCalls) {
|
||||
if (closed) {
|
||||
Promise<Message> promise = channel.eventLoop().newPromise();
|
||||
promise.setFailure(new ConnectException());
|
||||
return promise;
|
||||
}
|
||||
pendingCalls.put(call.id, call);
|
||||
// Add timeout for cleanup if none is present
|
||||
if (cleanupTimer == null && call.getRpcTimeout() > 0) {
|
||||
cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (!connected) {
|
||||
return call;
|
||||
}
|
||||
}
|
||||
writeRequest(call);
|
||||
return call;
|
||||
}
|
||||
|
||||
AsyncCall removePendingCall(int id) {
|
||||
synchronized (pendingCalls) {
|
||||
return pendingCalls.remove(id);
|
||||
}
|
||||
}
|
||||
final Message responsePrototype, MetricsConnection.CallStats callStats);
|
||||
|
||||
/**
|
||||
* Write the channel header
|
||||
* @param channel to write to
|
||||
* @return future of write
|
||||
* @throws java.io.IOException on failure to write
|
||||
* Get the EventLoop on which this channel operated
|
||||
* @return EventLoop
|
||||
*/
|
||||
private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
|
||||
RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
|
||||
.setServiceName(serviceName);
|
||||
|
||||
RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
|
||||
if (userInfoPB != null) {
|
||||
headerBuilder.setUserInfo(userInfoPB);
|
||||
}
|
||||
|
||||
if (client.codec != null) {
|
||||
headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
|
||||
}
|
||||
if (client.compressor != null) {
|
||||
headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
|
||||
}
|
||||
|
||||
headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
|
||||
RPCProtos.ConnectionHeader header = headerBuilder.build();
|
||||
|
||||
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
|
||||
|
||||
ByteBuf b = channel.alloc().directBuffer(totalSize);
|
||||
|
||||
b.writeInt(header.getSerializedSize());
|
||||
b.writeBytes(header.toByteArray());
|
||||
|
||||
return channel.writeAndFlush(b);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write request to channel
|
||||
* @param call to write
|
||||
*/
|
||||
private void writeRequest(final AsyncCall call) {
|
||||
try {
|
||||
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
|
||||
.newBuilder();
|
||||
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
|
||||
.setRequestParam(call.param != null);
|
||||
|
||||
if (Trace.isTracing()) {
|
||||
Span s = Trace.currentSpan();
|
||||
requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
|
||||
.setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
|
||||
}
|
||||
|
||||
ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
|
||||
if (cellBlock != null) {
|
||||
final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
|
||||
.newBuilder();
|
||||
cellBlockBuilder.setLength(cellBlock.limit());
|
||||
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
|
||||
}
|
||||
// Only pass priority if there one. Let zero be same as no priority.
|
||||
if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||
requestHeaderBuilder.setPriority(call.controller.getPriority());
|
||||
}
|
||||
|
||||
RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
|
||||
|
||||
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
|
||||
if (cellBlock != null) {
|
||||
totalSize += cellBlock.remaining();
|
||||
}
|
||||
|
||||
ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
|
||||
try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
|
||||
call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
|
||||
}
|
||||
|
||||
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
|
||||
} catch (IOException e) {
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up server authorization
|
||||
* @throws java.io.IOException if auth setup failed
|
||||
*/
|
||||
private void setupAuthorization() throws IOException {
|
||||
SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
|
||||
this.useSasl = client.userProvider.isHBaseSecurityEnabled();
|
||||
|
||||
this.token = null;
|
||||
if (useSasl && securityInfo != null) {
|
||||
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
|
||||
if (tokenKind != null) {
|
||||
TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
|
||||
if (tokenSelector != null) {
|
||||
token = tokenSelector.selectToken(new Text(client.clusterId),
|
||||
ticket.getUGI().getTokens());
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No token selector found for type " + tokenKind);
|
||||
}
|
||||
}
|
||||
String serverKey = securityInfo.getServerPrincipal();
|
||||
if (serverKey == null) {
|
||||
throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
|
||||
}
|
||||
this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
|
||||
address.getAddress().getCanonicalHostName().toLowerCase());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
|
||||
+ serverPrincipal);
|
||||
}
|
||||
}
|
||||
|
||||
if (!useSasl) {
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
} else if (token != null) {
|
||||
authMethod = AuthMethod.DIGEST;
|
||||
} else {
|
||||
authMethod = AuthMethod.KERBEROS;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
|
||||
}
|
||||
reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the user information
|
||||
* @param ugi User Group Information
|
||||
* @param authMethod Authorization method
|
||||
* @return UserInformation protobuf
|
||||
*/
|
||||
private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
|
||||
if (ugi == null || authMethod == AuthMethod.DIGEST) {
|
||||
// Don't send user for token auth
|
||||
return null;
|
||||
}
|
||||
RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
// Send effective user for Kerberos auth
|
||||
userInfoPB.setEffectiveUser(ugi.getUserName());
|
||||
} else if (authMethod == AuthMethod.SIMPLE) {
|
||||
// Send both effective user and real user for simple auth
|
||||
userInfoPB.setEffectiveUser(ugi.getUserName());
|
||||
if (ugi.getRealUser() != null) {
|
||||
userInfoPB.setRealUser(ugi.getRealUser().getUserName());
|
||||
}
|
||||
}
|
||||
return userInfoPB.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create connection preamble
|
||||
* @param byteBuf to write to
|
||||
* @param authMethod to write
|
||||
*/
|
||||
private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
|
||||
byteBuf.writeBytes(HConstants.RPC_HEADER);
|
||||
byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
|
||||
byteBuf.writeByte(authMethod.code);
|
||||
}
|
||||
|
||||
private void close0(Throwable e) {
|
||||
List<AsyncCall> toCleanup;
|
||||
synchronized (pendingCalls) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||
pendingCalls.clear();
|
||||
}
|
||||
IOException closeException = null;
|
||||
if (e != null) {
|
||||
if (e instanceof IOException) {
|
||||
closeException = (IOException) e;
|
||||
} else {
|
||||
closeException = new IOException(e);
|
||||
}
|
||||
}
|
||||
// log the info
|
||||
if (LOG.isDebugEnabled() && closeException != null) {
|
||||
LOG.debug(name + ": closing ipc connection to " + address, closeException);
|
||||
}
|
||||
if (cleanupTimer != null) {
|
||||
cleanupTimer.cancel();
|
||||
cleanupTimer = null;
|
||||
}
|
||||
for (AsyncCall call : toCleanup) {
|
||||
call.setFailed(closeException != null ? closeException
|
||||
: new ConnectionClosingException(
|
||||
"Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
|
||||
}
|
||||
channel.disconnect().addListener(ChannelFutureListener.CLOSE);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(name + ": closed");
|
||||
}
|
||||
}
|
||||
EventExecutor getEventExecutor();
|
||||
|
||||
/**
|
||||
* Close connection
|
||||
* @param e exception on close
|
||||
* @param cause of closure.
|
||||
*/
|
||||
public void close(final Throwable e) {
|
||||
client.removeConnection(this);
|
||||
|
||||
// Move closing from the requesting thread to the channel thread
|
||||
if (channel.eventLoop().inEventLoop()) {
|
||||
close0(e);
|
||||
} else {
|
||||
channel.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
close0(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up calls.
|
||||
*/
|
||||
private void cleanupCalls() {
|
||||
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
|
||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||
long nextCleanupTaskDelay = -1L;
|
||||
synchronized (pendingCalls) {
|
||||
for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
|
||||
AsyncCall call = iter.next();
|
||||
long timeout = call.getRpcTimeout();
|
||||
if (timeout > 0) {
|
||||
if (currentTime - call.getStartTime() >= timeout) {
|
||||
iter.remove();
|
||||
toCleanup.add(call);
|
||||
} else {
|
||||
if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
|
||||
nextCleanupTaskDelay = timeout;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nextCleanupTaskDelay > 0) {
|
||||
cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
cleanupTimer = null;
|
||||
}
|
||||
}
|
||||
for (AsyncCall call : toCleanup) {
|
||||
call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
|
||||
+ (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
|
||||
}
|
||||
}
|
||||
void close(Throwable cause);
|
||||
|
||||
/**
|
||||
* Check if the connection is alive
|
||||
*
|
||||
* @return true if alive
|
||||
*/
|
||||
public boolean isAlive() {
|
||||
return channel.isOpen();
|
||||
}
|
||||
boolean isAlive();
|
||||
|
||||
/**
|
||||
* Check if user should authenticate over Kerberos
|
||||
* @return true if should be authenticated over Kerberos
|
||||
* @throws java.io.IOException on failure of check
|
||||
* Get the address on which this channel operates
|
||||
* @return InetSocketAddress
|
||||
*/
|
||||
private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
|
||||
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
||||
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation realUser = currentUser.getRealUser();
|
||||
return authMethod == AuthMethod.KERBEROS && loginUser != null &&
|
||||
// Make sure user logged in using Kerberos either keytab or TGT
|
||||
loginUser.hasKerberosCredentials() &&
|
||||
// relogin only in case it is the login user (e.g. JT)
|
||||
// or superuser (like oozie).
|
||||
(loginUser.equals(currentUser) || loginUser.equals(realUser));
|
||||
}
|
||||
|
||||
/**
|
||||
* If multiple clients with the same principal try to connect to the same server at the same time,
|
||||
* the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
|
||||
* work around this, what is done is that the client backs off randomly and tries to initiate the
|
||||
* connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
|
||||
* attempted.
|
||||
* <p>
|
||||
* The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
|
||||
* user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
|
||||
* cases, it is prudent to throw a runtime exception when we receive a SaslException from the
|
||||
* underlying authentication implementation, so there is no retry from other high level (for eg,
|
||||
* HCM or HBaseAdmin).
|
||||
* </p>
|
||||
* @param currRetries retry count
|
||||
* @param ex exception describing fail
|
||||
* @param user which is trying to connect
|
||||
* @throws java.io.IOException if IO fail
|
||||
* @throws InterruptedException if thread is interrupted
|
||||
*/
|
||||
private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
|
||||
final UserGroupInformation user) throws IOException, InterruptedException {
|
||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws IOException, InterruptedException {
|
||||
if (shouldAuthenticateOverKrb()) {
|
||||
if (currRetries < MAX_SASL_RETRIES) {
|
||||
LOG.debug("Exception encountered while connecting to the server : " + ex);
|
||||
// try re-login
|
||||
if (UserGroupInformation.isLoginKeytabBased()) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
} else {
|
||||
UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
||||
}
|
||||
|
||||
// Should reconnect
|
||||
return null;
|
||||
} else {
|
||||
String msg = "Couldn't setup connection for "
|
||||
+ UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
|
||||
LOG.warn(msg);
|
||||
throw (IOException) new IOException(msg).initCause(ex);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
|
||||
}
|
||||
if (ex instanceof RemoteException) {
|
||||
throw (RemoteException) ex;
|
||||
}
|
||||
if (ex instanceof SaslException) {
|
||||
String msg = "SASL authentication failed."
|
||||
+ " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
|
||||
LOG.fatal(msg, ex);
|
||||
throw new RuntimeException(msg, ex);
|
||||
}
|
||||
throw new IOException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public int getConnectionHashCode() {
|
||||
return ConnectionId.hashCode(ticket, serviceName, address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getConnectionHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof AsyncRpcChannel) {
|
||||
AsyncRpcChannel channel = (AsyncRpcChannel) obj;
|
||||
return channel.hashCode() == obj.hashCode();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Listens to call writes and fails if write failed
|
||||
*/
|
||||
private static final class CallWriteListener implements ChannelFutureListener {
|
||||
private final AsyncRpcChannel rpcChannel;
|
||||
private final int id;
|
||||
|
||||
public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
|
||||
this.rpcChannel = asyncRpcChannel;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
AsyncCall call = rpcChannel.removePendingCall(id);
|
||||
if (call != null) {
|
||||
if (future.cause() instanceof IOException) {
|
||||
call.setFailed((IOException) future.cause());
|
||||
} else {
|
||||
call.setFailed(new IOException(future.cause()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
InetSocketAddress getAddress();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,743 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
|
||||
import org.apache.hadoop.hbase.security.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.SaslClientHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||
import org.apache.hadoop.hbase.security.SecurityInfo;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.htrace.Span;
|
||||
import org.apache.htrace.Trace;
|
||||
|
||||
/**
|
||||
* Netty RPC channel
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AsyncRpcChannelImpl implements AsyncRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName());
|
||||
|
||||
private static final int MAX_SASL_RETRIES = 5;
|
||||
|
||||
protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
|
||||
= new HashMap<>();
|
||||
|
||||
static {
|
||||
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
|
||||
new AuthenticationTokenSelector());
|
||||
}
|
||||
|
||||
final AsyncRpcClient client;
|
||||
|
||||
// Contains the channel to work with.
|
||||
// Only exists when connected
|
||||
private Channel channel;
|
||||
|
||||
String name;
|
||||
final User ticket;
|
||||
final String serviceName;
|
||||
final InetSocketAddress address;
|
||||
|
||||
private int failureCounter = 0;
|
||||
|
||||
boolean useSasl;
|
||||
AuthMethod authMethod;
|
||||
private int reloginMaxBackoff;
|
||||
private Token<? extends TokenIdentifier> token;
|
||||
private String serverPrincipal;
|
||||
|
||||
// NOTE: closed and connected flags below are only changed when a lock on pendingCalls
|
||||
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
|
||||
private boolean connected = false;
|
||||
private boolean closed = false;
|
||||
|
||||
private Timeout cleanupTimer;
|
||||
|
||||
private final TimerTask timeoutTask = new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
cleanupCalls();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructor for netty RPC channel
|
||||
* @param bootstrap to construct channel on
|
||||
* @param client to connect with
|
||||
* @param ticket of user which uses connection
|
||||
* @param serviceName name of service to connect to
|
||||
* @param address to connect to
|
||||
*/
|
||||
public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
|
||||
String serviceName, InetSocketAddress address) {
|
||||
this.client = client;
|
||||
|
||||
this.ticket = ticket;
|
||||
this.serviceName = serviceName;
|
||||
this.address = address;
|
||||
|
||||
this.channel = connect(bootstrap).channel();
|
||||
|
||||
name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
|
||||
+ ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to channel
|
||||
* @param bootstrap to connect to
|
||||
* @return future of connection
|
||||
*/
|
||||
private ChannelFuture connect(final Bootstrap bootstrap) {
|
||||
return bootstrap.remoteAddress(address).connect()
|
||||
.addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(final ChannelFuture f) throws Exception {
|
||||
if (!f.isSuccess()) {
|
||||
retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
|
||||
return;
|
||||
}
|
||||
channel = f.channel();
|
||||
|
||||
setupAuthorization();
|
||||
|
||||
ByteBuf b = channel.alloc().directBuffer(6);
|
||||
createPreamble(b, authMethod);
|
||||
channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
if (useSasl) {
|
||||
UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI();
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
if (ticket != null && ticket.getRealUser() != null) {
|
||||
ticket = ticket.getRealUser();
|
||||
}
|
||||
}
|
||||
SaslClientHandler saslHandler;
|
||||
if (ticket == null) {
|
||||
throw new FatalConnectionException("ticket/user is null");
|
||||
}
|
||||
final UserGroupInformation realTicket = ticket;
|
||||
saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
|
||||
@Override
|
||||
public SaslClientHandler run() throws IOException {
|
||||
return getSaslHandler(realTicket, bootstrap);
|
||||
}
|
||||
});
|
||||
if (saslHandler != null) {
|
||||
// Sasl connect is successful. Let's set up Sasl channel handler
|
||||
channel.pipeline().addFirst(saslHandler);
|
||||
} else {
|
||||
// fall back to simple auth because server told us so.
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
useSasl = false;
|
||||
}
|
||||
} else {
|
||||
startHBaseConnection(f.channel());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Start HBase connection
|
||||
* @param ch channel to start connection on
|
||||
*/
|
||||
private void startHBaseConnection(Channel ch) {
|
||||
ch.pipeline().addLast("frameDecoder",
|
||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
||||
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
||||
try {
|
||||
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
close(future.cause());
|
||||
return;
|
||||
}
|
||||
List<AsyncCall> callsToWrite;
|
||||
synchronized (pendingCalls) {
|
||||
connected = true;
|
||||
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||
}
|
||||
for (AsyncCall call : callsToWrite) {
|
||||
writeRequest(call);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get SASL handler
|
||||
* @param bootstrap to reconnect to
|
||||
* @return new SASL handler
|
||||
* @throws java.io.IOException if handler failed to create
|
||||
*/
|
||||
private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
|
||||
final Bootstrap bootstrap) throws IOException {
|
||||
return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
|
||||
client.fallbackAllowed,
|
||||
client.conf.get("hbase.rpc.protection",
|
||||
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
|
||||
new SaslClientHandler.SaslExceptionHandler() {
|
||||
@Override
|
||||
public void handle(int retryCount, Random random, Throwable cause) {
|
||||
try {
|
||||
// Handle Sasl failure. Try to potentially get new credentials
|
||||
handleSaslConnectionFailure(retryCount, cause, realTicket);
|
||||
|
||||
retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
|
||||
cause);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
}, new SaslClientHandler.SaslSuccessfulConnectHandler() {
|
||||
@Override
|
||||
public void onSuccess(Channel channel) {
|
||||
startHBaseConnection(channel);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry to connect or close
|
||||
* @param bootstrap to connect with
|
||||
* @param failureCount failure count
|
||||
* @param e exception of fail
|
||||
*/
|
||||
private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
|
||||
Throwable e) {
|
||||
if (failureCount < client.maxRetries) {
|
||||
client.newTimeout(new TimerTask() {
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
connect(bootstrap);
|
||||
}
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
client.failedServers.addToFailedServers(address);
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls method on channel
|
||||
* @param method to call
|
||||
* @param controller to run call with
|
||||
* @param request to send
|
||||
* @param responsePrototype to construct response with
|
||||
*/
|
||||
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
|
||||
final PayloadCarryingRpcController controller, final Message request,
|
||||
final Message responsePrototype, MetricsConnection.CallStats callStats) {
|
||||
final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
|
||||
method, request, controller, responsePrototype, callStats);
|
||||
controller.notifyOnCancel(new RpcCallback<Object>() {
|
||||
@Override
|
||||
public void run(Object parameter) {
|
||||
// TODO: do not need to call AsyncCall.setFailed?
|
||||
synchronized (pendingCalls) {
|
||||
pendingCalls.remove(call.id);
|
||||
}
|
||||
}
|
||||
});
|
||||
// TODO: this should be handled by PayloadCarryingRpcController.
|
||||
if (controller.isCanceled()) {
|
||||
// To finish if the call was cancelled before we set the notification (race condition)
|
||||
call.cancel(true);
|
||||
return call;
|
||||
}
|
||||
|
||||
synchronized (pendingCalls) {
|
||||
if (closed) {
|
||||
Promise<Message> promise = channel.eventLoop().newPromise();
|
||||
promise.setFailure(new ConnectException());
|
||||
return promise;
|
||||
}
|
||||
pendingCalls.put(call.id, call);
|
||||
// Add timeout for cleanup if none is present
|
||||
if (cleanupTimer == null && call.getRpcTimeout() > 0) {
|
||||
cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (!connected) {
|
||||
return call;
|
||||
}
|
||||
}
|
||||
writeRequest(call);
|
||||
return call;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventLoop getEventExecutor() {
|
||||
return this.channel.eventLoop();
|
||||
}
|
||||
|
||||
AsyncCall removePendingCall(int id) {
|
||||
synchronized (pendingCalls) {
|
||||
return pendingCalls.remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the channel header
|
||||
* @param channel to write to
|
||||
* @return future of write
|
||||
* @throws java.io.IOException on failure to write
|
||||
*/
|
||||
private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
|
||||
RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
|
||||
.setServiceName(serviceName);
|
||||
|
||||
RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
|
||||
if (userInfoPB != null) {
|
||||
headerBuilder.setUserInfo(userInfoPB);
|
||||
}
|
||||
|
||||
if (client.codec != null) {
|
||||
headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
|
||||
}
|
||||
if (client.compressor != null) {
|
||||
headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
|
||||
}
|
||||
|
||||
headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
|
||||
RPCProtos.ConnectionHeader header = headerBuilder.build();
|
||||
|
||||
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
|
||||
|
||||
ByteBuf b = channel.alloc().directBuffer(totalSize);
|
||||
|
||||
b.writeInt(header.getSerializedSize());
|
||||
b.writeBytes(header.toByteArray());
|
||||
|
||||
return channel.writeAndFlush(b);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write request to channel
|
||||
* @param call to write
|
||||
*/
|
||||
private void writeRequest(final AsyncCall call) {
|
||||
try {
|
||||
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
|
||||
.newBuilder();
|
||||
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
|
||||
.setRequestParam(call.param != null);
|
||||
|
||||
if (Trace.isTracing()) {
|
||||
Span s = Trace.currentSpan();
|
||||
requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
|
||||
.setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
|
||||
}
|
||||
|
||||
ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
|
||||
if (cellBlock != null) {
|
||||
final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
|
||||
.newBuilder();
|
||||
cellBlockBuilder.setLength(cellBlock.limit());
|
||||
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
|
||||
}
|
||||
// Only pass priority if there one. Let zero be same as no priority.
|
||||
if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||
requestHeaderBuilder.setPriority(call.controller.getPriority());
|
||||
}
|
||||
|
||||
RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
|
||||
|
||||
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
|
||||
if (cellBlock != null) {
|
||||
totalSize += cellBlock.remaining();
|
||||
}
|
||||
|
||||
ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
|
||||
try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
|
||||
call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
|
||||
}
|
||||
|
||||
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
|
||||
} catch (IOException e) {
|
||||
close(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up server authorization
|
||||
* @throws java.io.IOException if auth setup failed
|
||||
*/
|
||||
private void setupAuthorization() throws IOException {
|
||||
SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
|
||||
this.useSasl = client.userProvider.isHBaseSecurityEnabled();
|
||||
|
||||
this.token = null;
|
||||
if (useSasl && securityInfo != null) {
|
||||
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
|
||||
if (tokenKind != null) {
|
||||
TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
|
||||
if (tokenSelector != null) {
|
||||
token = tokenSelector.selectToken(new Text(client.clusterId),
|
||||
ticket.getUGI().getTokens());
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No token selector found for type " + tokenKind);
|
||||
}
|
||||
}
|
||||
String serverKey = securityInfo.getServerPrincipal();
|
||||
if (serverKey == null) {
|
||||
throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
|
||||
}
|
||||
this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
|
||||
address.getAddress().getCanonicalHostName().toLowerCase());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
|
||||
+ serverPrincipal);
|
||||
}
|
||||
}
|
||||
|
||||
if (!useSasl) {
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
} else if (token != null) {
|
||||
authMethod = AuthMethod.DIGEST;
|
||||
} else {
|
||||
authMethod = AuthMethod.KERBEROS;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
|
||||
}
|
||||
reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the user information
|
||||
* @param ugi User Group Information
|
||||
* @param authMethod Authorization method
|
||||
* @return UserInformation protobuf
|
||||
*/
|
||||
private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
|
||||
if (ugi == null || authMethod == AuthMethod.DIGEST) {
|
||||
// Don't send user for token auth
|
||||
return null;
|
||||
}
|
||||
RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
// Send effective user for Kerberos auth
|
||||
userInfoPB.setEffectiveUser(ugi.getUserName());
|
||||
} else if (authMethod == AuthMethod.SIMPLE) {
|
||||
// Send both effective user and real user for simple auth
|
||||
userInfoPB.setEffectiveUser(ugi.getUserName());
|
||||
if (ugi.getRealUser() != null) {
|
||||
userInfoPB.setRealUser(ugi.getRealUser().getUserName());
|
||||
}
|
||||
}
|
||||
return userInfoPB.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create connection preamble
|
||||
* @param byteBuf to write to
|
||||
* @param authMethod to write
|
||||
*/
|
||||
private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
|
||||
byteBuf.writeBytes(HConstants.RPC_HEADER);
|
||||
byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
|
||||
byteBuf.writeByte(authMethod.code);
|
||||
}
|
||||
|
||||
private void close0(Throwable e) {
|
||||
List<AsyncCall> toCleanup;
|
||||
synchronized (pendingCalls) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||
pendingCalls.clear();
|
||||
}
|
||||
IOException closeException = null;
|
||||
if (e != null) {
|
||||
if (e instanceof IOException) {
|
||||
closeException = (IOException) e;
|
||||
} else {
|
||||
closeException = new IOException(e);
|
||||
}
|
||||
}
|
||||
// log the info
|
||||
if (LOG.isDebugEnabled() && closeException != null) {
|
||||
LOG.debug(name + ": closing ipc connection to " + address, closeException);
|
||||
}
|
||||
if (cleanupTimer != null) {
|
||||
cleanupTimer.cancel();
|
||||
cleanupTimer = null;
|
||||
}
|
||||
for (AsyncCall call : toCleanup) {
|
||||
call.setFailed(closeException != null ? closeException
|
||||
: new ConnectionClosingException(
|
||||
"Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
|
||||
}
|
||||
channel.disconnect().addListener(ChannelFutureListener.CLOSE);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(name + ": closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close connection
|
||||
* @param e exception on close
|
||||
*/
|
||||
public void close(final Throwable e) {
|
||||
client.removeConnection(this);
|
||||
|
||||
// Move closing from the requesting thread to the channel thread
|
||||
if (channel.eventLoop().inEventLoop()) {
|
||||
close0(e);
|
||||
} else {
|
||||
channel.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
close0(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up calls.
|
||||
*/
|
||||
private void cleanupCalls() {
|
||||
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
|
||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||
long nextCleanupTaskDelay = -1L;
|
||||
synchronized (pendingCalls) {
|
||||
for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
|
||||
AsyncCall call = iter.next();
|
||||
long timeout = call.getRpcTimeout();
|
||||
if (timeout > 0) {
|
||||
if (currentTime - call.getStartTime() >= timeout) {
|
||||
iter.remove();
|
||||
toCleanup.add(call);
|
||||
} else {
|
||||
if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
|
||||
nextCleanupTaskDelay = timeout;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nextCleanupTaskDelay > 0) {
|
||||
cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
cleanupTimer = null;
|
||||
}
|
||||
}
|
||||
for (AsyncCall call : toCleanup) {
|
||||
call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
|
||||
+ (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the connection is alive
|
||||
* @return true if alive
|
||||
*/
|
||||
public boolean isAlive() {
|
||||
return channel.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getAddress() {
|
||||
return this.address;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if user should authenticate over Kerberos
|
||||
* @return true if should be authenticated over Kerberos
|
||||
* @throws java.io.IOException on failure of check
|
||||
*/
|
||||
private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
|
||||
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
||||
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation realUser = currentUser.getRealUser();
|
||||
return authMethod == AuthMethod.KERBEROS && loginUser != null &&
|
||||
// Make sure user logged in using Kerberos either keytab or TGT
|
||||
loginUser.hasKerberosCredentials() &&
|
||||
// relogin only in case it is the login user (e.g. JT)
|
||||
// or superuser (like oozie).
|
||||
(loginUser.equals(currentUser) || loginUser.equals(realUser));
|
||||
}
|
||||
|
||||
/**
|
||||
* If multiple clients with the same principal try to connect to the same server at the same time,
|
||||
* the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
|
||||
* work around this, what is done is that the client backs off randomly and tries to initiate the
|
||||
* connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
|
||||
* attempted.
|
||||
* <p>
|
||||
* The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
|
||||
* user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
|
||||
* cases, it is prudent to throw a runtime exception when we receive a SaslException from the
|
||||
* underlying authentication implementation, so there is no retry from other high level (for eg,
|
||||
* HCM or HBaseAdmin).
|
||||
* </p>
|
||||
* @param currRetries retry count
|
||||
* @param ex exception describing fail
|
||||
* @param user which is trying to connect
|
||||
* @throws java.io.IOException if IO fail
|
||||
* @throws InterruptedException if thread is interrupted
|
||||
*/
|
||||
private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
|
||||
final UserGroupInformation user) throws IOException, InterruptedException {
|
||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws IOException, InterruptedException {
|
||||
if (shouldAuthenticateOverKrb()) {
|
||||
if (currRetries < MAX_SASL_RETRIES) {
|
||||
LOG.debug("Exception encountered while connecting to the server : " + ex);
|
||||
// try re-login
|
||||
if (UserGroupInformation.isLoginKeytabBased()) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
} else {
|
||||
UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
||||
}
|
||||
|
||||
// Should reconnect
|
||||
return null;
|
||||
} else {
|
||||
String msg = "Couldn't setup connection for "
|
||||
+ UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
|
||||
LOG.warn(msg);
|
||||
throw (IOException) new IOException(msg).initCause(ex);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
|
||||
}
|
||||
if (ex instanceof RemoteException) {
|
||||
throw (RemoteException) ex;
|
||||
}
|
||||
if (ex instanceof SaslException) {
|
||||
String msg = "SASL authentication failed."
|
||||
+ " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
|
||||
LOG.fatal(msg, ex);
|
||||
throw new RuntimeException(msg, ex);
|
||||
}
|
||||
throw new IOException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public int getConnectionHashCode() {
|
||||
return ConnectionId.hashCode(ticket, serviceName, address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getConnectionHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof AsyncRpcChannelImpl) {
|
||||
AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj;
|
||||
return channel.hashCode() == obj.hashCode();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Listens to call writes and fails if write failed
|
||||
*/
|
||||
private static final class CallWriteListener implements ChannelFutureListener {
|
||||
private final AsyncRpcChannelImpl rpcChannel;
|
||||
private final int id;
|
||||
|
||||
public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) {
|
||||
this.rpcChannel = asyncRpcChannelImpl;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
AsyncCall call = rpcChannel.removePendingCall(id);
|
||||
if (call != null) {
|
||||
if (future.cause() instanceof IOException) {
|
||||
call.setFailed((IOException) future.cause());
|
||||
} else {
|
||||
call.setFailed(new IOException(future.cause()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -388,11 +388,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
|||
}
|
||||
rpcChannel = connections.get(hashCode);
|
||||
if (rpcChannel != null && !rpcChannel.isAlive()) {
|
||||
LOG.debug("Removing dead channel from server="+rpcChannel.address.toString());
|
||||
LOG.debug("Removing dead channel from server="+rpcChannel.getAddress().toString());
|
||||
connections.remove(hashCode);
|
||||
}
|
||||
if (rpcChannel == null) {
|
||||
rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
|
||||
rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location);
|
||||
connections.put(hashCode, rpcChannel);
|
||||
}
|
||||
}
|
||||
|
@ -415,8 +415,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
|||
synchronized (connections) {
|
||||
for (AsyncRpcChannel rpcChannel : connections.values()) {
|
||||
if (rpcChannel.isAlive() &&
|
||||
rpcChannel.address.getPort() == sn.getPort() &&
|
||||
rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
|
||||
rpcChannel.getAddress().getPort() == sn.getPort() &&
|
||||
rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) {
|
||||
LOG.info("The server on " + sn.toString() +
|
||||
" is dead - stopping the connection " + rpcChannel.toString());
|
||||
rpcChannel.close(null);
|
||||
|
|
|
@ -37,13 +37,13 @@ import io.netty.channel.SimpleChannelInboundHandler;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
private final AsyncRpcChannel channel;
|
||||
private final AsyncRpcChannelImpl channel;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param channel on which this response handler operates
|
||||
*/
|
||||
public AsyncServerResponseHandler(AsyncRpcChannel channel) {
|
||||
public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -7,7 +7,7 @@
|
|||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
@ -15,68 +15,20 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
||||
import com.google.protobuf.BlockingRpcChannel;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Base class which provides clients with an RPC connection to
|
||||
* Base interface which provides clients with an RPC connection to
|
||||
* call coprocessor endpoint {@link com.google.protobuf.Service}s.
|
||||
* Note that clients should not use this class directly, except through
|
||||
* {@link org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(byte[])}.
|
||||
* {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(CoprocessorRpcChannel.class);
|
||||
|
||||
@Override
|
||||
@InterfaceAudience.Private
|
||||
public void callMethod(Descriptors.MethodDescriptor method,
|
||||
RpcController controller,
|
||||
Message request, Message responsePrototype,
|
||||
RpcCallback<Message> callback) {
|
||||
Message response = null;
|
||||
try {
|
||||
response = callExecService(controller, method, request, responsePrototype);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Call failed on IOException", ioe);
|
||||
ResponseConverter.setControllerException(controller, ioe);
|
||||
}
|
||||
if (callback != null) {
|
||||
callback.run(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@InterfaceAudience.Private
|
||||
public Message callBlockingMethod(Descriptors.MethodDescriptor method,
|
||||
RpcController controller,
|
||||
Message request, Message responsePrototype)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return callExecService(controller, method, request, responsePrototype);
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException("Error calling method "+method.getFullName(), ioe);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Message callExecService(RpcController controller,
|
||||
Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
|
||||
throws IOException;
|
||||
}
|
||||
public interface CoprocessorRpcChannel extends RpcChannel, BlockingRpcChannel {
|
||||
}
|
|
@ -42,7 +42,7 @@ import com.google.protobuf.RpcController;
|
|||
* @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{
|
||||
public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class);
|
||||
|
||||
private final ClusterConnection connection;
|
||||
|
|
|
@ -45,7 +45,7 @@ import com.google.protobuf.RpcController;
|
|||
* @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
|
||||
public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class);
|
||||
|
||||
private final ClusterConnection connection;
|
||||
|
@ -57,6 +57,12 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
|
|||
private RpcRetryingCallerFactory rpcCallerFactory;
|
||||
private RpcControllerFactory rpcControllerFactory;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param conn connection to use
|
||||
* @param table to connect to
|
||||
* @param row to locate region with
|
||||
*/
|
||||
public RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) {
|
||||
this.connection = conn;
|
||||
this.table = table;
|
||||
|
@ -114,6 +120,10 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
|
|||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get last region this RpcChannel communicated with
|
||||
* @return region name as byte array
|
||||
*/
|
||||
public byte[] getLastRegion() {
|
||||
return lastRegion;
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import com.google.protobuf.RpcController;
|
|||
* @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerCoprocessorRpcChannel extends CoprocessorRpcChannel {
|
||||
public class RegionServerCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorRpcChannel.class);
|
||||
private final ClusterConnection connection;
|
||||
private final ServerName serverName;
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
||||
/**
|
||||
* Base class which provides clients with an RPC connection to
|
||||
* call coprocessor endpoint {@link com.google.protobuf.Service}s.
|
||||
* Note that clients should not use this class directly, except through
|
||||
* {@link org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(byte[])}.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class SyncCoprocessorRpcChannel implements CoprocessorRpcChannel {
|
||||
private static final Log LOG = LogFactory.getLog(SyncCoprocessorRpcChannel.class);
|
||||
|
||||
@Override
|
||||
@InterfaceAudience.Private
|
||||
public void callMethod(Descriptors.MethodDescriptor method,
|
||||
RpcController controller,
|
||||
Message request, Message responsePrototype,
|
||||
RpcCallback<Message> callback) {
|
||||
Message response = null;
|
||||
try {
|
||||
response = callExecService(controller, method, request, responsePrototype);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Call failed on IOException", ioe);
|
||||
ResponseConverter.setControllerException(controller, ioe);
|
||||
}
|
||||
if (callback != null) {
|
||||
callback.run(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@InterfaceAudience.Private
|
||||
public Message callBlockingMethod(Descriptors.MethodDescriptor method,
|
||||
RpcController controller,
|
||||
Message request, Message responsePrototype)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return callExecService(controller, method, request, responsePrototype);
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException("Error calling method "+method.getFullName(), ioe);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Message callExecService(RpcController controller,
|
||||
Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
|
||||
throws IOException;
|
||||
}
|
Loading…
Reference in New Issue