HBASE-16433 Remove AsyncRpcChannel related stuffs
This commit is contained in:
parent
8a692ff189
commit
a1f760ff76
|
@ -1,34 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.client;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Promise for responses
|
|
||||||
* @param <V> Value type
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Public
|
|
||||||
@InterfaceStability.Evolving
|
|
||||||
|
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_SAME_SIMPLE_NAME_AS_INTERFACE",
|
|
||||||
justification="Agree that this can be confusing but folks will pull in this and think twice "
|
|
||||||
+ "about pulling in netty; incidence of confusion should be rare in this case.")
|
|
||||||
public interface Future<V> extends io.netty.util.concurrent.Future<V> {
|
|
||||||
}
|
|
|
@ -1,30 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.client;
|
|
||||||
|
|
||||||
import io.netty.util.concurrent.GenericFutureListener;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Specific interface for the Response future listener
|
|
||||||
* @param <V> Value type.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public interface ResponseFutureListener<V>
|
|
||||||
extends GenericFutureListener<Future<V>> {
|
|
||||||
}
|
|
|
@ -19,7 +19,11 @@ package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import com.google.protobuf.Descriptors;
|
import com.google.protobuf.Descriptors;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.DefaultPromise;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
@ -39,12 +43,12 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
* @param <M> Message returned in communication to be converted
|
* @param <M> Message returned in communication to be converted
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class AsyncCall<M extends Message, T> extends Promise<T> {
|
public class AsyncCall<M extends Message, T> extends DefaultPromise<T> {
|
||||||
private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
|
private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
|
||||||
|
|
||||||
final int id;
|
final int id;
|
||||||
|
|
||||||
private final AsyncRpcChannelImpl channel;
|
private final AsyncRpcChannel channel;
|
||||||
|
|
||||||
final Descriptors.MethodDescriptor method;
|
final Descriptors.MethodDescriptor method;
|
||||||
final Message param;
|
final Message param;
|
||||||
|
@ -77,7 +81,7 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
|
||||||
* @param priority for this request
|
* @param priority for this request
|
||||||
* @param metrics MetricsConnection to which the metrics are stored for this request
|
* @param metrics MetricsConnection to which the metrics are stored for this request
|
||||||
*/
|
*/
|
||||||
public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
|
public AsyncCall(AsyncRpcChannel channel, int connectId, Descriptors.MethodDescriptor
|
||||||
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
|
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
|
||||||
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
|
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
|
||||||
MetricsConnection metrics) {
|
MetricsConnection metrics) {
|
||||||
|
|
|
@ -20,19 +20,298 @@ package org.apache.hadoop.hbase.ipc;
|
||||||
import com.google.protobuf.Descriptors;
|
import com.google.protobuf.Descriptors;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufOutputStream;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import io.netty.util.concurrent.GenericFutureListener;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.security.sasl.SaslException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Future;
|
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
|
||||||
|
import org.apache.hadoop.hbase.security.AuthMethod;
|
||||||
|
import org.apache.hadoop.hbase.security.SaslClientHandler;
|
||||||
|
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||||
|
import org.apache.hadoop.hbase.security.SecurityInfo;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
|
import org.apache.htrace.Span;
|
||||||
|
import org.apache.htrace.Trace;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for Async Rpc Channels
|
* Netty RPC channel
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface AsyncRpcChannel {
|
public class AsyncRpcChannel {
|
||||||
|
private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
|
||||||
|
|
||||||
|
private static final int MAX_SASL_RETRIES = 5;
|
||||||
|
|
||||||
|
protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
|
||||||
|
= new HashMap<>();
|
||||||
|
|
||||||
|
static {
|
||||||
|
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
|
||||||
|
new AuthenticationTokenSelector());
|
||||||
|
}
|
||||||
|
|
||||||
|
final AsyncRpcClient client;
|
||||||
|
|
||||||
|
// Contains the channel to work with.
|
||||||
|
// Only exists when connected
|
||||||
|
private Channel channel;
|
||||||
|
|
||||||
|
String name;
|
||||||
|
final User ticket;
|
||||||
|
final String serviceName;
|
||||||
|
final InetSocketAddress address;
|
||||||
|
|
||||||
|
private int failureCounter = 0;
|
||||||
|
|
||||||
|
boolean useSasl;
|
||||||
|
AuthMethod authMethod;
|
||||||
|
private int reloginMaxBackoff;
|
||||||
|
private Token<? extends TokenIdentifier> token;
|
||||||
|
private String serverPrincipal;
|
||||||
|
|
||||||
|
// NOTE: closed and connected flags below are only changed when a lock on pendingCalls
|
||||||
|
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
|
||||||
|
private boolean connected = false;
|
||||||
|
private boolean closed = false;
|
||||||
|
|
||||||
|
private Timeout cleanupTimer;
|
||||||
|
|
||||||
|
private final TimerTask timeoutTask = new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run(Timeout timeout) throws Exception {
|
||||||
|
cleanupCalls();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor for netty RPC channel
|
||||||
|
* @param bootstrap to construct channel on
|
||||||
|
* @param client to connect with
|
||||||
|
* @param ticket of user which uses connection
|
||||||
|
* @param serviceName name of service to connect to
|
||||||
|
* @param address to connect to
|
||||||
|
*/
|
||||||
|
public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
|
||||||
|
String serviceName, InetSocketAddress address) {
|
||||||
|
this.client = client;
|
||||||
|
|
||||||
|
this.ticket = ticket;
|
||||||
|
this.serviceName = serviceName;
|
||||||
|
this.address = address;
|
||||||
|
|
||||||
|
this.channel = connect(bootstrap).channel();
|
||||||
|
|
||||||
|
name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
|
||||||
|
+ ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to channel
|
||||||
|
* @param bootstrap to connect to
|
||||||
|
* @return future of connection
|
||||||
|
*/
|
||||||
|
private ChannelFuture connect(final Bootstrap bootstrap) {
|
||||||
|
return bootstrap.remoteAddress(address).connect()
|
||||||
|
.addListener(new GenericFutureListener<ChannelFuture>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(final ChannelFuture f) throws Exception {
|
||||||
|
if (!f.isSuccess()) {
|
||||||
|
retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
channel = f.channel();
|
||||||
|
|
||||||
|
setupAuthorization();
|
||||||
|
|
||||||
|
ByteBuf b = channel.alloc().directBuffer(6);
|
||||||
|
createPreamble(b, authMethod);
|
||||||
|
channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||||
|
if (useSasl) {
|
||||||
|
UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
|
||||||
|
if (authMethod == AuthMethod.KERBEROS) {
|
||||||
|
if (ticket != null && ticket.getRealUser() != null) {
|
||||||
|
ticket = ticket.getRealUser();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SaslClientHandler saslHandler;
|
||||||
|
if (ticket == null) {
|
||||||
|
throw new FatalConnectionException("ticket/user is null");
|
||||||
|
}
|
||||||
|
final UserGroupInformation realTicket = ticket;
|
||||||
|
saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
|
||||||
|
@Override
|
||||||
|
public SaslClientHandler run() throws IOException {
|
||||||
|
return getSaslHandler(realTicket, bootstrap);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (saslHandler != null) {
|
||||||
|
// Sasl connect is successful. Let's set up Sasl channel handler
|
||||||
|
channel.pipeline().addFirst(saslHandler);
|
||||||
|
} else {
|
||||||
|
// fall back to simple auth because server told us so.
|
||||||
|
authMethod = AuthMethod.SIMPLE;
|
||||||
|
useSasl = false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
startHBaseConnection(f.channel());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start HBase connection
|
||||||
|
* @param ch channel to start connection on
|
||||||
|
*/
|
||||||
|
private void startHBaseConnection(Channel ch) {
|
||||||
|
ch.pipeline().addLast("frameDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
||||||
|
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
||||||
|
try {
|
||||||
|
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
close(future.cause());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<AsyncCall> callsToWrite;
|
||||||
|
synchronized (pendingCalls) {
|
||||||
|
connected = true;
|
||||||
|
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||||
|
}
|
||||||
|
for (AsyncCall call : callsToWrite) {
|
||||||
|
writeRequest(call);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
close(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startConnectionWithEncryption(Channel ch) {
|
||||||
|
// for rpc encryption, the order of ChannelInboundHandler should be:
|
||||||
|
// LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
|
||||||
|
// Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
|
||||||
|
// SaslClientHandler will handler this
|
||||||
|
ch.pipeline().addFirst("beforeUnwrapDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
|
||||||
|
ch.pipeline().addLast("afterUnwrapDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
||||||
|
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
||||||
|
List<AsyncCall> callsToWrite;
|
||||||
|
synchronized (pendingCalls) {
|
||||||
|
connected = true;
|
||||||
|
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||||
|
}
|
||||||
|
for (AsyncCall call : callsToWrite) {
|
||||||
|
writeRequest(call);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get SASL handler
|
||||||
|
* @param bootstrap to reconnect to
|
||||||
|
* @return new SASL handler
|
||||||
|
* @throws java.io.IOException if handler failed to create
|
||||||
|
*/
|
||||||
|
private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
|
||||||
|
final Bootstrap bootstrap) throws IOException {
|
||||||
|
return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
|
||||||
|
client.fallbackAllowed,
|
||||||
|
client.conf.get("hbase.rpc.protection",
|
||||||
|
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
|
||||||
|
getChannelHeaderBytes(authMethod),
|
||||||
|
new SaslClientHandler.SaslExceptionHandler() {
|
||||||
|
@Override
|
||||||
|
public void handle(int retryCount, Random random, Throwable cause) {
|
||||||
|
try {
|
||||||
|
// Handle Sasl failure. Try to potentially get new credentials
|
||||||
|
handleSaslConnectionFailure(retryCount, cause, realTicket);
|
||||||
|
|
||||||
|
retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
|
||||||
|
cause);
|
||||||
|
} catch (IOException | InterruptedException e) {
|
||||||
|
close(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, new SaslClientHandler.SaslSuccessfulConnectHandler() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Channel channel) {
|
||||||
|
startHBaseConnection(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSaslProtectionSucess(Channel channel) {
|
||||||
|
startConnectionWithEncryption(channel);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retry to connect or close
|
||||||
|
* @param bootstrap to connect with
|
||||||
|
* @param failureCount failure count
|
||||||
|
* @param e exception of fail
|
||||||
|
*/
|
||||||
|
private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
|
||||||
|
Throwable e) {
|
||||||
|
if (failureCount < client.maxRetries) {
|
||||||
|
client.newTimeout(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run(Timeout timeout) throws Exception {
|
||||||
|
connect(bootstrap);
|
||||||
|
}
|
||||||
|
}, timeout, TimeUnit.MILLISECONDS);
|
||||||
|
} else {
|
||||||
|
client.failedServers.addToFailedServers(address);
|
||||||
|
close(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls method on channel
|
* Calls method on channel
|
||||||
|
@ -40,41 +319,450 @@ public interface AsyncRpcChannel {
|
||||||
* @param request to send
|
* @param request to send
|
||||||
* @param cellScanner with cells to send
|
* @param cellScanner with cells to send
|
||||||
* @param responsePrototype to construct response with
|
* @param responsePrototype to construct response with
|
||||||
* @param messageConverter for the messages to expected result
|
|
||||||
* @param exceptionConverter for converting exceptions
|
|
||||||
* @param rpcTimeout timeout for request
|
* @param rpcTimeout timeout for request
|
||||||
* @param priority for request
|
* @param priority for request
|
||||||
* @return Promise for the response Message
|
* @return Promise for the response Message
|
||||||
*/
|
*/
|
||||||
<R extends Message, O> Future<O> callMethod(
|
public <R extends Message, O> io.netty.util.concurrent.Promise<O> callMethod(
|
||||||
final Descriptors.MethodDescriptor method,
|
final Descriptors.MethodDescriptor method,
|
||||||
final Message request, final CellScanner cellScanner,
|
final Message request,final CellScanner cellScanner,
|
||||||
R responsePrototype, MessageConverter<R, O> messageConverter,
|
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
|
||||||
IOExceptionConverter exceptionConverter, long rpcTimeout, int priority);
|
exceptionConverter, long rpcTimeout, int priority) {
|
||||||
|
final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
|
||||||
|
method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
|
||||||
|
rpcTimeout, priority, client.metrics);
|
||||||
|
|
||||||
|
synchronized (pendingCalls) {
|
||||||
|
if (closed) {
|
||||||
|
call.setFailure(new ConnectException());
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
pendingCalls.put(call.id, call);
|
||||||
|
// Add timeout for cleanup if none is present
|
||||||
|
if (cleanupTimer == null && call.getRpcTimeout() > 0) {
|
||||||
|
cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
if (!connected) {
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writeRequest(call);
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
|
||||||
|
public EventLoop getEventExecutor() {
|
||||||
|
return this.channel.eventLoop();
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncCall removePendingCall(int id) {
|
||||||
|
synchronized (pendingCalls) {
|
||||||
|
return pendingCalls.remove(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the EventLoop on which this channel operated
|
* Write the channel header
|
||||||
* @return EventLoop
|
* @param channel to write to
|
||||||
|
* @return future of write
|
||||||
|
* @throws java.io.IOException on failure to write
|
||||||
*/
|
*/
|
||||||
EventExecutor getEventExecutor();
|
private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
|
||||||
|
RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
|
||||||
|
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
|
||||||
|
ByteBuf b = channel.alloc().directBuffer(totalSize);
|
||||||
|
|
||||||
|
b.writeInt(header.getSerializedSize());
|
||||||
|
b.writeBytes(header.toByteArray());
|
||||||
|
|
||||||
|
return channel.writeAndFlush(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
|
||||||
|
RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
|
||||||
|
ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
|
||||||
|
b.putInt(header.getSerializedSize());
|
||||||
|
b.put(header.toByteArray());
|
||||||
|
return b.array();
|
||||||
|
}
|
||||||
|
|
||||||
|
private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
|
||||||
|
RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
|
||||||
|
.setServiceName(serviceName);
|
||||||
|
|
||||||
|
RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
|
||||||
|
if (userInfoPB != null) {
|
||||||
|
headerBuilder.setUserInfo(userInfoPB);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (client.codec != null) {
|
||||||
|
headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
|
||||||
|
}
|
||||||
|
if (client.compressor != null) {
|
||||||
|
headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
|
||||||
|
}
|
||||||
|
|
||||||
|
headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
|
||||||
|
return headerBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write request to channel
|
||||||
|
* @param call to write
|
||||||
|
*/
|
||||||
|
private void writeRequest(final AsyncCall call) {
|
||||||
|
try {
|
||||||
|
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
|
||||||
|
.newBuilder();
|
||||||
|
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
|
||||||
|
.setRequestParam(call.param != null);
|
||||||
|
|
||||||
|
if (Trace.isTracing()) {
|
||||||
|
Span s = Trace.currentSpan();
|
||||||
|
requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
|
||||||
|
.setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
|
||||||
|
if (cellBlock != null) {
|
||||||
|
final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
|
||||||
|
.newBuilder();
|
||||||
|
cellBlockBuilder.setLength(cellBlock.limit());
|
||||||
|
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
|
||||||
|
}
|
||||||
|
// Only pass priority if there one. Let zero be same as no priority.
|
||||||
|
if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||||
|
requestHeaderBuilder.setPriority(call.getPriority());
|
||||||
|
}
|
||||||
|
requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
|
||||||
|
Integer.MAX_VALUE : (int)call.rpcTimeout);
|
||||||
|
|
||||||
|
RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
|
||||||
|
|
||||||
|
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
|
||||||
|
if (cellBlock != null) {
|
||||||
|
totalSize += cellBlock.remaining();
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
|
||||||
|
try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
|
||||||
|
call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
|
||||||
|
}
|
||||||
|
|
||||||
|
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
|
||||||
|
} catch (IOException e) {
|
||||||
|
close(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up server authorization
|
||||||
|
* @throws java.io.IOException if auth setup failed
|
||||||
|
*/
|
||||||
|
private void setupAuthorization() throws IOException {
|
||||||
|
SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
|
||||||
|
this.useSasl = client.userProvider.isHBaseSecurityEnabled();
|
||||||
|
|
||||||
|
this.token = null;
|
||||||
|
if (useSasl && securityInfo != null) {
|
||||||
|
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
|
||||||
|
if (tokenKind != null) {
|
||||||
|
TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
|
||||||
|
if (tokenSelector != null) {
|
||||||
|
token = tokenSelector.selectToken(new Text(client.clusterId),
|
||||||
|
ticket.getUGI().getTokens());
|
||||||
|
} else if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("No token selector found for type " + tokenKind);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String serverKey = securityInfo.getServerPrincipal();
|
||||||
|
if (serverKey == null) {
|
||||||
|
throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
|
||||||
|
}
|
||||||
|
this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
|
||||||
|
address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
|
||||||
|
+ serverPrincipal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!useSasl) {
|
||||||
|
authMethod = AuthMethod.SIMPLE;
|
||||||
|
} else if (token != null) {
|
||||||
|
authMethod = AuthMethod.DIGEST;
|
||||||
|
} else {
|
||||||
|
authMethod = AuthMethod.KERBEROS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
|
||||||
|
}
|
||||||
|
reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the user information
|
||||||
|
* @param ugi User Group Information
|
||||||
|
* @param authMethod Authorization method
|
||||||
|
* @return UserInformation protobuf
|
||||||
|
*/
|
||||||
|
private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
|
||||||
|
if (ugi == null || authMethod == AuthMethod.DIGEST) {
|
||||||
|
// Don't send user for token auth
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
|
||||||
|
if (authMethod == AuthMethod.KERBEROS) {
|
||||||
|
// Send effective user for Kerberos auth
|
||||||
|
userInfoPB.setEffectiveUser(ugi.getUserName());
|
||||||
|
} else if (authMethod == AuthMethod.SIMPLE) {
|
||||||
|
// Send both effective user and real user for simple auth
|
||||||
|
userInfoPB.setEffectiveUser(ugi.getUserName());
|
||||||
|
if (ugi.getRealUser() != null) {
|
||||||
|
userInfoPB.setRealUser(ugi.getRealUser().getUserName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return userInfoPB.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create connection preamble
|
||||||
|
* @param byteBuf to write to
|
||||||
|
* @param authMethod to write
|
||||||
|
*/
|
||||||
|
private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
|
||||||
|
byteBuf.writeBytes(HConstants.RPC_HEADER);
|
||||||
|
byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
|
||||||
|
byteBuf.writeByte(authMethod.code);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void close0(Throwable e) {
|
||||||
|
List<AsyncCall> toCleanup;
|
||||||
|
synchronized (pendingCalls) {
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
|
||||||
|
pendingCalls.clear();
|
||||||
|
}
|
||||||
|
IOException closeException = null;
|
||||||
|
if (e != null) {
|
||||||
|
if (e instanceof IOException) {
|
||||||
|
closeException = (IOException) e;
|
||||||
|
} else {
|
||||||
|
closeException = new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// log the info
|
||||||
|
if (LOG.isDebugEnabled() && closeException != null) {
|
||||||
|
LOG.debug(name + ": closing ipc connection to " + address, closeException);
|
||||||
|
}
|
||||||
|
if (cleanupTimer != null) {
|
||||||
|
cleanupTimer.cancel();
|
||||||
|
cleanupTimer = null;
|
||||||
|
}
|
||||||
|
for (AsyncCall call : toCleanup) {
|
||||||
|
call.setFailed(closeException != null ? closeException
|
||||||
|
: new ConnectionClosingException(
|
||||||
|
"Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
|
||||||
|
}
|
||||||
|
channel.disconnect().addListener(ChannelFutureListener.CLOSE);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(name + ": closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close connection
|
* Close connection
|
||||||
* @param cause of closure.
|
* @param e exception on close
|
||||||
*/
|
*/
|
||||||
void close(Throwable cause);
|
public void close(final Throwable e) {
|
||||||
|
client.removeConnection(this);
|
||||||
|
|
||||||
|
// Move closing from the requesting thread to the channel thread
|
||||||
|
if (channel.eventLoop().inEventLoop()) {
|
||||||
|
close0(e);
|
||||||
|
} else {
|
||||||
|
channel.eventLoop().execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
close0(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clean up calls.
|
||||||
|
*/
|
||||||
|
private void cleanupCalls() {
|
||||||
|
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
|
||||||
|
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
long nextCleanupTaskDelay = -1L;
|
||||||
|
synchronized (pendingCalls) {
|
||||||
|
for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
|
||||||
|
AsyncCall call = iter.next();
|
||||||
|
long timeout = call.getRpcTimeout();
|
||||||
|
if (timeout > 0) {
|
||||||
|
if (currentTime - call.getStartTime() >= timeout) {
|
||||||
|
iter.remove();
|
||||||
|
toCleanup.add(call);
|
||||||
|
} else {
|
||||||
|
if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
|
||||||
|
nextCleanupTaskDelay = timeout;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nextCleanupTaskDelay > 0) {
|
||||||
|
cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
|
||||||
|
} else {
|
||||||
|
cleanupTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (AsyncCall call : toCleanup) {
|
||||||
|
call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
|
||||||
|
+ (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the connection is alive
|
* Check if the connection is alive
|
||||||
*
|
|
||||||
* @return true if alive
|
* @return true if alive
|
||||||
*/
|
*/
|
||||||
boolean isAlive();
|
public boolean isAlive() {
|
||||||
|
return channel.isOpen();
|
||||||
|
}
|
||||||
|
|
||||||
|
public InetSocketAddress getAddress() {
|
||||||
|
return this.address;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the address on which this channel operates
|
* Check if user should authenticate over Kerberos
|
||||||
* @return InetSocketAddress
|
* @return true if should be authenticated over Kerberos
|
||||||
|
* @throws java.io.IOException on failure of check
|
||||||
*/
|
*/
|
||||||
InetSocketAddress getAddress();
|
private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
|
||||||
|
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
||||||
|
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
||||||
|
UserGroupInformation realUser = currentUser.getRealUser();
|
||||||
|
return authMethod == AuthMethod.KERBEROS && loginUser != null &&
|
||||||
|
// Make sure user logged in using Kerberos either keytab or TGT
|
||||||
|
loginUser.hasKerberosCredentials() &&
|
||||||
|
// relogin only in case it is the login user (e.g. JT)
|
||||||
|
// or superuser (like oozie).
|
||||||
|
(loginUser.equals(currentUser) || loginUser.equals(realUser));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If multiple clients with the same principal try to connect to the same server at the same time,
|
||||||
|
* the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
|
||||||
|
* work around this, what is done is that the client backs off randomly and tries to initiate the
|
||||||
|
* connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
|
||||||
|
* attempted.
|
||||||
|
* <p>
|
||||||
|
* The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
|
||||||
|
* user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
|
||||||
|
* cases, it is prudent to throw a runtime exception when we receive a SaslException from the
|
||||||
|
* underlying authentication implementation, so there is no retry from other high level (for eg,
|
||||||
|
* HCM or HBaseAdmin).
|
||||||
|
* </p>
|
||||||
|
* @param currRetries retry count
|
||||||
|
* @param ex exception describing fail
|
||||||
|
* @param user which is trying to connect
|
||||||
|
* @throws java.io.IOException if IO fail
|
||||||
|
* @throws InterruptedException if thread is interrupted
|
||||||
|
*/
|
||||||
|
private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
|
||||||
|
final UserGroupInformation user) throws IOException, InterruptedException {
|
||||||
|
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws IOException, InterruptedException {
|
||||||
|
if (shouldAuthenticateOverKrb()) {
|
||||||
|
if (currRetries < MAX_SASL_RETRIES) {
|
||||||
|
LOG.debug("Exception encountered while connecting to the server : " + ex);
|
||||||
|
// try re-login
|
||||||
|
if (UserGroupInformation.isLoginKeytabBased()) {
|
||||||
|
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||||
|
} else {
|
||||||
|
UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should reconnect
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
String msg = "Couldn't setup connection for "
|
||||||
|
+ UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
|
||||||
|
LOG.warn(msg, ex);
|
||||||
|
throw (IOException) new IOException(msg).initCause(ex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
|
||||||
|
}
|
||||||
|
if (ex instanceof RemoteException) {
|
||||||
|
throw (RemoteException) ex;
|
||||||
|
}
|
||||||
|
if (ex instanceof SaslException) {
|
||||||
|
String msg = "SASL authentication failed."
|
||||||
|
+ " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
|
||||||
|
LOG.fatal(msg, ex);
|
||||||
|
throw new RuntimeException(msg, ex);
|
||||||
|
}
|
||||||
|
throw new IOException(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getConnectionHashCode() {
|
||||||
|
return ConnectionId.hashCode(ticket, serviceName, address);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getConnectionHashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj instanceof AsyncRpcChannel) {
|
||||||
|
AsyncRpcChannel channel = (AsyncRpcChannel) obj;
|
||||||
|
return channel.hashCode() == obj.hashCode();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listens to call writes and fails if write failed
|
||||||
|
*/
|
||||||
|
private static final class CallWriteListener implements ChannelFutureListener {
|
||||||
|
private final AsyncRpcChannel rpcChannel;
|
||||||
|
private final int id;
|
||||||
|
|
||||||
|
public CallWriteListener(AsyncRpcChannel asyncRpcChannelImpl, int id) {
|
||||||
|
this.rpcChannel = asyncRpcChannelImpl;
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
AsyncCall call = rpcChannel.removePendingCall(id);
|
||||||
|
if (call != null) {
|
||||||
|
if (future.cause() instanceof IOException) {
|
||||||
|
call.setFailed((IOException) future.cause());
|
||||||
|
} else {
|
||||||
|
call.setFailed(new IOException(future.cause()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,770 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import com.google.protobuf.Descriptors;
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.ByteBufOutputStream;
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
|
||||||
import io.netty.channel.ChannelFutureListener;
|
|
||||||
import io.netty.channel.EventLoop;
|
|
||||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
|
||||||
import io.netty.util.Timeout;
|
|
||||||
import io.netty.util.TimerTask;
|
|
||||||
import io.netty.util.concurrent.GenericFutureListener;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ConnectException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.security.PrivilegedExceptionAction;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import javax.security.sasl.SaslException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.client.Future;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
|
|
||||||
import org.apache.hadoop.hbase.security.AuthMethod;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslClientHandler;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
|
||||||
import org.apache.hadoop.hbase.security.SecurityInfo;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
||||||
import org.apache.hadoop.security.token.TokenSelector;
|
|
||||||
import org.apache.htrace.Span;
|
|
||||||
import org.apache.htrace.Trace;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Netty RPC channel
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class AsyncRpcChannelImpl implements AsyncRpcChannel {
|
|
||||||
private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName());
|
|
||||||
|
|
||||||
private static final int MAX_SASL_RETRIES = 5;
|
|
||||||
|
|
||||||
protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
|
|
||||||
= new HashMap<>();
|
|
||||||
|
|
||||||
static {
|
|
||||||
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
|
|
||||||
new AuthenticationTokenSelector());
|
|
||||||
}
|
|
||||||
|
|
||||||
final AsyncRpcClient client;
|
|
||||||
|
|
||||||
// Contains the channel to work with.
|
|
||||||
// Only exists when connected
|
|
||||||
private Channel channel;
|
|
||||||
|
|
||||||
String name;
|
|
||||||
final User ticket;
|
|
||||||
final String serviceName;
|
|
||||||
final InetSocketAddress address;
|
|
||||||
|
|
||||||
private int failureCounter = 0;
|
|
||||||
|
|
||||||
boolean useSasl;
|
|
||||||
AuthMethod authMethod;
|
|
||||||
private int reloginMaxBackoff;
|
|
||||||
private Token<? extends TokenIdentifier> token;
|
|
||||||
private String serverPrincipal;
|
|
||||||
|
|
||||||
// NOTE: closed and connected flags below are only changed when a lock on pendingCalls
|
|
||||||
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
|
|
||||||
private boolean connected = false;
|
|
||||||
private boolean closed = false;
|
|
||||||
|
|
||||||
private Timeout cleanupTimer;
|
|
||||||
|
|
||||||
private final TimerTask timeoutTask = new TimerTask() {
|
|
||||||
@Override
|
|
||||||
public void run(Timeout timeout) throws Exception {
|
|
||||||
cleanupCalls();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for netty RPC channel
|
|
||||||
* @param bootstrap to construct channel on
|
|
||||||
* @param client to connect with
|
|
||||||
* @param ticket of user which uses connection
|
|
||||||
* @param serviceName name of service to connect to
|
|
||||||
* @param address to connect to
|
|
||||||
*/
|
|
||||||
public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
|
|
||||||
String serviceName, InetSocketAddress address) {
|
|
||||||
this.client = client;
|
|
||||||
|
|
||||||
this.ticket = ticket;
|
|
||||||
this.serviceName = serviceName;
|
|
||||||
this.address = address;
|
|
||||||
|
|
||||||
this.channel = connect(bootstrap).channel();
|
|
||||||
|
|
||||||
name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
|
|
||||||
+ ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connect to channel
|
|
||||||
* @param bootstrap to connect to
|
|
||||||
* @return future of connection
|
|
||||||
*/
|
|
||||||
private ChannelFuture connect(final Bootstrap bootstrap) {
|
|
||||||
return bootstrap.remoteAddress(address).connect()
|
|
||||||
.addListener(new GenericFutureListener<ChannelFuture>() {
|
|
||||||
@Override
|
|
||||||
public void operationComplete(final ChannelFuture f) throws Exception {
|
|
||||||
if (!f.isSuccess()) {
|
|
||||||
retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
channel = f.channel();
|
|
||||||
|
|
||||||
setupAuthorization();
|
|
||||||
|
|
||||||
ByteBuf b = channel.alloc().directBuffer(6);
|
|
||||||
createPreamble(b, authMethod);
|
|
||||||
channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
|
||||||
if (useSasl) {
|
|
||||||
UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI();
|
|
||||||
if (authMethod == AuthMethod.KERBEROS) {
|
|
||||||
if (ticket != null && ticket.getRealUser() != null) {
|
|
||||||
ticket = ticket.getRealUser();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SaslClientHandler saslHandler;
|
|
||||||
if (ticket == null) {
|
|
||||||
throw new FatalConnectionException("ticket/user is null");
|
|
||||||
}
|
|
||||||
final UserGroupInformation realTicket = ticket;
|
|
||||||
saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
|
|
||||||
@Override
|
|
||||||
public SaslClientHandler run() throws IOException {
|
|
||||||
return getSaslHandler(realTicket, bootstrap);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (saslHandler != null) {
|
|
||||||
// Sasl connect is successful. Let's set up Sasl channel handler
|
|
||||||
channel.pipeline().addFirst(saslHandler);
|
|
||||||
} else {
|
|
||||||
// fall back to simple auth because server told us so.
|
|
||||||
authMethod = AuthMethod.SIMPLE;
|
|
||||||
useSasl = false;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
startHBaseConnection(f.channel());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start HBase connection
|
|
||||||
* @param ch channel to start connection on
|
|
||||||
*/
|
|
||||||
private void startHBaseConnection(Channel ch) {
|
|
||||||
ch.pipeline().addLast("frameDecoder",
|
|
||||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
|
||||||
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
|
||||||
try {
|
|
||||||
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
|
|
||||||
@Override
|
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
|
||||||
if (!future.isSuccess()) {
|
|
||||||
close(future.cause());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<AsyncCall> callsToWrite;
|
|
||||||
synchronized (pendingCalls) {
|
|
||||||
connected = true;
|
|
||||||
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
|
|
||||||
}
|
|
||||||
for (AsyncCall call : callsToWrite) {
|
|
||||||
writeRequest(call);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (IOException e) {
|
|
||||||
close(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startConnectionWithEncryption(Channel ch) {
|
|
||||||
// for rpc encryption, the order of ChannelInboundHandler should be:
|
|
||||||
// LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
|
|
||||||
// Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
|
|
||||||
// SaslClientHandler will handler this
|
|
||||||
ch.pipeline().addFirst("beforeUnwrapDecoder",
|
|
||||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
|
|
||||||
ch.pipeline().addLast("afterUnwrapDecoder",
|
|
||||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
|
||||||
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
|
||||||
List<AsyncCall> callsToWrite;
|
|
||||||
synchronized (pendingCalls) {
|
|
||||||
connected = true;
|
|
||||||
callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
|
|
||||||
}
|
|
||||||
for (AsyncCall call : callsToWrite) {
|
|
||||||
writeRequest(call);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get SASL handler
|
|
||||||
* @param bootstrap to reconnect to
|
|
||||||
* @return new SASL handler
|
|
||||||
* @throws java.io.IOException if handler failed to create
|
|
||||||
*/
|
|
||||||
private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
|
|
||||||
final Bootstrap bootstrap) throws IOException {
|
|
||||||
return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
|
|
||||||
client.fallbackAllowed,
|
|
||||||
client.conf.get("hbase.rpc.protection",
|
|
||||||
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
|
|
||||||
getChannelHeaderBytes(authMethod),
|
|
||||||
new SaslClientHandler.SaslExceptionHandler() {
|
|
||||||
@Override
|
|
||||||
public void handle(int retryCount, Random random, Throwable cause) {
|
|
||||||
try {
|
|
||||||
// Handle Sasl failure. Try to potentially get new credentials
|
|
||||||
handleSaslConnectionFailure(retryCount, cause, realTicket);
|
|
||||||
|
|
||||||
retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
|
|
||||||
cause);
|
|
||||||
} catch (IOException | InterruptedException e) {
|
|
||||||
close(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, new SaslClientHandler.SaslSuccessfulConnectHandler() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Channel channel) {
|
|
||||||
startHBaseConnection(channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSaslProtectionSucess(Channel channel) {
|
|
||||||
startConnectionWithEncryption(channel);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retry to connect or close
|
|
||||||
* @param bootstrap to connect with
|
|
||||||
* @param failureCount failure count
|
|
||||||
* @param e exception of fail
|
|
||||||
*/
|
|
||||||
private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
|
|
||||||
Throwable e) {
|
|
||||||
if (failureCount < client.maxRetries) {
|
|
||||||
client.newTimeout(new TimerTask() {
|
|
||||||
@Override
|
|
||||||
public void run(Timeout timeout) throws Exception {
|
|
||||||
connect(bootstrap);
|
|
||||||
}
|
|
||||||
}, timeout, TimeUnit.MILLISECONDS);
|
|
||||||
} else {
|
|
||||||
client.failedServers.addToFailedServers(address);
|
|
||||||
close(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Calls method on channel
|
|
||||||
* @param method to call
|
|
||||||
* @param request to send
|
|
||||||
* @param cellScanner with cells to send
|
|
||||||
* @param responsePrototype to construct response with
|
|
||||||
* @param rpcTimeout timeout for request
|
|
||||||
* @param priority for request
|
|
||||||
* @return Promise for the response Message
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public <R extends Message, O> Future<O> callMethod(
|
|
||||||
final Descriptors.MethodDescriptor method,
|
|
||||||
final Message request,final CellScanner cellScanner,
|
|
||||||
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
|
|
||||||
exceptionConverter, long rpcTimeout, int priority) {
|
|
||||||
final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
|
|
||||||
method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
|
|
||||||
rpcTimeout, priority, client.metrics);
|
|
||||||
|
|
||||||
synchronized (pendingCalls) {
|
|
||||||
if (closed) {
|
|
||||||
call.setFailure(new ConnectException());
|
|
||||||
return call;
|
|
||||||
}
|
|
||||||
pendingCalls.put(call.id, call);
|
|
||||||
// Add timeout for cleanup if none is present
|
|
||||||
if (cleanupTimer == null && call.getRpcTimeout() > 0) {
|
|
||||||
cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
if (!connected) {
|
|
||||||
return call;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writeRequest(call);
|
|
||||||
return call;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventLoop getEventExecutor() {
|
|
||||||
return this.channel.eventLoop();
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncCall removePendingCall(int id) {
|
|
||||||
synchronized (pendingCalls) {
|
|
||||||
return pendingCalls.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write the channel header
|
|
||||||
* @param channel to write to
|
|
||||||
* @return future of write
|
|
||||||
* @throws java.io.IOException on failure to write
|
|
||||||
*/
|
|
||||||
private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
|
|
||||||
RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
|
|
||||||
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
|
|
||||||
ByteBuf b = channel.alloc().directBuffer(totalSize);
|
|
||||||
|
|
||||||
b.writeInt(header.getSerializedSize());
|
|
||||||
b.writeBytes(header.toByteArray());
|
|
||||||
|
|
||||||
return channel.writeAndFlush(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
|
|
||||||
RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
|
|
||||||
ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
|
|
||||||
b.putInt(header.getSerializedSize());
|
|
||||||
b.put(header.toByteArray());
|
|
||||||
return b.array();
|
|
||||||
}
|
|
||||||
|
|
||||||
private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
|
|
||||||
RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
|
|
||||||
.setServiceName(serviceName);
|
|
||||||
|
|
||||||
RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
|
|
||||||
if (userInfoPB != null) {
|
|
||||||
headerBuilder.setUserInfo(userInfoPB);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (client.codec != null) {
|
|
||||||
headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
|
|
||||||
}
|
|
||||||
if (client.compressor != null) {
|
|
||||||
headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
|
|
||||||
}
|
|
||||||
|
|
||||||
headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
|
|
||||||
return headerBuilder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write request to channel
|
|
||||||
* @param call to write
|
|
||||||
*/
|
|
||||||
private void writeRequest(final AsyncCall call) {
|
|
||||||
try {
|
|
||||||
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
|
|
||||||
.newBuilder();
|
|
||||||
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
|
|
||||||
.setRequestParam(call.param != null);
|
|
||||||
|
|
||||||
if (Trace.isTracing()) {
|
|
||||||
Span s = Trace.currentSpan();
|
|
||||||
requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
|
|
||||||
.setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
|
|
||||||
if (cellBlock != null) {
|
|
||||||
final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
|
|
||||||
.newBuilder();
|
|
||||||
cellBlockBuilder.setLength(cellBlock.limit());
|
|
||||||
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
|
|
||||||
}
|
|
||||||
// Only pass priority if there one. Let zero be same as no priority.
|
|
||||||
if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
|
||||||
requestHeaderBuilder.setPriority(call.getPriority());
|
|
||||||
}
|
|
||||||
requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
|
|
||||||
Integer.MAX_VALUE : (int)call.rpcTimeout);
|
|
||||||
|
|
||||||
RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
|
|
||||||
|
|
||||||
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
|
|
||||||
if (cellBlock != null) {
|
|
||||||
totalSize += cellBlock.remaining();
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
|
|
||||||
try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
|
|
||||||
call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
|
|
||||||
}
|
|
||||||
|
|
||||||
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
|
|
||||||
} catch (IOException e) {
|
|
||||||
close(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set up server authorization
|
|
||||||
* @throws java.io.IOException if auth setup failed
|
|
||||||
*/
|
|
||||||
private void setupAuthorization() throws IOException {
|
|
||||||
SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
|
|
||||||
this.useSasl = client.userProvider.isHBaseSecurityEnabled();
|
|
||||||
|
|
||||||
this.token = null;
|
|
||||||
if (useSasl && securityInfo != null) {
|
|
||||||
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
|
|
||||||
if (tokenKind != null) {
|
|
||||||
TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
|
|
||||||
if (tokenSelector != null) {
|
|
||||||
token = tokenSelector.selectToken(new Text(client.clusterId),
|
|
||||||
ticket.getUGI().getTokens());
|
|
||||||
} else if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("No token selector found for type " + tokenKind);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
String serverKey = securityInfo.getServerPrincipal();
|
|
||||||
if (serverKey == null) {
|
|
||||||
throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
|
|
||||||
}
|
|
||||||
this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
|
|
||||||
address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
|
|
||||||
+ serverPrincipal);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!useSasl) {
|
|
||||||
authMethod = AuthMethod.SIMPLE;
|
|
||||||
} else if (token != null) {
|
|
||||||
authMethod = AuthMethod.DIGEST;
|
|
||||||
} else {
|
|
||||||
authMethod = AuthMethod.KERBEROS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(
|
|
||||||
"Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
|
|
||||||
}
|
|
||||||
reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Build the user information
|
|
||||||
* @param ugi User Group Information
|
|
||||||
* @param authMethod Authorization method
|
|
||||||
* @return UserInformation protobuf
|
|
||||||
*/
|
|
||||||
private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
|
|
||||||
if (ugi == null || authMethod == AuthMethod.DIGEST) {
|
|
||||||
// Don't send user for token auth
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
|
|
||||||
if (authMethod == AuthMethod.KERBEROS) {
|
|
||||||
// Send effective user for Kerberos auth
|
|
||||||
userInfoPB.setEffectiveUser(ugi.getUserName());
|
|
||||||
} else if (authMethod == AuthMethod.SIMPLE) {
|
|
||||||
// Send both effective user and real user for simple auth
|
|
||||||
userInfoPB.setEffectiveUser(ugi.getUserName());
|
|
||||||
if (ugi.getRealUser() != null) {
|
|
||||||
userInfoPB.setRealUser(ugi.getRealUser().getUserName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return userInfoPB.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create connection preamble
|
|
||||||
* @param byteBuf to write to
|
|
||||||
* @param authMethod to write
|
|
||||||
*/
|
|
||||||
private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
|
|
||||||
byteBuf.writeBytes(HConstants.RPC_HEADER);
|
|
||||||
byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
|
|
||||||
byteBuf.writeByte(authMethod.code);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void close0(Throwable e) {
|
|
||||||
List<AsyncCall> toCleanup;
|
|
||||||
synchronized (pendingCalls) {
|
|
||||||
if (closed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
closed = true;
|
|
||||||
toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
|
|
||||||
pendingCalls.clear();
|
|
||||||
}
|
|
||||||
IOException closeException = null;
|
|
||||||
if (e != null) {
|
|
||||||
if (e instanceof IOException) {
|
|
||||||
closeException = (IOException) e;
|
|
||||||
} else {
|
|
||||||
closeException = new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// log the info
|
|
||||||
if (LOG.isDebugEnabled() && closeException != null) {
|
|
||||||
LOG.debug(name + ": closing ipc connection to " + address, closeException);
|
|
||||||
}
|
|
||||||
if (cleanupTimer != null) {
|
|
||||||
cleanupTimer.cancel();
|
|
||||||
cleanupTimer = null;
|
|
||||||
}
|
|
||||||
for (AsyncCall call : toCleanup) {
|
|
||||||
call.setFailed(closeException != null ? closeException
|
|
||||||
: new ConnectionClosingException(
|
|
||||||
"Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
|
|
||||||
}
|
|
||||||
channel.disconnect().addListener(ChannelFutureListener.CLOSE);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(name + ": closed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close connection
|
|
||||||
* @param e exception on close
|
|
||||||
*/
|
|
||||||
public void close(final Throwable e) {
|
|
||||||
client.removeConnection(this);
|
|
||||||
|
|
||||||
// Move closing from the requesting thread to the channel thread
|
|
||||||
if (channel.eventLoop().inEventLoop()) {
|
|
||||||
close0(e);
|
|
||||||
} else {
|
|
||||||
channel.eventLoop().execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
close0(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up calls.
|
|
||||||
*/
|
|
||||||
private void cleanupCalls() {
|
|
||||||
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
|
|
||||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
long nextCleanupTaskDelay = -1L;
|
|
||||||
synchronized (pendingCalls) {
|
|
||||||
for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
|
|
||||||
AsyncCall call = iter.next();
|
|
||||||
long timeout = call.getRpcTimeout();
|
|
||||||
if (timeout > 0) {
|
|
||||||
if (currentTime - call.getStartTime() >= timeout) {
|
|
||||||
iter.remove();
|
|
||||||
toCleanup.add(call);
|
|
||||||
} else {
|
|
||||||
if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
|
|
||||||
nextCleanupTaskDelay = timeout;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (nextCleanupTaskDelay > 0) {
|
|
||||||
cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
|
|
||||||
} else {
|
|
||||||
cleanupTimer = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (AsyncCall call : toCleanup) {
|
|
||||||
call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
|
|
||||||
+ (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the connection is alive
|
|
||||||
* @return true if alive
|
|
||||||
*/
|
|
||||||
public boolean isAlive() {
|
|
||||||
return channel.isOpen();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InetSocketAddress getAddress() {
|
|
||||||
return this.address;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if user should authenticate over Kerberos
|
|
||||||
* @return true if should be authenticated over Kerberos
|
|
||||||
* @throws java.io.IOException on failure of check
|
|
||||||
*/
|
|
||||||
private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
|
|
||||||
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
|
||||||
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
|
||||||
UserGroupInformation realUser = currentUser.getRealUser();
|
|
||||||
return authMethod == AuthMethod.KERBEROS && loginUser != null &&
|
|
||||||
// Make sure user logged in using Kerberos either keytab or TGT
|
|
||||||
loginUser.hasKerberosCredentials() &&
|
|
||||||
// relogin only in case it is the login user (e.g. JT)
|
|
||||||
// or superuser (like oozie).
|
|
||||||
(loginUser.equals(currentUser) || loginUser.equals(realUser));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If multiple clients with the same principal try to connect to the same server at the same time,
|
|
||||||
* the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
|
|
||||||
* work around this, what is done is that the client backs off randomly and tries to initiate the
|
|
||||||
* connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
|
|
||||||
* attempted.
|
|
||||||
* <p>
|
|
||||||
* The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
|
|
||||||
* user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
|
|
||||||
* cases, it is prudent to throw a runtime exception when we receive a SaslException from the
|
|
||||||
* underlying authentication implementation, so there is no retry from other high level (for eg,
|
|
||||||
* HCM or HBaseAdmin).
|
|
||||||
* </p>
|
|
||||||
* @param currRetries retry count
|
|
||||||
* @param ex exception describing fail
|
|
||||||
* @param user which is trying to connect
|
|
||||||
* @throws java.io.IOException if IO fail
|
|
||||||
* @throws InterruptedException if thread is interrupted
|
|
||||||
*/
|
|
||||||
private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
|
|
||||||
final UserGroupInformation user) throws IOException, InterruptedException {
|
|
||||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void run() throws IOException, InterruptedException {
|
|
||||||
if (shouldAuthenticateOverKrb()) {
|
|
||||||
if (currRetries < MAX_SASL_RETRIES) {
|
|
||||||
LOG.debug("Exception encountered while connecting to the server : " + ex);
|
|
||||||
// try re-login
|
|
||||||
if (UserGroupInformation.isLoginKeytabBased()) {
|
|
||||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
|
||||||
} else {
|
|
||||||
UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should reconnect
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
String msg = "Couldn't setup connection for "
|
|
||||||
+ UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
|
|
||||||
LOG.warn(msg, ex);
|
|
||||||
throw (IOException) new IOException(msg).initCause(ex);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
|
|
||||||
}
|
|
||||||
if (ex instanceof RemoteException) {
|
|
||||||
throw (RemoteException) ex;
|
|
||||||
}
|
|
||||||
if (ex instanceof SaslException) {
|
|
||||||
String msg = "SASL authentication failed."
|
|
||||||
+ " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
|
|
||||||
LOG.fatal(msg, ex);
|
|
||||||
throw new RuntimeException(msg, ex);
|
|
||||||
}
|
|
||||||
throw new IOException(ex);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getConnectionHashCode() {
|
|
||||||
return ConnectionId.hashCode(ticket, serviceName, address);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return getConnectionHashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (obj instanceof AsyncRpcChannelImpl) {
|
|
||||||
AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj;
|
|
||||||
return channel.hashCode() == obj.hashCode();
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Listens to call writes and fails if write failed
|
|
||||||
*/
|
|
||||||
private static final class CallWriteListener implements ChannelFutureListener {
|
|
||||||
private final AsyncRpcChannelImpl rpcChannel;
|
|
||||||
private final int id;
|
|
||||||
|
|
||||||
public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) {
|
|
||||||
this.rpcChannel = asyncRpcChannelImpl;
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
|
||||||
if (!future.isSuccess()) {
|
|
||||||
AsyncCall call = rpcChannel.removePendingCall(id);
|
|
||||||
if (call != null) {
|
|
||||||
if (future.cause() instanceof IOException) {
|
|
||||||
call.setFailed((IOException) future.cause());
|
|
||||||
} else {
|
|
||||||
call.setFailed(new IOException(future.cause()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -23,11 +23,11 @@ import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.RpcCallback;
|
import com.google.protobuf.RpcCallback;
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.EventLoop;
|
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.channel.epoll.EpollEventLoopGroup;
|
import io.netty.channel.epoll.EpollEventLoopGroup;
|
||||||
import io.netty.channel.epoll.EpollSocketChannel;
|
import io.netty.channel.epoll.EpollSocketChannel;
|
||||||
|
@ -37,6 +37,9 @@ import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
import io.netty.util.HashedWheelTimer;
|
import io.netty.util.HashedWheelTimer;
|
||||||
import io.netty.util.Timeout;
|
import io.netty.util.Timeout;
|
||||||
import io.netty.util.TimerTask;
|
import io.netty.util.TimerTask;
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.FutureListener;
|
||||||
|
import io.netty.util.concurrent.Promise;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -49,16 +52,13 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Future;
|
|
||||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||||
import org.apache.hadoop.hbase.client.ResponseFutureListener;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.JVM;
|
import org.apache.hadoop.hbase.util.JVM;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -240,7 +240,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
|
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
|
||||||
|
|
||||||
final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
|
final Promise<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
|
||||||
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
|
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
|
||||||
pcrc.getPriority());
|
pcrc.getPriority());
|
||||||
|
|
||||||
|
@ -290,8 +290,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
||||||
try {
|
try {
|
||||||
connection = createRpcChannel(md.getService().getName(), addr, ticket);
|
connection = createRpcChannel(md.getService().getName(), addr, ticket);
|
||||||
|
|
||||||
ResponseFutureListener<Message> listener =
|
FutureListener<Message> listener =
|
||||||
new ResponseFutureListener<Message>() {
|
new FutureListener<Message>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Message> future) throws Exception {
|
public void operationComplete(Future<Message> future) throws Exception {
|
||||||
if (!future.isSuccess()) {
|
if (!future.isSuccess()) {
|
||||||
|
@ -351,11 +351,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventLoop getEventExecutor() {
|
|
||||||
return this.bootstrap.config().group().next();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a cell scanner
|
* Create a cell scanner
|
||||||
*
|
*
|
||||||
|
@ -378,13 +373,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
||||||
return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
|
return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
|
|
||||||
throws StoppedRpcClientException, FailedServerException {
|
|
||||||
return this.createRpcChannel(serviceName,
|
|
||||||
new InetSocketAddress(sn.getHostname(), sn.getPort()), user);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an RPC client
|
* Creates an RPC client
|
||||||
*
|
*
|
||||||
|
@ -420,7 +408,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
||||||
connections.remove(hashCode);
|
connections.remove(hashCode);
|
||||||
}
|
}
|
||||||
if (rpcChannel == null) {
|
if (rpcChannel == null) {
|
||||||
rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location);
|
rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
|
||||||
connections.put(hashCode, rpcChannel);
|
connections.put(hashCode, rpcChannel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,13 +37,13 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||||
private final AsyncRpcChannelImpl channel;
|
private final AsyncRpcChannel channel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param channel on which this response handler operates
|
* @param channel on which this response handler operates
|
||||||
*/
|
*/
|
||||||
public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) {
|
public AsyncServerResponseHandler(AsyncRpcChannel channel) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,38 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import io.netty.util.concurrent.DefaultPromise;
|
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.client.Future;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Abstract response promise
|
|
||||||
* @param <T> Type of result contained in Promise
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class Promise<T> extends DefaultPromise<T> implements Future<T> {
|
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
* @param eventLoop to handle events on
|
|
||||||
*/
|
|
||||||
public Promise(EventExecutor eventLoop) {
|
|
||||||
super(eventLoop);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import com.google.protobuf.BlockingRpcChannel;
|
import com.google.protobuf.BlockingRpcChannel;
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -69,18 +69,6 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
|
BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Create or fetch AsyncRpcChannel
|
|
||||||
* @param serviceName to connect to
|
|
||||||
* @param sn ServerName of the channel to create
|
|
||||||
* @param user for the service
|
|
||||||
* @return An async RPC channel fitting given parameters
|
|
||||||
* @throws FailedServerException if server failed
|
|
||||||
* @throws StoppedRpcClientException if the RPC client has stopped
|
|
||||||
*/
|
|
||||||
AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
|
|
||||||
throws StoppedRpcClientException, FailedServerException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a "channel" that can be used by a protobuf service. Useful setting up
|
* Creates a "channel" that can be used by a protobuf service. Useful setting up
|
||||||
* protobuf stubs.
|
* protobuf stubs.
|
||||||
|
@ -116,10 +104,4 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
* supports cell blocks.
|
* supports cell blocks.
|
||||||
*/
|
*/
|
||||||
boolean hasCellBlockSupport();
|
boolean hasCellBlockSupport();
|
||||||
|
|
||||||
/**
|
|
||||||
* Get an event loop to operate on
|
|
||||||
* @return EventLoop
|
|
||||||
*/
|
|
||||||
EventExecutor getEventExecutor();
|
|
||||||
}
|
}
|
|
@ -23,8 +23,6 @@ import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.Message.Builder;
|
import com.google.protobuf.Message.Builder;
|
||||||
import com.google.protobuf.RpcCallback;
|
import com.google.protobuf.RpcCallback;
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
import com.google.protobuf.RpcController;
|
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
|
@ -55,6 +53,7 @@ import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
|
|
||||||
|
@ -66,7 +65,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Future;
|
|
||||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||||
|
@ -1219,11 +1217,6 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventExecutor getEventExecutor() {
|
|
||||||
return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make a call, passing <code>param</code>, to the IPC server running at
|
* Make a call, passing <code>param</code>, to the IPC server running at
|
||||||
* <code>address</code> which is servicing the <code>protocol</code> protocol,
|
* <code>address</code> which is servicing the <code>protocol</code> protocol,
|
||||||
|
@ -1335,15 +1328,9 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
return call;
|
return call;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName,
|
|
||||||
ServerName sn, User user) throws StoppedRpcClientException, FailedServerException {
|
|
||||||
return new AsyncRpcChannel(sn, user);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) {
|
public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) {
|
||||||
return new RpcChannelImplementation(sn, user, rpcTimeout);
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1392,143 +1379,4 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Simulated async call
|
|
||||||
*/
|
|
||||||
private class RpcChannelImplementation implements RpcChannel {
|
|
||||||
private final InetSocketAddress isa;
|
|
||||||
private final User ticket;
|
|
||||||
private final int channelOperationTimeout;
|
|
||||||
private final EventExecutor executor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param channelOperationTimeout - the default timeout when no timeout is given
|
|
||||||
*/
|
|
||||||
protected RpcChannelImplementation(
|
|
||||||
final ServerName sn, final User ticket, int channelOperationTimeout) {
|
|
||||||
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
|
|
||||||
this.ticket = ticket;
|
|
||||||
this.channelOperationTimeout = channelOperationTimeout;
|
|
||||||
|
|
||||||
this.executor = RpcClientImpl.this.getEventExecutor();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void callMethod(final MethodDescriptor method, RpcController controller,
|
|
||||||
final Message request, final Message responsePrototype, final RpcCallback<Message> done) {
|
|
||||||
final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
|
|
||||||
controller,
|
|
||||||
channelOperationTimeout);
|
|
||||||
|
|
||||||
executor.execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
|
||||||
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
|
||||||
Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
|
|
||||||
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
|
|
||||||
if (metrics != null) {
|
|
||||||
metrics.updateRpc(method, request, cs);
|
|
||||||
}
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
|
|
||||||
}
|
|
||||||
|
|
||||||
done.run(call.response);
|
|
||||||
} catch (IOException e) {
|
|
||||||
pcrc.setFailed(e);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
pcrc.startCancel();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wraps the call in an async channel.
|
|
||||||
*/
|
|
||||||
private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel {
|
|
||||||
private final EventExecutor executor;
|
|
||||||
private final InetSocketAddress isa;
|
|
||||||
|
|
||||||
private final User ticket;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
* @param sn servername to connect to
|
|
||||||
* @param user to connect with
|
|
||||||
*/
|
|
||||||
public AsyncRpcChannel(ServerName sn, User user) {
|
|
||||||
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
|
|
||||||
this.executor = RpcClientImpl.this.getEventExecutor();
|
|
||||||
this.ticket = user;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public <R extends Message, O> Future<O> callMethod(final MethodDescriptor method,
|
|
||||||
final Message request, CellScanner cellScanner, final R responsePrototype,
|
|
||||||
final MessageConverter<R, O> messageConverter,
|
|
||||||
final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) {
|
|
||||||
final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner);
|
|
||||||
pcrc.setPriority(priority);
|
|
||||||
pcrc.setCallTimeout((int) rpcTimeout);
|
|
||||||
|
|
||||||
final Promise<O> promise = new Promise<>(executor);
|
|
||||||
|
|
||||||
executor.execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
|
||||||
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
|
||||||
Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
|
|
||||||
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
|
|
||||||
if (metrics != null) {
|
|
||||||
metrics.updateRpc(method, request, cs);
|
|
||||||
}
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
|
|
||||||
}
|
|
||||||
|
|
||||||
promise.setSuccess(
|
|
||||||
messageConverter.convert((R) call.response, call.cells)
|
|
||||||
);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
promise.cancel(true);
|
|
||||||
} catch (IOException e) {
|
|
||||||
if(exceptionConverter != null) {
|
|
||||||
e = exceptionConverter.convert(e);
|
|
||||||
}
|
|
||||||
promise.setFailure(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventExecutor getEventExecutor() {
|
|
||||||
return this.executor;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close(Throwable cause) {
|
|
||||||
this.executor.shutdownGracefully();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAlive() {
|
|
||||||
return !this.executor.isShuttingDown() && !this.executor.isShutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InetSocketAddress getAddress() {
|
|
||||||
return isa;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.anyObject;
|
import static org.mockito.Matchers.anyObject;
|
||||||
|
@ -32,9 +31,9 @@ import com.google.protobuf.BlockingRpcChannel;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.RpcChannel;
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
@ -42,9 +41,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -56,8 +53,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
|
||||||
import org.apache.hadoop.hbase.client.Future;
|
|
||||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||||
|
@ -414,141 +409,4 @@ public abstract class AbstractTestIPC {
|
||||||
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
|
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
|
||||||
.getCause() instanceof CallTimeoutException);
|
.getCause() instanceof CallTimeoutException);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAsyncProtobufConnectionSetup() throws Exception {
|
|
||||||
TestRpcServer rpcServer = new TestRpcServer();
|
|
||||||
try (RpcClient client = createRpcClient(CONF)) {
|
|
||||||
rpcServer.start();
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
|
||||||
if (address == null) {
|
|
||||||
throw new IOException("Listener channel is closed");
|
|
||||||
}
|
|
||||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
|
||||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
|
||||||
|
|
||||||
RpcChannel channel = client.createProtobufRpcChannel(
|
|
||||||
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
|
|
||||||
User.getCurrent(), 0);
|
|
||||||
|
|
||||||
final AtomicBoolean done = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
channel
|
|
||||||
.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType().toProto(),
|
|
||||||
new com.google.protobuf.RpcCallback<Message>() {
|
|
||||||
@Override
|
|
||||||
public void run(Message parameter) {
|
|
||||||
done.set(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
|
|
||||||
@Override
|
|
||||||
public boolean evaluate() throws Exception {
|
|
||||||
return done.get();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception {
|
|
||||||
TestRpcServer rpcServer = new TestRpcServer();
|
|
||||||
try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
|
|
||||||
rpcServer.start();
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
|
||||||
if (address == null) {
|
|
||||||
throw new IOException("Listener channel is closed");
|
|
||||||
}
|
|
||||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
|
||||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
|
||||||
|
|
||||||
RpcChannel channel = client.createProtobufRpcChannel(
|
|
||||||
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
|
|
||||||
User.getCurrent(), 0);
|
|
||||||
|
|
||||||
final AtomicBoolean done = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
|
||||||
controller.notifyOnFail(new com.google.protobuf.RpcCallback<IOException>() {
|
|
||||||
@Override
|
|
||||||
public void run(IOException e) {
|
|
||||||
done.set(true);
|
|
||||||
LOG.info("Caught expected exception: " + e.toString());
|
|
||||||
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
channel.callMethod(md, controller, param, md.getOutputType().toProto(),
|
|
||||||
new com.google.protobuf.RpcCallback<Message>() {
|
|
||||||
@Override
|
|
||||||
public void run(Message parameter) {
|
|
||||||
done.set(true);
|
|
||||||
fail("Expected an exception to have been thrown!");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
|
|
||||||
@Override
|
|
||||||
public boolean evaluate() throws Exception {
|
|
||||||
return done.get();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAsyncConnectionSetup() throws Exception {
|
|
||||||
TestRpcServer rpcServer = new TestRpcServer();
|
|
||||||
try (RpcClient client = createRpcClient(CONF)) {
|
|
||||||
rpcServer.start();
|
|
||||||
Message msg = setupAsyncConnection(rpcServer, client);
|
|
||||||
|
|
||||||
assertNotNull(msg);
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRTEDuringAsyncConnectionSetup() throws Exception {
|
|
||||||
TestRpcServer rpcServer = new TestRpcServer();
|
|
||||||
try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
|
|
||||||
rpcServer.start();
|
|
||||||
setupAsyncConnection(rpcServer, client);
|
|
||||||
|
|
||||||
fail("Expected an exception to have been thrown!");
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient client)
|
|
||||||
throws IOException, InterruptedException, ExecutionException,
|
|
||||||
java.util.concurrent.TimeoutException {
|
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
|
||||||
if (address == null) {
|
|
||||||
throw new IOException("Listener channel is closed");
|
|
||||||
}
|
|
||||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
|
||||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
|
||||||
|
|
||||||
ServerName serverName =
|
|
||||||
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis());
|
|
||||||
|
|
||||||
AsyncRpcChannel channel =
|
|
||||||
client.createRpcChannel(md.getService().getName(), serverName, User.getCurrent());
|
|
||||||
|
|
||||||
final Future<Message> f = channel
|
|
||||||
.callMethod(md, param, null, md.getOutputType().toProto(), MessageConverter.NO_CONVERTER,
|
|
||||||
null, 1000, HConstants.NORMAL_QOS);
|
|
||||||
|
|
||||||
return f.get(1, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue