Compare commits

...

1 Commit

Author SHA1 Message Date
zhangduo f324188551 HDFS-13643. Implement basic async rpc client 2019-07-02 11:16:23 +08:00
16 changed files with 1104 additions and 8 deletions

View File

@@ -114,6 +114,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>

View File

@@ -38,6 +38,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -63,11 +68,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
@@ -163,6 +163,25 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</source>
</configuration>
</execution>
<execution>
<id>compile-test-protoc</id>
<goals>
<goal>test-protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/src/test/proto</param>
</imports>
<source>
<directory>${basedir}/src/test/proto</directory>
<includes>
<include>test_rpc.proto</include>
</includes>
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>

View File

@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
private enum BufferCallAction {
FLUSH, FAIL
}
public static final class BufferCallEvent {
public final BufferCallAction action;
public final IOException error;
private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
IOException error) {
this.action = action;
this.error = error;
}
public static BufferCallBeforeInitHandler.BufferCallEvent success() {
return SUCCESS_EVENT;
}
public static BufferCallBeforeInitHandler.BufferCallEvent fail(
IOException error) {
return new BufferCallEvent(BufferCallAction.FAIL, error);
}
}
private static final BufferCallEvent SUCCESS_EVENT =
new BufferCallEvent(BufferCallAction.FLUSH, null);
private final Map<Integer, Call> id2Call = new HashMap<>();
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
if (msg instanceof Call) {
Call call = (Call) msg;
id2Call.put(call.getId(), call);
// The call is already being tracked, so mark the write operation as
// successful here. We will fail the call directly if we cannot write
// it out.
promise.trySuccess();
} else {
ctx.write(msg, promise);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof BufferCallEvent) {
BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
switch (bcEvt.action) {
case FLUSH:
for (Call call : id2Call.values()) {
ctx.write(call);
}
break;
case FAIL:
for (Call call : id2Call.values()) {
call.setException(bcEvt.error);
}
break;
}
ctx.flush();
ctx.pipeline().remove(this);
} else {
ctx.fireUserEventTriggered(evt);
}
}
}
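
A minimal sketch of how this buffering handler is driven (an assumption mirroring RpcConnection later in this diff, not part of the commit): it is installed as the only handler at connect time, and the setup code later fires a BufferCallEvent to flush or fail whatever was buffered. The address below is a placeholder.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler;
import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
public class BufferHandlerSketch {
  public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
      // Install the buffering handler before any RPC traffic flows.
      Channel ch = new Bootstrap().group(group)
          .channel(NioSocketChannel.class)
          .handler(new BufferCallBeforeInitHandler())
          .connect(new InetSocketAddress("localhost", 8020))
          .syncUninterruptibly().channel();
      // Once the connection header and context have been written, firing
      // the success event flushes the buffered calls and removes the
      // handler from the pipeline; on a failed handshake one would fire
      // BufferCallEvent.fail(ioe) instead.
      ch.pipeline().fireUserEventTriggered(BufferCallEvent.success());
    } finally {
      group.shutdownGracefully();
    }
  }
}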

View File

@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
class Call {
private final int id;
private final String protocolName;
private final long protocolVersion;
private final String methodName;
private final Message param;
private final Message responseDefaultType;
private volatile Message response;
private volatile IOException error;
private boolean done;
private final RpcCallback<Call> callback;
Call(int id, String protocolName, long protocolVersion, String methodName,
Message param, Message responseDefaultType, RpcCallback<Call> callback) {
this.id = id;
this.protocolName = protocolName;
this.protocolVersion = protocolVersion;
this.methodName = methodName;
this.param = param;
this.responseDefaultType = responseDefaultType;
this.callback = callback;
}
private void callComplete() {
callback.run(this);
}
/**
* Set the exception when there is an error. Notify the caller the call is
* done.
*
* @param error exception thrown by the call; either local or remote
*/
void setException(IOException error) {
synchronized (this) {
if (done) {
return;
}
this.done = true;
this.error = error;
}
callComplete();
}
/**
* Set the return value when there is no error. Notify the caller the call is
* done.
*
* @param response return value of the call.
*/
void setResponse(Message response) {
synchronized (this) {
if (done) {
return;
}
this.done = true;
this.response = response;
}
callComplete();
}
int getId() {
return id;
}
String getProtocolName() {
return protocolName;
}
long getProtocolVersion() {
return protocolVersion;
}
String getMethodName() {
return methodName;
}
Message getParam() {
return param;
}
Message getResponseDefaultType() {
return responseDefaultType;
}
Message getResponse() {
return response;
}
IOException getError() {
return error;
}
boolean isDone() {
return done;
}
}
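
Because completion is guarded by the synchronized done flag, a Call runs its callback at most once; whichever of setResponse or setException arrives second is ignored. A small sketch (the id, protocol name, and Echo messages are placeholders borrowed from the test code later in this diff):

package org.apache.hadoop.hdfs.ipc;
import java.io.IOException;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoResponseProto;
class CallOnceSketch {
  public static void main(String[] args) {
    Call call = new Call(1, "org.apache.hadoop.hdfs.ipc.TestRpcProtocol", 1L,
        "echo", EchoRequestProto.newBuilder().setMessage("hi").build(),
        EchoResponseProto.getDefaultInstance(),
        c -> System.out.println(c.getError() != null ? "failed" : "ok"));
    // Completes the call and runs the callback, printing "ok".
    call.setResponse(EchoResponseProto.newBuilder().setMessage("hi").build());
    // Ignored: the call is already done.
    call.setException(new IOException("too late"));
  }
}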

View File

@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
@InterfaceAudience.Private
class ConnectionId {
private static final int PRIME = 16777619;
private final UserGroupInformation ticket;
private final String protocolName;
private final InetSocketAddress address;
public ConnectionId(UserGroupInformation ticket, String protocolName,
InetSocketAddress address) {
this.ticket = ticket;
this.protocolName = protocolName;
this.address = address;
}
UserGroupInformation getTicket() {
return ticket;
}
String getProtocolName() {
return protocolName;
}
InetSocketAddress getAddress() {
return address;
}
@Override
public int hashCode() {
int h = ticket == null ? 0 : ticket.hashCode();
h = PRIME * h + protocolName.hashCode();
h = PRIME * h + address.hashCode();
return h;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) &&
((ticket != null && ticket.equals(id.ticket)) ||
(ticket == id.ticket)) &&
protocolName.equals(id.protocolName);
}
return false;
}
}

View File

@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class HdfsRpcController implements RpcController {
private IOException error;
@Override
public void reset() {
throw new UnsupportedOperationException();
}
@Override
public boolean failed() {
return error != null;
}
@Override
public String errorText() {
return error != null ? error.getMessage() : null;
}
@Override
public void startCancel() {
throw new UnsupportedOperationException();
}
@Override
public void setFailed(String reason) {
this.error = new IOException(reason);
}
public void setException(IOException error) {
this.error = error;
}
public IOException getException() {
return error;
}
@Override
public boolean isCanceled() {
return false;
}
@Override
public void notifyOnCancel(RpcCallback<Object> callback) {
throw new UnsupportedOperationException();
}
}
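
Since setFailed and setException record the error instead of throwing, an asynchronous caller inspects the controller once its callback runs. A hedged sketch; the stub type and the "ping" payload are assumptions borrowed from TestAsyncIPC later in this diff:

import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
static void echoOnce(TestRpcProtos.TestRpcService.Interface stub) {
  HdfsRpcController hrc = new HdfsRpcController();
  stub.echo(hrc, EchoRequestProto.newBuilder().setMessage("ping").build(),
      resp -> {
        if (hrc.failed()) {
          // resp is null here; RpcClient stores the IOException in the
          // controller before invoking the callback.
          System.err.println("echo failed: " + hrc.errorText());
        } else {
          System.out.println(resp.getMessage());
        }
      });
}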

View File

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
class IPCUtil {
static IOException toIOE(Throwable t) {
if (t instanceof IOException) {
return (IOException) t;
} else {
return new IOException(t);
}
}
}

View File

@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.security.UserGroupInformation;
/**
* The protobuf-based RPC client.
*/
@InterfaceAudience.Private
public class RpcClient implements Closeable {
private final byte[] clientId;
private final EventLoopGroup group = new NioEventLoopGroup();
private final Class<? extends Channel> channelClass = NioSocketChannel.class;
private final AtomicInteger callIdCnt = new AtomicInteger(0);
private final ConcurrentMap<ConnectionId, RpcConnection> connections =
new ConcurrentHashMap<>();
public RpcClient() {
this.clientId = ClientId.getClientId();
}
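// nextCallId hands out ids 0, 1, 2, ... and wraps back to 0 instead of
// going negative at Integer.MAX_VALUE; the CAS loop keeps the counter
// safe under concurrent callers.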
private int nextCallId() {
int id, next;
do {
id = callIdCnt.get();
next = id < Integer.MAX_VALUE ? id + 1 : 0;
} while (!callIdCnt.compareAndSet(id, next));
return id;
}
private void onCallFinished(Call call, HdfsRpcController hrc,
InetSocketAddress addr, RpcCallback<Message> callback) {
IOException error = call.getError();
if (error != null) {
if (error instanceof RemoteException) {
error.fillInStackTrace();
}
hrc.setException(error);
callback.run(null);
} else {
callback.run(call.getResponse());
}
}
private void callMethod(String protocolName, long protocolVersion,
Descriptors.MethodDescriptor md, HdfsRpcController hrc, Message param,
Message returnType, UserGroupInformation ugi, InetSocketAddress addr,
RpcCallback<Message> callback) {
Call call =
new Call(nextCallId(), protocolName, protocolVersion, md.getName(),
param, returnType, c -> onCallFinished(c, hrc, addr, callback));
ConnectionId remoteId = new ConnectionId(ugi, protocolName, addr);
connections
.computeIfAbsent(remoteId,
k -> new RpcConnection(this, k, AuthProtocol.NONE))
.sendRequest(call);
}
public RpcChannel createRpcChannel(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ugi) {
String protocolName = RPC.getProtocolName(protocol);
long protocolVersion = RPC.getProtocolVersion(protocol);
return (method, controller, request, responsePrototype, done) -> callMethod(
protocolName, protocolVersion, method, (HdfsRpcController) controller,
request, responsePrototype, ugi, addr, done);
}
byte[] getClientId() {
return clientId;
}
EventLoopGroup getGroup() {
return group;
}
Class<? extends Channel> getChannelClass() {
return channelClass;
}
@Override
public void close() throws IOException {
connections.values().forEach(c -> c.shutdown());
connections.clear();
}
}

View File

@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import com.google.protobuf.CodedOutputStream;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.util.ProtoUtil;
/**
* The connection to a remote server.
*/
@InterfaceAudience.Private
class RpcConnection {
final RpcClient rpcClient;
final ConnectionId remoteId;
private final AuthProtocol authProtocol;
private Channel channel;
public RpcConnection(RpcClient rpcClient, ConnectionId remoteId,
AuthProtocol authProtocol) {
this.rpcClient = rpcClient;
this.remoteId = remoteId;
this.authProtocol = authProtocol;
}
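// The connection preamble is 7 bytes: the 4-byte "hrpc" magic
// (RpcConstants.HEADER), one version byte, one service-class byte and
// one auth-protocol byte.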
private void writeConnectionHeader(Channel ch) {
ByteBuf header = ch.alloc().buffer(7);
header.writeBytes(RpcConstants.HEADER.duplicate());
header.writeByte(RpcConstants.CURRENT_VERSION);
header.writeByte(0); // service class
header.writeByte(authProtocol.callId);
ch.writeAndFlush(header);
}
private void writeConnectionContext(Channel ch) throws IOException {
RpcRequestHeaderProto connectionContextHeader =
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, rpcClient.getClientId());
int headerSize = connectionContextHeader.getSerializedSize();
IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
remoteId.getProtocolName(), remoteId.getTicket(), AuthMethod.SIMPLE);
int messageSize = message.getSerializedSize();
int totalSize =
CodedOutputStream.computeRawVarint32Size(headerSize) + headerSize +
CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
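// Frame layout: a 4-byte big-endian length prefix followed by the two
// varint-delimited protobufs. The response side strips the same kind of
// prefix via the LengthFieldBasedFrameDecoder(..., 0, 4, 0, 4) installed
// in established() below.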
ByteBuf buf = ch.alloc().buffer(totalSize + 4);
buf.writeInt(totalSize);
ByteBufOutputStream out = new ByteBufOutputStream(buf);
connectionContextHeader.writeDelimitedTo(out);
message.writeDelimitedTo(out);
ch.writeAndFlush(buf);
}
private void established(Channel ch) throws IOException {
ChannelPipeline p = ch.pipeline();
String addBeforeHandler =
p.context(BufferCallBeforeInitHandler.class).name();
p.addBefore(addBeforeHandler, "frameDecoder",
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
p.addBefore(addBeforeHandler, "rpcHandler", new RpcDuplexHandler(this));
p.fireUserEventTriggered(BufferCallEvent.success());
}
private Channel connect() {
if (channel != null) {
return channel;
}
channel = new Bootstrap().group(rpcClient.getGroup())
.channel(rpcClient.getChannelClass())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new BufferCallBeforeInitHandler())
.remoteAddress(remoteId.getAddress()).connect()
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel ch = future.channel();
if (!future.isSuccess()) {
failInit(ch, IPCUtil.toIOE(future.cause()));
return;
}
writeConnectionHeader(ch);
writeConnectionContext(ch);
established(ch);
}
}).channel();
return channel;
}
private synchronized void failInit(Channel ch, IOException e) {
// fail all pending calls
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
shutdown0();
}
private void shutdown0() {
if (channel != null) {
channel.close();
channel = null;
}
}
public synchronized void shutdown() {
shutdown0();
}
public synchronized void sendRequest(Call call) {
Channel channel = connect();
channel.eventLoop().execute(() -> channel.writeAndFlush(call));
}
}

View File

@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.util.ProtoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@InterfaceAudience.Private
class RpcDuplexHandler extends ChannelDuplexHandler {
private static final Logger LOG =
LoggerFactory.getLogger(RpcDuplexHandler.class);
private final RpcConnection conn;
private final Map<Integer, Call> id2Call = new HashMap<>();
public RpcDuplexHandler(RpcConnection conn) {
this.conn = conn;
}
private void writeRequest(ChannelHandlerContext ctx, Call call,
ChannelPromise promise) throws IOException {
id2Call.put(call.getId(), call);
RpcRequestHeaderProto rpcHeader = ProtoUtil.makeRpcRequestHeader(
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
call.getId(), 0, conn.rpcClient.getClientId());
int rpcHeaderSize = rpcHeader.getSerializedSize();
RequestHeaderProto requestHeader =
RequestHeaderProto.newBuilder().setMethodName(call.getMethodName())
.setDeclaringClassProtocolName(call.getProtocolName())
.setClientProtocolVersion(call.getProtocolVersion()).build();
int requestHeaderSize = requestHeader.getSerializedSize();
int totalSize = CodedOutputStream.computeRawVarint32Size(rpcHeaderSize) +
rpcHeaderSize +
CodedOutputStream.computeRawVarint32Size(requestHeaderSize) +
requestHeaderSize;
Message param = call.getParam();
if (param != null) {
int paramSize = param.getSerializedSize();
totalSize +=
CodedOutputStream.computeRawVarint32Size(paramSize) + paramSize;
}
ByteBufOutputStream out =
new ByteBufOutputStream(ctx.alloc().buffer(totalSize + 4));
out.writeInt(totalSize);
rpcHeader.writeDelimitedTo(out);
requestHeader.writeDelimitedTo(out);
if (param != null) {
param.writeDelimitedTo(out);
}
ctx.write(out.buffer(), promise);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
if (msg instanceof Call) {
writeRequest(ctx, (Call) msg, promise);
} else {
ctx.write(msg, promise);
}
}
private void readResponse(ChannelHandlerContext ctx, ByteBuf buf)
throws Exception {
ByteBufInputStream in = new ByteBufInputStream(buf);
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
int id = header.getCallId();
RpcStatusProto status = header.getStatus();
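// A non-SUCCESS status is either ERROR, which fails only the offending
// call, or FATAL, which is treated as a connection-level failure and
// tears the whole channel down via exceptionCaught below.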
if (status != RpcStatusProto.SUCCESS) {
String exceptionClassName =
header.hasExceptionClassName() ? header.getExceptionClassName()
: "ServerDidNotSetExceptionClassName";
String errorMsg = header.hasErrorMsg() ? header.getErrorMsg()
: "ServerDidNotSetErrorMsg";
RpcErrorCodeProto errCode =
(header.hasErrorDetail() ? header.getErrorDetail() : null);
if (errCode == null) {
LOG.warn("Detailed error code not set by server on rpc error");
}
RemoteException re =
new RemoteException(exceptionClassName, errorMsg, errCode);
if (status == RpcStatusProto.ERROR) {
Call call = id2Call.remove(id);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
exceptionCaught(ctx, re);
}
return;
}
Call call = id2Call.remove(id);
call.setResponse(call.getResponseDefaultType().getParserForType()
.parseDelimitedFrom(in));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
readResponse(ctx, buf);
} finally {
buf.release();
}
}
}
private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
for (Call call : id2Call.values()) {
call.setException(error);
}
id2Call.clear();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (!id2Call.isEmpty()) {
cleanupCalls(ctx, new IOException("Connection closed"));
}
conn.shutdown();
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
if (!id2Call.isEmpty()) {
cleanupCalls(ctx, new IOException("Connection closed"));
}
conn.shutdown();
}
}

View File

@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import static org.junit.Assert.assertEquals;
import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestAsyncIPC {
private static Configuration CONF;
private static TestServer SERVER;
private static int PORT;
@BeforeClass
public static void setUp() throws IOException {
CONF = new Configuration();
RPC.setProtocolEngine(CONF, TestRpcProtocolPB.class,
ProtobufRpcEngine.class);
SERVER = new TestServer(CONF);
SERVER.start();
PORT = SERVER.port();
}
@AfterClass
public static void tearDown() {
SERVER.stop();
}
@Test
public void test() throws IOException, InterruptedException {
try (RpcClient client = new RpcClient()) {
RpcChannel channel = client.createRpcChannel(TestRpcProtocolPB.class,
new InetSocketAddress("localhost", PORT),
UserGroupInformation.getCurrentUser());
TestRpcProtos.TestRpcService.Interface stub =
TestRpcProtos.TestRpcService.newStub(channel);
Map<Integer, String> results = new HashMap<>();
int count = 100;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
final int index = i;
stub.echo(new HdfsRpcController(),
EchoRequestProto.newBuilder().setMessage("Echo-" + index).build(),
resp -> {
results.put(index, resp.getMessage());
latch.countDown();
});
}
latch.await();
assertEquals(count, results.size());
for (int i = 0; i < count; i++) {
assertEquals("Echo-" + i, results.get(i));
}
}
}
}

View File

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
import org.apache.hadoop.ipc.ProtocolInfo;
@ProtocolInfo(protocolName = "org.apache.hadoop.hdfs.ipc.TestRpcProtocol",
protocolVersion = 1)
public interface TestRpcProtocolPB
extends TestRpcProtos.TestRpcService.BlockingInterface {
}

View File

@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoResponseProto;
import org.apache.hadoop.ipc.RPC;
public class TestServer implements TestRpcProtocolPB {
private final RPC.Server server;
public TestServer(Configuration conf) throws IOException {
server = new RPC.Builder(conf).setProtocol(TestRpcProtocolPB.class)
.setInstance(
TestRpcProtos.TestRpcService.newReflectiveBlockingService(this))
.setNumHandlers(10).build();
}
public void start() {
server.start();
}
public void stop() {
server.stop();
}
public int port() {
return server.getPort();
}
@Override
public EchoResponseProto echo(RpcController controller,
EchoRequestProto request) throws ServiceException {
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
.build();
}
}

View File

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hdfs.ipc.protobuf";
option java_outer_classname = "TestRpcProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
message EchoRequestProto {
required string message = 1;
}
message EchoResponseProto {
required string message = 1;
}
service TestRpcService {
rpc echo(EchoRequestProto) returns (EchoResponseProto);
}
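
For reference, java_generic_services = true makes protoc emit both an asynchronous Interface (used via newStub in TestAsyncIPC above) and a BlockingInterface (implemented by TestServer). An abridged sketch of the generated nested types, as an assumption about protoc output:

// Inside the generated TestRpcProtos.TestRpcService:
public interface Interface {
  void echo(RpcController controller, EchoRequestProto request,
      RpcCallback<EchoResponseProto> done);
}
public interface BlockingInterface {
  EchoResponseProto echo(RpcController controller, EchoRequestProto request)
      throws ServiceException;
}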

View File

@@ -842,7 +842,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.52.Final</version>
<version>4.1.37.Final</version>
</dependency>
<dependency>

View File

@@ -29,7 +29,6 @@
<protobuf.version>3.6.1</protobuf.version>
<guava.version>20.0</guava.version>
<grpc.version>1.15.1</grpc.version>
<netty-all.version>4.1.27.Final</netty-all.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
</properties>
@@ -47,7 +46,6 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>