diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java deleted file mode 100644 index 99a8baae7d0..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * Promise for responses - * @param Value type - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving - -@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_SAME_SIMPLE_NAME_AS_INTERFACE", - justification="Agree that this can be confusing but folks will pull in this and think twice " - + "about pulling in netty; incidence of confusion should be rare in this case.") -public interface Future extends io.netty.util.concurrent.Future { -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java deleted file mode 100644 index f23dc8f05b8..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Specific interface for the Response future listener - * @param Value type. - */ -@InterfaceAudience.Private -public interface ResponseFutureListener - extends GenericFutureListener> { -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index 89e6ca489ca..33536df4e5d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -19,7 +19,11 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; + +import io.netty.util.concurrent.DefaultPromise; + import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; @@ -39,12 +43,12 @@ import org.apache.hadoop.ipc.RemoteException; * @param Message returned in communication to be converted */ @InterfaceAudience.Private -public class AsyncCall extends Promise { +public class AsyncCall extends DefaultPromise { private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName()); final int id; - private final AsyncRpcChannelImpl channel; + private final AsyncRpcChannel channel; final Descriptors.MethodDescriptor method; final Message param; @@ -77,7 +81,7 @@ public class AsyncCall extends Promise { * @param priority for this request * @param metrics MetricsConnection to which the metrics are stored for this request */ - public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor + public AsyncCall(AsyncRpcChannel channel, int connectId, Descriptors.MethodDescriptor md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority, MetricsConnection metrics) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 8cc730ffbc0..9550f2a9f46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -20,19 +20,298 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; -import io.netty.util.concurrent.EventExecutor; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.GenericFutureListener; +import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import javax.security.sasl.SaslException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Future; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.SaslClientHandler; +import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.SecurityInfo; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; /** - * Interface for Async Rpc Channels + * Netty RPC channel */ @InterfaceAudience.Private -public interface AsyncRpcChannel { +public class AsyncRpcChannel { + private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName()); + + private static final int MAX_SASL_RETRIES = 5; + + protected final static Map> TOKEN_HANDDLERS + = new HashMap<>(); + + static { + TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, + new AuthenticationTokenSelector()); + } + + final AsyncRpcClient client; + + // Contains the channel to work with. + // Only exists when connected + private Channel channel; + + String name; + final User ticket; + final String serviceName; + final InetSocketAddress address; + + private int failureCounter = 0; + + boolean useSasl; + AuthMethod authMethod; + private int reloginMaxBackoff; + private Token token; + private String serverPrincipal; + + // NOTE: closed and connected flags below are only changed when a lock on pendingCalls + private final Map pendingCalls = new HashMap(); + private boolean connected = false; + private boolean closed = false; + + private Timeout cleanupTimer; + + private final TimerTask timeoutTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + cleanupCalls(); + } + }; + + /** + * Constructor for netty RPC channel + * @param bootstrap to construct channel on + * @param client to connect with + * @param ticket of user which uses connection + * @param serviceName name of service to connect to + * @param address to connect to + */ + public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, + String serviceName, InetSocketAddress address) { + this.client = client; + + this.ticket = ticket; + this.serviceName = serviceName; + this.address = address; + + this.channel = connect(bootstrap).channel(); + + name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString() + + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName()))); + } + + /** + * Connect to channel + * @param bootstrap to connect to + * @return future of connection + */ + private ChannelFuture connect(final Bootstrap bootstrap) { + return bootstrap.remoteAddress(address).connect() + .addListener(new GenericFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) throws Exception { + if (!f.isSuccess()) { + retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause()); + return; + } + channel = f.channel(); + + setupAuthorization(); + + ByteBuf b = channel.alloc().directBuffer(6); + createPreamble(b, authMethod); + channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + if (useSasl) { + UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI(); + if (authMethod == AuthMethod.KERBEROS) { + if (ticket != null && ticket.getRealUser() != null) { + ticket = ticket.getRealUser(); + } + } + SaslClientHandler saslHandler; + if (ticket == null) { + throw new FatalConnectionException("ticket/user is null"); + } + final UserGroupInformation realTicket = ticket; + saslHandler = ticket.doAs(new PrivilegedExceptionAction() { + @Override + public SaslClientHandler run() throws IOException { + return getSaslHandler(realTicket, bootstrap); + } + }); + if (saslHandler != null) { + // Sasl connect is successful. Let's set up Sasl channel handler + channel.pipeline().addFirst(saslHandler); + } else { + // fall back to simple auth because server told us so. + authMethod = AuthMethod.SIMPLE; + useSasl = false; + } + } else { + startHBaseConnection(f.channel()); + } + } + }); + } + + /** + * Start HBase connection + * @param ch channel to start connection on + */ + private void startHBaseConnection(Channel ch) { + ch.pipeline().addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast(new AsyncServerResponseHandler(this)); + try { + writeChannelHeader(ch).addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + close(future.cause()); + return; + } + List callsToWrite; + synchronized (pendingCalls) { + connected = true; + callsToWrite = new ArrayList(pendingCalls.values()); + } + for (AsyncCall call : callsToWrite) { + writeRequest(call); + } + } + }); + } catch (IOException e) { + close(e); + } + } + + private void startConnectionWithEncryption(Channel ch) { + // for rpc encryption, the order of ChannelInboundHandler should be: + // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder + // Don't skip the first 4 bytes for length in beforeUnwrapDecoder, + // SaslClientHandler will handler this + ch.pipeline().addFirst("beforeUnwrapDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0)); + ch.pipeline().addLast("afterUnwrapDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast(new AsyncServerResponseHandler(this)); + List callsToWrite; + synchronized (pendingCalls) { + connected = true; + callsToWrite = new ArrayList(pendingCalls.values()); + } + for (AsyncCall call : callsToWrite) { + writeRequest(call); + } + } + + /** + * Get SASL handler + * @param bootstrap to reconnect to + * @return new SASL handler + * @throws java.io.IOException if handler failed to create + */ + private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, + final Bootstrap bootstrap) throws IOException { + return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, + client.fallbackAllowed, + client.conf.get("hbase.rpc.protection", + SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), + getChannelHeaderBytes(authMethod), + new SaslClientHandler.SaslExceptionHandler() { + @Override + public void handle(int retryCount, Random random, Throwable cause) { + try { + // Handle Sasl failure. Try to potentially get new credentials + handleSaslConnectionFailure(retryCount, cause, realTicket); + + retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1, + cause); + } catch (IOException | InterruptedException e) { + close(e); + } + } + }, new SaslClientHandler.SaslSuccessfulConnectHandler() { + @Override + public void onSuccess(Channel channel) { + startHBaseConnection(channel); + } + + @Override + public void onSaslProtectionSucess(Channel channel) { + startConnectionWithEncryption(channel); + } + }); + } + + /** + * Retry to connect or close + * @param bootstrap to connect with + * @param failureCount failure count + * @param e exception of fail + */ + private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout, + Throwable e) { + if (failureCount < client.maxRetries) { + client.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + connect(bootstrap); + } + }, timeout, TimeUnit.MILLISECONDS); + } else { + client.failedServers.addToFailedServers(address); + close(e); + } + } /** * Calls method on channel @@ -40,41 +319,450 @@ public interface AsyncRpcChannel { * @param request to send * @param cellScanner with cells to send * @param responsePrototype to construct response with - * @param messageConverter for the messages to expected result - * @param exceptionConverter for converting exceptions * @param rpcTimeout timeout for request * @param priority for request * @return Promise for the response Message */ - Future callMethod( + public io.netty.util.concurrent.Promise callMethod( final Descriptors.MethodDescriptor method, - final Message request, final CellScanner cellScanner, - R responsePrototype, MessageConverter messageConverter, - IOExceptionConverter exceptionConverter, long rpcTimeout, int priority); + final Message request,final CellScanner cellScanner, + R responsePrototype, MessageConverter messageConverter, IOExceptionConverter + exceptionConverter, long rpcTimeout, int priority) { + final AsyncCall call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(), + method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter, + rpcTimeout, priority, client.metrics); + synchronized (pendingCalls) { + if (closed) { + call.setFailure(new ConnectException()); + return call; + } + pendingCalls.put(call.id, call); + // Add timeout for cleanup if none is present + if (cleanupTimer == null && call.getRpcTimeout() > 0) { + cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + if (!connected) { + return call; + } + } + writeRequest(call); + return call; + } + + public EventLoop getEventExecutor() { + return this.channel.eventLoop(); + } + + AsyncCall removePendingCall(int id) { + synchronized (pendingCalls) { + return pendingCalls.remove(id); + } + } /** - * Get the EventLoop on which this channel operated - * @return EventLoop + * Write the channel header + * @param channel to write to + * @return future of write + * @throws java.io.IOException on failure to write */ - EventExecutor getEventExecutor(); + private ChannelFuture writeChannelHeader(Channel channel) throws IOException { + RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); + ByteBuf b = channel.alloc().directBuffer(totalSize); + + b.writeInt(header.getSerializedSize()); + b.writeBytes(header.toByteArray()); + + return channel.writeAndFlush(b); + } + + private byte[] getChannelHeaderBytes(AuthMethod authMethod) { + RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); + ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4); + b.putInt(header.getSerializedSize()); + b.put(header.toByteArray()); + return b.array(); + } + + private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) { + RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() + .setServiceName(serviceName); + + RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); + if (userInfoPB != null) { + headerBuilder.setUserInfo(userInfoPB); + } + + if (client.codec != null) { + headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); + } + if (client.compressor != null) { + headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); + } + + headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); + return headerBuilder.build(); + } + + /** + * Write request to channel + * @param call to write + */ + private void writeRequest(final AsyncCall call) { + try { + final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader + .newBuilder(); + requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()) + .setRequestParam(call.param != null); + + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder() + .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); + } + + ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner()); + if (cellBlock != null) { + final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta + .newBuilder(); + cellBlockBuilder.setLength(cellBlock.limit()); + requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); + } + // Only pass priority if there one. Let zero be same as no priority. + if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { + requestHeaderBuilder.setPriority(call.getPriority()); + } + requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int)call.rpcTimeout); + + RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); + + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); + if (cellBlock != null) { + totalSize += cellBlock.remaining(); + } + + ByteBuf b = channel.alloc().directBuffer(4 + totalSize); + try (ByteBufOutputStream out = new ByteBufOutputStream(b)) { + call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); + } + + channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); + } catch (IOException e) { + close(e); + } + } + + /** + * Set up server authorization + * @throws java.io.IOException if auth setup failed + */ + private void setupAuthorization() throws IOException { + SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); + this.useSasl = client.userProvider.isHBaseSecurityEnabled(); + + this.token = null; + if (useSasl && securityInfo != null) { + AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); + if (tokenKind != null) { + TokenSelector tokenSelector = TOKEN_HANDDLERS.get(tokenKind); + if (tokenSelector != null) { + token = tokenSelector.selectToken(new Text(client.clusterId), + ticket.getUGI().getTokens()); + } else if (LOG.isDebugEnabled()) { + LOG.debug("No token selector found for type " + tokenKind); + } + } + String serverKey = securityInfo.getServerPrincipal(); + if (serverKey == null) { + throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); + } + this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), + address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT)); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " + + serverPrincipal); + } + } + + if (!useSasl) { + authMethod = AuthMethod.SIMPLE; + } else if (token != null) { + authMethod = AuthMethod.DIGEST; + } else { + authMethod = AuthMethod.KERBEROS; + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl); + } + reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); + } + + /** + * Build the user information + * @param ugi User Group Information + * @param authMethod Authorization method + * @return UserInformation protobuf + */ + private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { + if (ugi == null || authMethod == AuthMethod.DIGEST) { + // Don't send user for token auth + return null; + } + RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); + if (authMethod == AuthMethod.KERBEROS) { + // Send effective user for Kerberos auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + } else if (authMethod == AuthMethod.SIMPLE) { + // Send both effective user and real user for simple auth + userInfoPB.setEffectiveUser(ugi.getUserName()); + if (ugi.getRealUser() != null) { + userInfoPB.setRealUser(ugi.getRealUser().getUserName()); + } + } + return userInfoPB.build(); + } + + /** + * Create connection preamble + * @param byteBuf to write to + * @param authMethod to write + */ + private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { + byteBuf.writeBytes(HConstants.RPC_HEADER); + byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); + byteBuf.writeByte(authMethod.code); + } + + private void close0(Throwable e) { + List toCleanup; + synchronized (pendingCalls) { + if (closed) { + return; + } + closed = true; + toCleanup = new ArrayList(pendingCalls.values()); + pendingCalls.clear(); + } + IOException closeException = null; + if (e != null) { + if (e instanceof IOException) { + closeException = (IOException) e; + } else { + closeException = new IOException(e); + } + } + // log the info + if (LOG.isDebugEnabled() && closeException != null) { + LOG.debug(name + ": closing ipc connection to " + address, closeException); + } + if (cleanupTimer != null) { + cleanupTimer.cancel(); + cleanupTimer = null; + } + for (AsyncCall call : toCleanup) { + call.setFailed(closeException != null ? closeException + : new ConnectionClosingException( + "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); + } + channel.disconnect().addListener(ChannelFutureListener.CLOSE); + if (LOG.isDebugEnabled()) { + LOG.debug(name + ": closed"); + } + } /** * Close connection - * @param cause of closure. + * @param e exception on close */ - void close(Throwable cause); + public void close(final Throwable e) { + client.removeConnection(this); + + // Move closing from the requesting thread to the channel thread + if (channel.eventLoop().inEventLoop()) { + close0(e); + } else { + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + close0(e); + } + }); + } + } + + /** + * Clean up calls. + */ + private void cleanupCalls() { + List toCleanup = new ArrayList(); + long currentTime = EnvironmentEdgeManager.currentTime(); + long nextCleanupTaskDelay = -1L; + synchronized (pendingCalls) { + for (Iterator iter = pendingCalls.values().iterator(); iter.hasNext();) { + AsyncCall call = iter.next(); + long timeout = call.getRpcTimeout(); + if (timeout > 0) { + if (currentTime - call.getStartTime() >= timeout) { + iter.remove(); + toCleanup.add(call); + } else { + if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { + nextCleanupTaskDelay = timeout; + } + } + } + } + if (nextCleanupTaskDelay > 0) { + cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); + } else { + cleanupTimer = null; + } + } + for (AsyncCall call : toCleanup) { + call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" + + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); + } + } /** * Check if the connection is alive - * * @return true if alive */ - boolean isAlive(); + public boolean isAlive() { + return channel.isOpen(); + } + + public InetSocketAddress getAddress() { + return this.address; + } /** - * Get the address on which this channel operates - * @return InetSocketAddress + * Check if user should authenticate over Kerberos + * @return true if should be authenticated over Kerberos + * @throws java.io.IOException on failure of check */ - InetSocketAddress getAddress(); + private synchronized boolean shouldAuthenticateOverKrb() throws IOException { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation realUser = currentUser.getRealUser(); + return authMethod == AuthMethod.KERBEROS && loginUser != null && + // Make sure user logged in using Kerberos either keytab or TGT + loginUser.hasKerberosCredentials() && + // relogin only in case it is the login user (e.g. JT) + // or superuser (like oozie). + (loginUser.equals(currentUser) || loginUser.equals(realUser)); + } + + /** + * If multiple clients with the same principal try to connect to the same server at the same time, + * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to + * work around this, what is done is that the client backs off randomly and tries to initiate the + * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is + * attempted. + *

+ * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the + * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such + * cases, it is prudent to throw a runtime exception when we receive a SaslException from the + * underlying authentication implementation, so there is no retry from other high level (for eg, + * HCM or HBaseAdmin). + *

+ * @param currRetries retry count + * @param ex exception describing fail + * @param user which is trying to connect + * @throws java.io.IOException if IO fail + * @throws InterruptedException if thread is interrupted + */ + private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, + final UserGroupInformation user) throws IOException, InterruptedException { + user.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException, InterruptedException { + if (shouldAuthenticateOverKrb()) { + if (currRetries < MAX_SASL_RETRIES) { + LOG.debug("Exception encountered while connecting to the server : " + ex); + // try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + + // Should reconnect + return null; + } else { + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; + LOG.warn(msg, ex); + throw (IOException) new IOException(msg).initCause(ex); + } + } else { + LOG.warn("Exception encountered while connecting to " + "the server : " + ex); + } + if (ex instanceof RemoteException) { + throw (RemoteException) ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } + }); + } + + public int getConnectionHashCode() { + return ConnectionId.hashCode(ticket, serviceName, address); + } + + @Override + public int hashCode() { + return getConnectionHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AsyncRpcChannel) { + AsyncRpcChannel channel = (AsyncRpcChannel) obj; + return channel.hashCode() == obj.hashCode(); + } + return false; + } + + @Override + public String toString() { + return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; + } + + /** + * Listens to call writes and fails if write failed + */ + private static final class CallWriteListener implements ChannelFutureListener { + private final AsyncRpcChannel rpcChannel; + private final int id; + + public CallWriteListener(AsyncRpcChannel asyncRpcChannelImpl, int id) { + this.rpcChannel = asyncRpcChannelImpl; + this.id = id; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + AsyncCall call = rpcChannel.removePendingCall(id); + if (call != null) { + if (future.cause() instanceof IOException) { + call.setFailed((IOException) future.cause()); + } else { + call.setFailed(new IOException(future.cause())); + } + } + } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java deleted file mode 100644 index 6b7dc5b537f..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java +++ /dev/null @@ -1,770 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.EventLoop; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Locale; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import javax.security.sasl.SaslException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Future; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.SaslClientHandler; -import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; - -/** - * Netty RPC channel - */ -@InterfaceAudience.Private -public class AsyncRpcChannelImpl implements AsyncRpcChannel { - private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName()); - - private static final int MAX_SASL_RETRIES = 5; - - protected final static Map> TOKEN_HANDDLERS - = new HashMap<>(); - - static { - TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, - new AuthenticationTokenSelector()); - } - - final AsyncRpcClient client; - - // Contains the channel to work with. - // Only exists when connected - private Channel channel; - - String name; - final User ticket; - final String serviceName; - final InetSocketAddress address; - - private int failureCounter = 0; - - boolean useSasl; - AuthMethod authMethod; - private int reloginMaxBackoff; - private Token token; - private String serverPrincipal; - - // NOTE: closed and connected flags below are only changed when a lock on pendingCalls - private final Map pendingCalls = new HashMap(); - private boolean connected = false; - private boolean closed = false; - - private Timeout cleanupTimer; - - private final TimerTask timeoutTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - cleanupCalls(); - } - }; - - /** - * Constructor for netty RPC channel - * @param bootstrap to construct channel on - * @param client to connect with - * @param ticket of user which uses connection - * @param serviceName name of service to connect to - * @param address to connect to - */ - public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, - String serviceName, InetSocketAddress address) { - this.client = client; - - this.ticket = ticket; - this.serviceName = serviceName; - this.address = address; - - this.channel = connect(bootstrap).channel(); - - name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString() - + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName()))); - } - - /** - * Connect to channel - * @param bootstrap to connect to - * @return future of connection - */ - private ChannelFuture connect(final Bootstrap bootstrap) { - return bootstrap.remoteAddress(address).connect() - .addListener(new GenericFutureListener() { - @Override - public void operationComplete(final ChannelFuture f) throws Exception { - if (!f.isSuccess()) { - retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause()); - return; - } - channel = f.channel(); - - setupAuthorization(); - - ByteBuf b = channel.alloc().directBuffer(6); - createPreamble(b, authMethod); - channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - if (useSasl) { - UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI(); - if (authMethod == AuthMethod.KERBEROS) { - if (ticket != null && ticket.getRealUser() != null) { - ticket = ticket.getRealUser(); - } - } - SaslClientHandler saslHandler; - if (ticket == null) { - throw new FatalConnectionException("ticket/user is null"); - } - final UserGroupInformation realTicket = ticket; - saslHandler = ticket.doAs(new PrivilegedExceptionAction() { - @Override - public SaslClientHandler run() throws IOException { - return getSaslHandler(realTicket, bootstrap); - } - }); - if (saslHandler != null) { - // Sasl connect is successful. Let's set up Sasl channel handler - channel.pipeline().addFirst(saslHandler); - } else { - // fall back to simple auth because server told us so. - authMethod = AuthMethod.SIMPLE; - useSasl = false; - } - } else { - startHBaseConnection(f.channel()); - } - } - }); - } - - /** - * Start HBase connection - * @param ch channel to start connection on - */ - private void startHBaseConnection(Channel ch) { - ch.pipeline().addLast("frameDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - try { - writeChannelHeader(ch).addListener(new GenericFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - close(future.cause()); - return; - } - List callsToWrite; - synchronized (pendingCalls) { - connected = true; - callsToWrite = new ArrayList(pendingCalls.values()); - } - for (AsyncCall call : callsToWrite) { - writeRequest(call); - } - } - }); - } catch (IOException e) { - close(e); - } - } - - private void startConnectionWithEncryption(Channel ch) { - // for rpc encryption, the order of ChannelInboundHandler should be: - // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder - // Don't skip the first 4 bytes for length in beforeUnwrapDecoder, - // SaslClientHandler will handler this - ch.pipeline().addFirst("beforeUnwrapDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0)); - ch.pipeline().addLast("afterUnwrapDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - List callsToWrite; - synchronized (pendingCalls) { - connected = true; - callsToWrite = new ArrayList(pendingCalls.values()); - } - for (AsyncCall call : callsToWrite) { - writeRequest(call); - } - } - - /** - * Get SASL handler - * @param bootstrap to reconnect to - * @return new SASL handler - * @throws java.io.IOException if handler failed to create - */ - private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, - final Bootstrap bootstrap) throws IOException { - return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, - client.fallbackAllowed, - client.conf.get("hbase.rpc.protection", - SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), - getChannelHeaderBytes(authMethod), - new SaslClientHandler.SaslExceptionHandler() { - @Override - public void handle(int retryCount, Random random, Throwable cause) { - try { - // Handle Sasl failure. Try to potentially get new credentials - handleSaslConnectionFailure(retryCount, cause, realTicket); - - retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1, - cause); - } catch (IOException | InterruptedException e) { - close(e); - } - } - }, new SaslClientHandler.SaslSuccessfulConnectHandler() { - @Override - public void onSuccess(Channel channel) { - startHBaseConnection(channel); - } - - @Override - public void onSaslProtectionSucess(Channel channel) { - startConnectionWithEncryption(channel); - } - }); - } - - /** - * Retry to connect or close - * @param bootstrap to connect with - * @param failureCount failure count - * @param e exception of fail - */ - private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout, - Throwable e) { - if (failureCount < client.maxRetries) { - client.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - connect(bootstrap); - } - }, timeout, TimeUnit.MILLISECONDS); - } else { - client.failedServers.addToFailedServers(address); - close(e); - } - } - - /** - * Calls method on channel - * @param method to call - * @param request to send - * @param cellScanner with cells to send - * @param responsePrototype to construct response with - * @param rpcTimeout timeout for request - * @param priority for request - * @return Promise for the response Message - */ - @Override - public Future callMethod( - final Descriptors.MethodDescriptor method, - final Message request,final CellScanner cellScanner, - R responsePrototype, MessageConverter messageConverter, IOExceptionConverter - exceptionConverter, long rpcTimeout, int priority) { - final AsyncCall call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(), - method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter, - rpcTimeout, priority, client.metrics); - - synchronized (pendingCalls) { - if (closed) { - call.setFailure(new ConnectException()); - return call; - } - pendingCalls.put(call.id, call); - // Add timeout for cleanup if none is present - if (cleanupTimer == null && call.getRpcTimeout() > 0) { - cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); - } - if (!connected) { - return call; - } - } - writeRequest(call); - return call; - } - - @Override - public EventLoop getEventExecutor() { - return this.channel.eventLoop(); - } - - AsyncCall removePendingCall(int id) { - synchronized (pendingCalls) { - return pendingCalls.remove(id); - } - } - - /** - * Write the channel header - * @param channel to write to - * @return future of write - * @throws java.io.IOException on failure to write - */ - private ChannelFuture writeChannelHeader(Channel channel) throws IOException { - RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); - ByteBuf b = channel.alloc().directBuffer(totalSize); - - b.writeInt(header.getSerializedSize()); - b.writeBytes(header.toByteArray()); - - return channel.writeAndFlush(b); - } - - private byte[] getChannelHeaderBytes(AuthMethod authMethod) { - RPCProtos.ConnectionHeader header = getChannelHeader(authMethod); - ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4); - b.putInt(header.getSerializedSize()); - b.put(header.toByteArray()); - return b.array(); - } - - private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) { - RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() - .setServiceName(serviceName); - - RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); - if (userInfoPB != null) { - headerBuilder.setUserInfo(userInfoPB); - } - - if (client.codec != null) { - headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName()); - } - if (client.compressor != null) { - headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName()); - } - - headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); - return headerBuilder.build(); - } - - /** - * Write request to channel - * @param call to write - */ - private void writeRequest(final AsyncCall call) { - try { - final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader - .newBuilder(); - requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()) - .setRequestParam(call.param != null); - - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder() - .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); - } - - ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner()); - if (cellBlock != null) { - final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta - .newBuilder(); - cellBlockBuilder.setLength(cellBlock.limit()); - requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); - } - // Only pass priority if there one. Let zero be same as no priority. - if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { - requestHeaderBuilder.setPriority(call.getPriority()); - } - requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ? - Integer.MAX_VALUE : (int)call.rpcTimeout); - - RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); - - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); - if (cellBlock != null) { - totalSize += cellBlock.remaining(); - } - - ByteBuf b = channel.alloc().directBuffer(4 + totalSize); - try (ByteBufOutputStream out = new ByteBufOutputStream(b)) { - call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); - } - - channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); - } catch (IOException e) { - close(e); - } - } - - /** - * Set up server authorization - * @throws java.io.IOException if auth setup failed - */ - private void setupAuthorization() throws IOException { - SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName); - this.useSasl = client.userProvider.isHBaseSecurityEnabled(); - - this.token = null; - if (useSasl && securityInfo != null) { - AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); - if (tokenKind != null) { - TokenSelector tokenSelector = TOKEN_HANDDLERS.get(tokenKind); - if (tokenSelector != null) { - token = tokenSelector.selectToken(new Text(client.clusterId), - ticket.getUGI().getTokens()); - } else if (LOG.isDebugEnabled()) { - LOG.debug("No token selector found for type " + tokenKind); - } - } - String serverKey = securityInfo.getServerPrincipal(); - if (serverKey == null) { - throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); - } - this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), - address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT)); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " - + serverPrincipal); - } - } - - if (!useSasl) { - authMethod = AuthMethod.SIMPLE; - } else if (token != null) { - authMethod = AuthMethod.DIGEST; - } else { - authMethod = AuthMethod.KERBEROS; - } - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl); - } - reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); - } - - /** - * Build the user information - * @param ugi User Group Information - * @param authMethod Authorization method - * @return UserInformation protobuf - */ - private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) { - if (ugi == null || authMethod == AuthMethod.DIGEST) { - // Don't send user for token auth - return null; - } - RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder(); - if (authMethod == AuthMethod.KERBEROS) { - // Send effective user for Kerberos auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - } else if (authMethod == AuthMethod.SIMPLE) { - // Send both effective user and real user for simple auth - userInfoPB.setEffectiveUser(ugi.getUserName()); - if (ugi.getRealUser() != null) { - userInfoPB.setRealUser(ugi.getRealUser().getUserName()); - } - } - return userInfoPB.build(); - } - - /** - * Create connection preamble - * @param byteBuf to write to - * @param authMethod to write - */ - private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { - byteBuf.writeBytes(HConstants.RPC_HEADER); - byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION); - byteBuf.writeByte(authMethod.code); - } - - private void close0(Throwable e) { - List toCleanup; - synchronized (pendingCalls) { - if (closed) { - return; - } - closed = true; - toCleanup = new ArrayList(pendingCalls.values()); - pendingCalls.clear(); - } - IOException closeException = null; - if (e != null) { - if (e instanceof IOException) { - closeException = (IOException) e; - } else { - closeException = new IOException(e); - } - } - // log the info - if (LOG.isDebugEnabled() && closeException != null) { - LOG.debug(name + ": closing ipc connection to " + address, closeException); - } - if (cleanupTimer != null) { - cleanupTimer.cancel(); - cleanupTimer = null; - } - for (AsyncCall call : toCleanup) { - call.setFailed(closeException != null ? closeException - : new ConnectionClosingException( - "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); - } - channel.disconnect().addListener(ChannelFutureListener.CLOSE); - if (LOG.isDebugEnabled()) { - LOG.debug(name + ": closed"); - } - } - - /** - * Close connection - * @param e exception on close - */ - public void close(final Throwable e) { - client.removeConnection(this); - - // Move closing from the requesting thread to the channel thread - if (channel.eventLoop().inEventLoop()) { - close0(e); - } else { - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - close0(e); - } - }); - } - } - - /** - * Clean up calls. - */ - private void cleanupCalls() { - List toCleanup = new ArrayList(); - long currentTime = EnvironmentEdgeManager.currentTime(); - long nextCleanupTaskDelay = -1L; - synchronized (pendingCalls) { - for (Iterator iter = pendingCalls.values().iterator(); iter.hasNext();) { - AsyncCall call = iter.next(); - long timeout = call.getRpcTimeout(); - if (timeout > 0) { - if (currentTime - call.getStartTime() >= timeout) { - iter.remove(); - toCleanup.add(call); - } else { - if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { - nextCleanupTaskDelay = timeout; - } - } - } - } - if (nextCleanupTaskDelay > 0) { - cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); - } else { - cleanupTimer = null; - } - } - for (AsyncCall call : toCleanup) { - call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" - + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout())); - } - } - - /** - * Check if the connection is alive - * @return true if alive - */ - public boolean isAlive() { - return channel.isOpen(); - } - - @Override - public InetSocketAddress getAddress() { - return this.address; - } - - /** - * Check if user should authenticate over Kerberos - * @return true if should be authenticated over Kerberos - * @throws java.io.IOException on failure of check - */ - private synchronized boolean shouldAuthenticateOverKrb() throws IOException { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && loginUser != null && - // Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). - (loginUser.equals(currentUser) || loginUser.equals(realUser)); - } - - /** - * If multiple clients with the same principal try to connect to the same server at the same time, - * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to - * work around this, what is done is that the client backs off randomly and tries to initiate the - * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is - * attempted. - *

- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the - * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such - * cases, it is prudent to throw a runtime exception when we receive a SaslException from the - * underlying authentication implementation, so there is no retry from other high level (for eg, - * HCM or HBaseAdmin). - *

- * @param currRetries retry count - * @param ex exception describing fail - * @param user which is trying to connect - * @throws java.io.IOException if IO fail - * @throws InterruptedException if thread is interrupted - */ - private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, - final UserGroupInformation user) throws IOException, InterruptedException { - user.doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws IOException, InterruptedException { - if (shouldAuthenticateOverKrb()) { - if (currRetries < MAX_SASL_RETRIES) { - LOG.debug("Exception encountered while connecting to the server : " + ex); - // try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - - // Should reconnect - return null; - } else { - String msg = "Couldn't setup connection for " - + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; - LOG.warn(msg, ex); - throw (IOException) new IOException(msg).initCause(ex); - } - } else { - LOG.warn("Exception encountered while connecting to " + "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException) ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." - + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); - } - throw new IOException(ex); - } - }); - } - - public int getConnectionHashCode() { - return ConnectionId.hashCode(ticket, serviceName, address); - } - - @Override - public int hashCode() { - return getConnectionHashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof AsyncRpcChannelImpl) { - AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj; - return channel.hashCode() == obj.hashCode(); - } - return false; - } - - @Override - public String toString() { - return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; - } - - /** - * Listens to call writes and fails if write failed - */ - private static final class CallWriteListener implements ChannelFutureListener { - private final AsyncRpcChannelImpl rpcChannel; - private final int id; - - public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) { - this.rpcChannel = asyncRpcChannelImpl; - this.id = id; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - AsyncCall call = rpcChannel.removePendingCall(id); - if (call != null) { - if (future.cause() instanceof IOException) { - call.setFailed((IOException) future.cause()); - } else { - call.setFailed(new IOException(future.cause())); - } - } - } - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 723a2345886..3d343b4d77d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -23,11 +23,11 @@ import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -37,6 +37,9 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; @@ -49,16 +52,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.ResponseFutureListener; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; @@ -240,7 +240,7 @@ public class AsyncRpcClient extends AbstractRpcClient { } final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - final Future promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType, + final Promise promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType, getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority()); @@ -290,8 +290,8 @@ public class AsyncRpcClient extends AbstractRpcClient { try { connection = createRpcChannel(md.getService().getName(), addr, ticket); - ResponseFutureListener listener = - new ResponseFutureListener() { + FutureListener listener = + new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { @@ -351,11 +351,6 @@ public class AsyncRpcClient extends AbstractRpcClient { } } - @Override - public EventLoop getEventExecutor() { - return this.bootstrap.config().group().next(); - } - /** * Create a cell scanner * @@ -378,13 +373,6 @@ public class AsyncRpcClient extends AbstractRpcClient { return ipcUtil.buildCellBlock(this.codec, this.compressor, cells); } - @Override - public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) - throws StoppedRpcClientException, FailedServerException { - return this.createRpcChannel(serviceName, - new InetSocketAddress(sn.getHostname(), sn.getPort()), user); - } - /** * Creates an RPC client * @@ -420,7 +408,7 @@ public class AsyncRpcClient extends AbstractRpcClient { connections.remove(hashCode); } if (rpcChannel == null) { - rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location); + rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); connections.put(hashCode, rpcChannel); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index 6fcca345138..7a2802fef8a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -37,13 +37,13 @@ import org.apache.hadoop.ipc.RemoteException; */ @InterfaceAudience.Private public class AsyncServerResponseHandler extends SimpleChannelInboundHandler { - private final AsyncRpcChannelImpl channel; + private final AsyncRpcChannel channel; /** * Constructor * @param channel on which this response handler operates */ - public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) { + public AsyncServerResponseHandler(AsyncRpcChannel channel) { this.channel = channel; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java deleted file mode 100644 index 0d05db8512b..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.EventExecutor; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Future; - -/** - * Abstract response promise - * @param Type of result contained in Promise - */ -@InterfaceAudience.Private -public class Promise extends DefaultPromise implements Future { - /** - * Constructor - * @param eventLoop to handle events on - */ - public Promise(EventExecutor eventLoop) { - super(eventLoop); - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 9d05c210b2e..a8ec628805d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.RpcChannel; -import io.netty.util.concurrent.EventExecutor; + import java.io.Closeable; import java.io.IOException; @@ -69,18 +69,6 @@ import org.apache.hadoop.hbase.security.User; BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) throws IOException; - /** - * Create or fetch AsyncRpcChannel - * @param serviceName to connect to - * @param sn ServerName of the channel to create - * @param user for the service - * @return An async RPC channel fitting given parameters - * @throws FailedServerException if server failed - * @throws StoppedRpcClientException if the RPC client has stopped - */ - AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) - throws StoppedRpcClientException, FailedServerException; - /** * Creates a "channel" that can be used by a protobuf service. Useful setting up * protobuf stubs. @@ -116,10 +104,4 @@ import org.apache.hadoop.hbase.security.User; * supports cell blocks. */ boolean hasCellBlockSupport(); - - /** - * Get an event loop to operate on - * @return EventLoop - */ - EventExecutor getEventExecutor(); } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index dc05af14e99..37b9afdd9b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -23,8 +23,6 @@ import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; -import io.netty.util.concurrent.EventExecutor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -55,6 +53,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import javax.net.SocketFactory; import javax.security.sasl.SaslException; @@ -66,7 +65,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; @@ -1219,11 +1217,6 @@ public class RpcClientImpl extends AbstractRpcClient { } } - @Override - public EventExecutor getEventExecutor() { - return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next(); - } - /** * Make a call, passing param, to the IPC server running at * address which is servicing the protocol protocol, @@ -1335,15 +1328,9 @@ public class RpcClientImpl extends AbstractRpcClient { return call; } - @Override - public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName, - ServerName sn, User user) throws StoppedRpcClientException, FailedServerException { - return new AsyncRpcChannel(sn, user); - } - @Override public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) { - return new RpcChannelImplementation(sn, user, rpcTimeout); + throw new UnsupportedOperationException(); } /** @@ -1392,143 +1379,4 @@ public class RpcClientImpl extends AbstractRpcClient { return connection; } - - /** - * Simulated async call - */ - private class RpcChannelImplementation implements RpcChannel { - private final InetSocketAddress isa; - private final User ticket; - private final int channelOperationTimeout; - private final EventExecutor executor; - - /** - * @param channelOperationTimeout - the default timeout when no timeout is given - */ - protected RpcChannelImplementation( - final ServerName sn, final User ticket, int channelOperationTimeout) { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - this.ticket = ticket; - this.channelOperationTimeout = channelOperationTimeout; - - this.executor = RpcClientImpl.this.getEventExecutor(); - } - - @Override - public void callMethod(final MethodDescriptor method, RpcController controller, - final Message request, final Message responsePrototype, final RpcCallback done) { - final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController( - controller, - channelOperationTimeout); - - executor.execute(new Runnable() { - @Override - public void run() { - try { - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - cs.setStartTime(EnvironmentEdgeManager.currentTime()); - Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs); - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(method, request, cs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); - } - - done.run(call.response); - } catch (IOException e) { - pcrc.setFailed(e); - } catch (InterruptedException e) { - pcrc.startCancel(); - } - } - }); - } - } - - /** - * Wraps the call in an async channel. - */ - private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel { - private final EventExecutor executor; - private final InetSocketAddress isa; - - private final User ticket; - - /** - * Constructor - * @param sn servername to connect to - * @param user to connect with - */ - public AsyncRpcChannel(ServerName sn, User user) { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - this.executor = RpcClientImpl.this.getEventExecutor(); - this.ticket = user; - } - - @Override - @SuppressWarnings("unchecked") - public Future callMethod(final MethodDescriptor method, - final Message request, CellScanner cellScanner, final R responsePrototype, - final MessageConverter messageConverter, - final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) { - final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner); - pcrc.setPriority(priority); - pcrc.setCallTimeout((int) rpcTimeout); - - final Promise promise = new Promise<>(executor); - - executor.execute(new Runnable() { - @Override - public void run() { - try { - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - cs.setStartTime(EnvironmentEdgeManager.currentTime()); - Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs); - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(method, request, cs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); - } - - promise.setSuccess( - messageConverter.convert((R) call.response, call.cells) - ); - } catch (InterruptedException e) { - promise.cancel(true); - } catch (IOException e) { - if(exceptionConverter != null) { - e = exceptionConverter.convert(e); - } - promise.setFailure(e); - } - } - }); - - return promise; - } - - @Override - public EventExecutor getEventExecutor() { - return this.executor; - } - - @Override - public void close(Throwable cause) { - this.executor.shutdownGracefully(); - } - - @Override - public boolean isAlive() { - return !this.executor.isShuttingDown() && !this.executor.isShutdown(); - } - - @Override - public InetSocketAddress getAddress() { - return isa; - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 45cec789910..4cfa25c9d28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.ipc; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; @@ -32,9 +31,9 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; -import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; + import java.io.IOException; import java.net.ConnectException; import java.net.InetAddress; @@ -42,9 +41,7 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -56,8 +53,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; @@ -414,141 +409,4 @@ public abstract class AbstractTestIPC { .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) .getCause() instanceof CallTimeoutException); } - - @Test - public void testAsyncProtobufConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - try (RpcClient client = createRpcClient(CONF)) { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = client.createProtobufRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - channel - .callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType().toProto(), - new com.google.protobuf.RpcCallback() { - @Override - public void run(Message parameter) { - done.set(true); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - rpcServer.stop(); - } - } - - @Test - public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = client.createProtobufRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.notifyOnFail(new com.google.protobuf.RpcCallback() { - @Override - public void run(IOException e) { - done.set(true); - LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } - }); - - channel.callMethod(md, controller, param, md.getOutputType().toProto(), - new com.google.protobuf.RpcCallback() { - @Override - public void run(Message parameter) { - done.set(true); - fail("Expected an exception to have been thrown!"); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - rpcServer.stop(); - } - } - - @Test - public void testAsyncConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - try (RpcClient client = createRpcClient(CONF)) { - rpcServer.start(); - Message msg = setupAsyncConnection(rpcServer, client); - - assertNotNull(msg); - } finally { - rpcServer.stop(); - } - } - - @Test - public void testRTEDuringAsyncConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) { - rpcServer.start(); - setupAsyncConnection(rpcServer, client); - - fail("Expected an exception to have been thrown!"); - } catch (ExecutionException e) { - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } finally { - rpcServer.stop(); - } - } - - private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient client) - throws IOException, InterruptedException, ExecutionException, - java.util.concurrent.TimeoutException { - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - ServerName serverName = - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()); - - AsyncRpcChannel channel = - client.createRpcChannel(md.getService().getName(), serverName, User.getCurrent()); - - final Future f = channel - .callMethod(md, param, null, md.getOutputType().toProto(), MessageConverter.NO_CONVERTER, - null, 1000, HConstants.NORMAL_QOS); - - return f.get(1, TimeUnit.SECONDS); - } }