Compare commits
1 Commits
trunk
...
HDFS-13572
Author | SHA1 | Date |
---|---|---|
zhangduo | f324188551 |
|
@ -114,6 +114,10 @@
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
<artifactId>netty</artifactId>
|
<artifactId>netty</artifactId>
|
||||||
</exclusion>
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>io.netty</groupId>
|
||||||
|
<artifactId>netty-all</artifactId>
|
||||||
|
</exclusion>
|
||||||
<exclusion>
|
<exclusion>
|
||||||
<groupId>javax.servlet</groupId>
|
<groupId>javax.servlet</groupId>
|
||||||
<artifactId>javax.servlet-api</artifactId>
|
<artifactId>javax.servlet-api</artifactId>
|
||||||
|
|
|
@ -38,6 +38,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<groupId>com.squareup.okhttp</groupId>
|
<groupId>com.squareup.okhttp</groupId>
|
||||||
<artifactId>okhttp</artifactId>
|
<artifactId>okhttp</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.netty</groupId>
|
||||||
|
<artifactId>netty-all</artifactId>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-common</artifactId>
|
<artifactId>hadoop-common</artifactId>
|
||||||
|
@ -63,11 +68,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<artifactId>mockito-core</artifactId>
|
<artifactId>mockito-core</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>io.netty</groupId>
|
|
||||||
<artifactId>netty-all</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mock-server</groupId>
|
<groupId>org.mock-server</groupId>
|
||||||
<artifactId>mockserver-netty</artifactId>
|
<artifactId>mockserver-netty</artifactId>
|
||||||
|
@ -163,6 +163,25 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
</source>
|
</source>
|
||||||
</configuration>
|
</configuration>
|
||||||
</execution>
|
</execution>
|
||||||
|
<execution>
|
||||||
|
<id>compile-test-protoc</id>
|
||||||
|
<goals>
|
||||||
|
<goal>test-protoc</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<protocVersion>${protobuf.version}</protocVersion>
|
||||||
|
<protocCommand>${protoc.path}</protocCommand>
|
||||||
|
<imports>
|
||||||
|
<param>${basedir}/src/test/proto</param>
|
||||||
|
</imports>
|
||||||
|
<source>
|
||||||
|
<directory>${basedir}/src/test/proto</directory>
|
||||||
|
<includes>
|
||||||
|
<include>test_rpc.proto</include>
|
||||||
|
</includes>
|
||||||
|
</source>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
</executions>
|
</executions>
|
||||||
</plugin>
|
</plugin>
|
||||||
<plugin>
|
<plugin>
|
||||||
|
|
|
@ -0,0 +1,100 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelDuplexHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
|
||||||
|
|
||||||
|
private enum BufferCallAction {
|
||||||
|
FLUSH, FAIL
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class BufferCallEvent {
|
||||||
|
|
||||||
|
public final BufferCallAction action;
|
||||||
|
|
||||||
|
public final IOException error;
|
||||||
|
|
||||||
|
private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
|
||||||
|
IOException error) {
|
||||||
|
this.action = action;
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BufferCallBeforeInitHandler.BufferCallEvent success() {
|
||||||
|
return SUCCESS_EVENT;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BufferCallBeforeInitHandler.BufferCallEvent fail(
|
||||||
|
IOException error) {
|
||||||
|
return new BufferCallEvent(BufferCallAction.FAIL, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final BufferCallEvent SUCCESS_EVENT =
|
||||||
|
new BufferCallEvent(BufferCallAction.FLUSH, null);
|
||||||
|
|
||||||
|
private final Map<Integer, Call> id2Call = new HashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg,
|
||||||
|
ChannelPromise promise) {
|
||||||
|
if (msg instanceof Call) {
|
||||||
|
Call call = (Call) msg;
|
||||||
|
id2Call.put(call.getId(), call);
|
||||||
|
// The call is already in track so here we set the write operation as
|
||||||
|
// success.
|
||||||
|
// We will fail the call directly if we can not write it out.
|
||||||
|
promise.trySuccess();
|
||||||
|
} else {
|
||||||
|
ctx.write(msg, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
|
||||||
|
throws Exception {
|
||||||
|
if (evt instanceof BufferCallEvent) {
|
||||||
|
BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
|
||||||
|
switch (bcEvt.action) {
|
||||||
|
case FLUSH:
|
||||||
|
for (Call call : id2Call.values()) {
|
||||||
|
ctx.write(call);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case FAIL:
|
||||||
|
for (Call call : id2Call.values()) {
|
||||||
|
call.setException(bcEvt.error);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ctx.flush();
|
||||||
|
ctx.pipeline().remove(this);
|
||||||
|
} else {
|
||||||
|
ctx.fireUserEventTriggered(evt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,132 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import com.google.protobuf.RpcCallback;
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class Call {
|
||||||
|
private final int id;
|
||||||
|
|
||||||
|
private final String protocolName;
|
||||||
|
|
||||||
|
private final long protocolVersion;
|
||||||
|
|
||||||
|
private final String methodName;
|
||||||
|
|
||||||
|
private final Message param;
|
||||||
|
|
||||||
|
private final Message responseDefaultType;
|
||||||
|
|
||||||
|
private volatile Message response;
|
||||||
|
|
||||||
|
private volatile IOException error;
|
||||||
|
|
||||||
|
private boolean done;
|
||||||
|
|
||||||
|
private final RpcCallback<Call> callback;
|
||||||
|
|
||||||
|
Call(int id, String protocolName, long protocolVersion, String methodName,
|
||||||
|
Message param, Message responseDefaultType, RpcCallback<Call> callback) {
|
||||||
|
this.id = id;
|
||||||
|
this.protocolName = protocolName;
|
||||||
|
this.protocolVersion = protocolVersion;
|
||||||
|
this.methodName = methodName;
|
||||||
|
this.param = param;
|
||||||
|
this.responseDefaultType = responseDefaultType;
|
||||||
|
this.callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void callComplete() {
|
||||||
|
callback.run(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the exception when there is an error. Notify the caller the call is
|
||||||
|
* done.
|
||||||
|
*
|
||||||
|
* @param error exception thrown by the call; either local or remote
|
||||||
|
*/
|
||||||
|
void setException(IOException error) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (done) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.done = true;
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
callComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the return value when there is no error. Notify the caller the call is
|
||||||
|
* done.
|
||||||
|
*
|
||||||
|
* @param response return value of the call.
|
||||||
|
* @param cells Can be null
|
||||||
|
*/
|
||||||
|
void setResponse(Message response) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (done) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.done = true;
|
||||||
|
this.response = response;
|
||||||
|
}
|
||||||
|
callComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
int getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getProtocolName() {
|
||||||
|
return protocolName;
|
||||||
|
}
|
||||||
|
|
||||||
|
long getProtocolVersion() {
|
||||||
|
return protocolVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getMethodName() {
|
||||||
|
return methodName;
|
||||||
|
}
|
||||||
|
|
||||||
|
Message getParam() {
|
||||||
|
return param;
|
||||||
|
}
|
||||||
|
|
||||||
|
Message getResponseDefaultType() {
|
||||||
|
return responseDefaultType;
|
||||||
|
}
|
||||||
|
|
||||||
|
Message getResponse() {
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
IOException getError() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isDone() {
|
||||||
|
return done;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class ConnectionId {
|
||||||
|
|
||||||
|
private static final int PRIME = 16777619;
|
||||||
|
|
||||||
|
private final UserGroupInformation ticket;
|
||||||
|
private final String protocolName;
|
||||||
|
private final InetSocketAddress address;
|
||||||
|
|
||||||
|
public ConnectionId(UserGroupInformation ticket, String protocolName,
|
||||||
|
InetSocketAddress address) {
|
||||||
|
this.ticket = ticket;
|
||||||
|
this.protocolName = protocolName;
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
|
|
||||||
|
UserGroupInformation getTicket() {
|
||||||
|
return ticket;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getProtocolName() {
|
||||||
|
return protocolName;
|
||||||
|
}
|
||||||
|
|
||||||
|
InetSocketAddress getAddress() {
|
||||||
|
return address;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int h = ticket == null ? 0 : ticket.hashCode();
|
||||||
|
h = PRIME * h + protocolName.hashCode();
|
||||||
|
h = PRIME * h + address.hashCode();
|
||||||
|
return h;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj instanceof ConnectionId) {
|
||||||
|
ConnectionId id = (ConnectionId) obj;
|
||||||
|
return address.equals(id.address) &&
|
||||||
|
((ticket != null && ticket.equals(id.ticket)) ||
|
||||||
|
(ticket == id.ticket)) &&
|
||||||
|
protocolName.equals(id.protocolName);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,74 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcCallback;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class HdfsRpcController implements RpcController {
|
||||||
|
|
||||||
|
private IOException error;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean failed() {
|
||||||
|
return error != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String errorText() {
|
||||||
|
return error != null ? error.getMessage() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startCancel() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setFailed(String reason) {
|
||||||
|
this.error = new IOException(reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setException(IOException error) {
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IOException getException() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCanceled() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void notifyOnCancel(RpcCallback<Object> callback) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class IPCUtil {
|
||||||
|
|
||||||
|
static IOException toIOE(Throwable t) {
|
||||||
|
if (t instanceof IOException) {
|
||||||
|
return (IOException) t;
|
||||||
|
} else {
|
||||||
|
return new IOException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,128 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import com.google.protobuf.Descriptors;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import com.google.protobuf.RpcCallback;
|
||||||
|
import com.google.protobuf.RpcChannel;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.ipc.ClientId;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.Server.AuthProtocol;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The protobuf based rpc client.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RpcClient implements Closeable {
|
||||||
|
|
||||||
|
private final byte[] clientId;
|
||||||
|
|
||||||
|
private final EventLoopGroup group = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
private final Class<? extends Channel> channelClass = NioSocketChannel.class;
|
||||||
|
|
||||||
|
private final AtomicInteger callIdCnt = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private final ConcurrentMap<ConnectionId, RpcConnection> connections =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public RpcClient() {
|
||||||
|
this.clientId = ClientId.getClientId();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int nextCallId() {
|
||||||
|
int id, next;
|
||||||
|
do {
|
||||||
|
id = callIdCnt.get();
|
||||||
|
next = id < Integer.MAX_VALUE ? id + 1 : 0;
|
||||||
|
} while (!callIdCnt.compareAndSet(id, next));
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onCallFinished(Call call, HdfsRpcController hrc,
|
||||||
|
InetSocketAddress addr, RpcCallback<Message> callback) {
|
||||||
|
IOException error = call.getError();
|
||||||
|
if (error != null) {
|
||||||
|
if (error instanceof RemoteException) {
|
||||||
|
error.fillInStackTrace();
|
||||||
|
}
|
||||||
|
hrc.setException(error);
|
||||||
|
callback.run(null);
|
||||||
|
} else {
|
||||||
|
callback.run(call.getResponse());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void callMethod(String protocolName, long protocolVersion,
|
||||||
|
Descriptors.MethodDescriptor md, HdfsRpcController hrc, Message param,
|
||||||
|
Message returnType, UserGroupInformation ugi, InetSocketAddress addr,
|
||||||
|
RpcCallback<Message> callback) {
|
||||||
|
Call call =
|
||||||
|
new Call(nextCallId(), protocolName, protocolVersion, md.getName(),
|
||||||
|
param, returnType, c -> onCallFinished(c, hrc, addr, callback));
|
||||||
|
ConnectionId remoteId = new ConnectionId(ugi, protocolName, addr);
|
||||||
|
connections
|
||||||
|
.computeIfAbsent(remoteId,
|
||||||
|
k -> new RpcConnection(this, k, AuthProtocol.NONE))
|
||||||
|
.sendRequest(call);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RpcChannel createRpcChannel(Class<?> protocol, InetSocketAddress addr,
|
||||||
|
UserGroupInformation ugi) {
|
||||||
|
String protocolName = RPC.getProtocolName(protocol);
|
||||||
|
long protocolVersion = RPC.getProtocolVersion(protocol);
|
||||||
|
return (method, controller, request, responsePrototype, done) -> callMethod(
|
||||||
|
protocolName, protocolVersion, method, (HdfsRpcController) controller,
|
||||||
|
request, responsePrototype, ugi, addr, done);
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] getClientId() {
|
||||||
|
return clientId;
|
||||||
|
}
|
||||||
|
|
||||||
|
EventLoopGroup getGroup() {
|
||||||
|
return group;
|
||||||
|
}
|
||||||
|
|
||||||
|
Class<? extends Channel> getChannelClass() {
|
||||||
|
return channelClass;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
connections.values().forEach(c -> c.shutdown());
|
||||||
|
connections.clear();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,153 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
||||||
|
|
||||||
|
import com.google.protobuf.CodedOutputStream;
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufOutputStream;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
||||||
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
|
import org.apache.hadoop.ipc.RpcConstants;
|
||||||
|
import org.apache.hadoop.ipc.Server.AuthProtocol;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||||
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||||
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The connection to remote server.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class RpcConnection {
|
||||||
|
|
||||||
|
final RpcClient rpcClient;
|
||||||
|
|
||||||
|
final ConnectionId remoteId;
|
||||||
|
|
||||||
|
private final AuthProtocol authProtocol;
|
||||||
|
|
||||||
|
private Channel channel;
|
||||||
|
|
||||||
|
public RpcConnection(RpcClient rpcClient, ConnectionId remoteId,
|
||||||
|
AuthProtocol authProtocol) {
|
||||||
|
this.rpcClient = rpcClient;
|
||||||
|
this.remoteId = remoteId;
|
||||||
|
this.authProtocol = authProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeConnectionHeader(Channel ch) {
|
||||||
|
ByteBuf header = ch.alloc().buffer(7);
|
||||||
|
header.writeBytes(RpcConstants.HEADER.duplicate());
|
||||||
|
header.writeByte(RpcConstants.CURRENT_VERSION);
|
||||||
|
header.writeByte(0); // service class
|
||||||
|
header.writeByte(authProtocol.callId);
|
||||||
|
ch.writeAndFlush(header);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeConnectionContext(Channel ch) throws IOException {
|
||||||
|
RpcRequestHeaderProto connectionContextHeader =
|
||||||
|
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
|
||||||
|
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
|
||||||
|
RpcConstants.INVALID_RETRY_COUNT, rpcClient.getClientId());
|
||||||
|
int headerSize = connectionContextHeader.getSerializedSize();
|
||||||
|
IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
|
||||||
|
remoteId.getProtocolName(), remoteId.getTicket(), AuthMethod.SIMPLE);
|
||||||
|
int messageSize = message.getSerializedSize();
|
||||||
|
|
||||||
|
int totalSize =
|
||||||
|
CodedOutputStream.computeRawVarint32Size(headerSize) + headerSize +
|
||||||
|
CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
|
||||||
|
ByteBuf buf = ch.alloc().buffer(totalSize + 4);
|
||||||
|
buf.writeInt(totalSize);
|
||||||
|
ByteBufOutputStream out = new ByteBufOutputStream(buf);
|
||||||
|
connectionContextHeader.writeDelimitedTo(out);
|
||||||
|
message.writeDelimitedTo(out);
|
||||||
|
ch.writeAndFlush(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void established(Channel ch) throws IOException {
|
||||||
|
ChannelPipeline p = ch.pipeline();
|
||||||
|
String addBeforeHandler =
|
||||||
|
p.context(BufferCallBeforeInitHandler.class).name();
|
||||||
|
p.addBefore(addBeforeHandler, "frameDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
||||||
|
p.addBefore(addBeforeHandler, "rpcHandler", new RpcDuplexHandler(this));
|
||||||
|
p.fireUserEventTriggered(BufferCallEvent.success());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Channel connect() {
|
||||||
|
if (channel != null) {
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
channel = new Bootstrap().group(rpcClient.getGroup())
|
||||||
|
.channel(rpcClient.getChannelClass())
|
||||||
|
.option(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||||
|
.handler(new BufferCallBeforeInitHandler())
|
||||||
|
.remoteAddress(remoteId.getAddress()).connect()
|
||||||
|
.addListener(new ChannelFutureListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
Channel ch = future.channel();
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
failInit(ch, IPCUtil.toIOE(future.cause()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
writeConnectionHeader(ch);
|
||||||
|
writeConnectionContext(ch);
|
||||||
|
established(ch);
|
||||||
|
}
|
||||||
|
}).channel();
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void failInit(Channel ch, IOException e) {
|
||||||
|
// fail all pending calls
|
||||||
|
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
|
||||||
|
shutdown0();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdown0() {
|
||||||
|
if (channel != null) {
|
||||||
|
channel.close();
|
||||||
|
channel = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void shutdown() {
|
||||||
|
shutdown0();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void sendRequest(Call call) {
|
||||||
|
Channel channel = connect();
|
||||||
|
channel.eventLoop().execute(() -> channel.writeAndFlush(call));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,175 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||||
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.protobuf.CodedOutputStream;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufInputStream;
|
||||||
|
import io.netty.buffer.ByteBufOutputStream;
|
||||||
|
import io.netty.channel.ChannelDuplexHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class RpcDuplexHandler extends ChannelDuplexHandler {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(RpcDuplexHandler.class);
|
||||||
|
|
||||||
|
private final RpcConnection conn;
|
||||||
|
|
||||||
|
private final Map<Integer, Call> id2Call = new HashMap<>();
|
||||||
|
|
||||||
|
public RpcDuplexHandler(RpcConnection conn) {
|
||||||
|
this.conn = conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeRequest(ChannelHandlerContext ctx, Call call,
|
||||||
|
ChannelPromise promise) throws IOException {
|
||||||
|
id2Call.put(call.getId(), call);
|
||||||
|
|
||||||
|
RpcRequestHeaderProto rpcHeader = ProtoUtil.makeRpcRequestHeader(
|
||||||
|
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
|
||||||
|
call.getId(), 0, conn.rpcClient.getClientId());
|
||||||
|
int rpcHeaderSize = rpcHeader.getSerializedSize();
|
||||||
|
RequestHeaderProto requestHeader =
|
||||||
|
RequestHeaderProto.newBuilder().setMethodName(call.getMethodName())
|
||||||
|
.setDeclaringClassProtocolName(call.getProtocolName())
|
||||||
|
.setClientProtocolVersion(call.getProtocolVersion()).build();
|
||||||
|
int requestHeaderSize = requestHeader.getSerializedSize();
|
||||||
|
int totalSize = CodedOutputStream.computeRawVarint32Size(rpcHeaderSize) +
|
||||||
|
rpcHeaderSize +
|
||||||
|
CodedOutputStream.computeRawVarint32Size(requestHeaderSize) +
|
||||||
|
requestHeaderSize;
|
||||||
|
Message param = call.getParam();
|
||||||
|
if (param != null) {
|
||||||
|
int paramSize = param.getSerializedSize();
|
||||||
|
totalSize +=
|
||||||
|
CodedOutputStream.computeRawVarint32Size(paramSize) + paramSize;
|
||||||
|
}
|
||||||
|
ByteBufOutputStream out =
|
||||||
|
new ByteBufOutputStream(ctx.alloc().buffer(totalSize + 4));
|
||||||
|
out.writeInt(totalSize);
|
||||||
|
rpcHeader.writeDelimitedTo(out);
|
||||||
|
requestHeader.writeDelimitedTo(out);
|
||||||
|
if (param != null) {
|
||||||
|
param.writeDelimitedTo(out);
|
||||||
|
}
|
||||||
|
ctx.write(out.buffer(), promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg,
|
||||||
|
ChannelPromise promise) throws Exception {
|
||||||
|
if (msg instanceof Call) {
|
||||||
|
writeRequest(ctx, (Call) msg, promise);
|
||||||
|
} else {
|
||||||
|
ctx.write(msg, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readResponse(ChannelHandlerContext ctx, ByteBuf buf)
|
||||||
|
throws Exception {
|
||||||
|
ByteBufInputStream in = new ByteBufInputStream(buf);
|
||||||
|
RpcResponseHeaderProto header =
|
||||||
|
RpcResponseHeaderProto.parseDelimitedFrom(in);
|
||||||
|
int id = header.getCallId();
|
||||||
|
RpcStatusProto status = header.getStatus();
|
||||||
|
if (status != RpcStatusProto.SUCCESS) {
|
||||||
|
String exceptionClassName =
|
||||||
|
header.hasExceptionClassName() ? header.getExceptionClassName()
|
||||||
|
: "ServerDidNotSetExceptionClassName";
|
||||||
|
String errorMsg = header.hasErrorMsg() ? header.getErrorMsg()
|
||||||
|
: "ServerDidNotSetErrorMsg";
|
||||||
|
RpcErrorCodeProto errCode =
|
||||||
|
(header.hasErrorDetail() ? header.getErrorDetail() : null);
|
||||||
|
if (errCode == null) {
|
||||||
|
LOG.warn("Detailed error code not set by server on rpc error");
|
||||||
|
}
|
||||||
|
RemoteException re =
|
||||||
|
new RemoteException(exceptionClassName, errorMsg, errCode);
|
||||||
|
if (status == RpcStatusProto.ERROR) {
|
||||||
|
Call call = id2Call.remove(id);
|
||||||
|
call.setException(re);
|
||||||
|
} else if (status == RpcStatusProto.FATAL) {
|
||||||
|
exceptionCaught(ctx, re);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Call call = id2Call.remove(id);
|
||||||
|
call.setResponse(call.getResponseDefaultType().getParserForType()
|
||||||
|
.parseDelimitedFrom(in));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg)
|
||||||
|
throws Exception {
|
||||||
|
if (msg instanceof ByteBuf) {
|
||||||
|
ByteBuf buf = (ByteBuf) msg;
|
||||||
|
try {
|
||||||
|
readResponse(ctx, buf);
|
||||||
|
} finally {
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
|
||||||
|
for (Call call : id2Call.values()) {
|
||||||
|
call.setException(error);
|
||||||
|
}
|
||||||
|
id2Call.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
if (!id2Call.isEmpty()) {
|
||||||
|
cleanupCalls(ctx, new IOException("Connection closed"));
|
||||||
|
}
|
||||||
|
conn.shutdown();
|
||||||
|
ctx.fireChannelInactive();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
|
||||||
|
throws Exception {
|
||||||
|
if (!id2Call.isEmpty()) {
|
||||||
|
cleanupCalls(ctx, new IOException("Connection closed"));
|
||||||
|
}
|
||||||
|
conn.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcChannel;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
|
||||||
|
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
|
||||||
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestAsyncIPC {
|
||||||
|
|
||||||
|
private static Configuration CONF;
|
||||||
|
|
||||||
|
private static TestServer SERVER;
|
||||||
|
|
||||||
|
private static int PORT;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws IOException {
|
||||||
|
CONF = new Configuration();
|
||||||
|
RPC.setProtocolEngine(CONF, TestRpcProtocolPB.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
SERVER = new TestServer(CONF);
|
||||||
|
SERVER.start();
|
||||||
|
PORT = SERVER.port();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
SERVER.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws IOException, InterruptedException {
|
||||||
|
try (RpcClient client = new RpcClient()) {
|
||||||
|
RpcChannel channel = client.createRpcChannel(TestRpcProtocolPB.class,
|
||||||
|
new InetSocketAddress("localhost", PORT),
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
TestRpcProtos.TestRpcService.Interface stub =
|
||||||
|
TestRpcProtos.TestRpcService.newStub(channel);
|
||||||
|
Map<Integer, String> results = new HashMap<>();
|
||||||
|
int count = 100;
|
||||||
|
CountDownLatch latch = new CountDownLatch(count);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
final int index = i;
|
||||||
|
stub.echo(new HdfsRpcController(),
|
||||||
|
EchoRequestProto.newBuilder().setMessage("Echo-" + index).build(),
|
||||||
|
resp -> {
|
||||||
|
results.put(index, resp.getMessage());
|
||||||
|
latch.countDown();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
latch.await();
|
||||||
|
assertEquals(count, results.size());
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
assertEquals("Echo-" + i, results.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||||
|
|
||||||
|
@ProtocolInfo(protocolName = "org.apache.hadoop.hdfs.ipc.TestRpcProtocol",
|
||||||
|
protocolVersion = 1)
|
||||||
|
public interface TestRpcProtocolPB
|
||||||
|
extends TestRpcProtos.TestRpcService.BlockingInterface {
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.ipc;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
|
||||||
|
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoResponseProto;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
|
||||||
|
public class TestServer implements TestRpcProtocolPB {
|
||||||
|
|
||||||
|
private final RPC.Server server;
|
||||||
|
|
||||||
|
public TestServer(Configuration conf) throws IOException {
|
||||||
|
server = new RPC.Builder(conf).setProtocol(TestRpcProtocolPB.class)
|
||||||
|
.setInstance(
|
||||||
|
TestRpcProtos.TestRpcService.newReflectiveBlockingService(this))
|
||||||
|
.setNumHandlers(10).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int port() {
|
||||||
|
return server.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EchoResponseProto echo(RpcController controller,
|
||||||
|
EchoRequestProto request) throws ServiceException {
|
||||||
|
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
option java_package = "org.apache.hadoop.hdfs.ipc.protobuf";
|
||||||
|
option java_outer_classname = "TestRpcProtos";
|
||||||
|
option java_generic_services = true;
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
package hadoop.hdfs;
|
||||||
|
|
||||||
|
message EchoRequestProto {
|
||||||
|
required string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EchoResponseProto {
|
||||||
|
required string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
service TestRpcService {
|
||||||
|
rpc echo(EchoRequestProto) returns (EchoResponseProto);
|
||||||
|
}
|
|
@ -842,7 +842,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
<artifactId>netty-all</artifactId>
|
<artifactId>netty-all</artifactId>
|
||||||
<version>4.0.52.Final</version>
|
<version>4.1.37.Final</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -29,7 +29,6 @@
|
||||||
<protobuf.version>3.6.1</protobuf.version>
|
<protobuf.version>3.6.1</protobuf.version>
|
||||||
<guava.version>20.0</guava.version>
|
<guava.version>20.0</guava.version>
|
||||||
<grpc.version>1.15.1</grpc.version>
|
<grpc.version>1.15.1</grpc.version>
|
||||||
<netty-all.version>4.1.27.Final</netty-all.version>
|
|
||||||
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
|
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
@ -47,7 +46,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
<artifactId>netty-all</artifactId>
|
<artifactId>netty-all</artifactId>
|
||||||
<version>${netty-all.version}</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.grpc</groupId>
|
<groupId>io.grpc</groupId>
|
||||||
|
|
Loading…
Reference in New Issue