diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index d6f325cda14..7cfa91cb2c8 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -6,6 +6,8 @@ Release 2.3.0 - UNRELEASED
NEW FEATURES
+ HADOOP-9509. Implement ONCRPC and XDR. (brandonli)
+
IMPROVEMENTS
OPTIMIZATIONS
diff --git a/hadoop-common-project/hadoop-nfs/README.txt b/hadoop-common-project/hadoop-nfs/README.txt
new file mode 100644
index 00000000000..bec500aaee6
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/README.txt
@@ -0,0 +1,10 @@
+Hadoop NFS
+
+Hadoop NFS is a Java library for building NFS gateway. It has
+the following components:
+
+- ONCRPC: This a implementation of ONCRPC(RFC-5531) and XDR(RFC-4506).
+- Mount: This an interface implementation of MOUNT protocol (RFC-1813).
+- Portmap: This is a implementation of Binding protocol(RFC-1833).
+- NFSv3: This is an interface implementation of NFSv3 protocol(RFC-1813).
+
diff --git a/hadoop-common-project/hadoop-nfs/pom.xml b/hadoop-common-project/hadoop-nfs/pom.xml
new file mode 100644
index 00000000000..5b873052085
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/pom.xml
@@ -0,0 +1,98 @@
+
+
+
+ 4.0.0
+
+ org.apache.hadoop
+ hadoop-project
+ 3.0.0-SNAPSHOT
+ ../../hadoop-project
+
+ org.apache.hadoop
+ hadoop-nfs
+ 3.0.0-SNAPSHOT
+ jar
+
+ Apache Hadoop NFS
+ Apache Hadoop NFS library
+
+
+ yyyyMMdd
+ LOCALHOST
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-annotations
+ provided
+
+
+ org.apache.hadoop
+ hadoop-common
+ provided
+
+
+ junit
+ junit
+ 4.8.2
+
+
+ org.mockito
+ mockito-all
+ 1.8.5
+
+
+ commons-logging
+ commons-logging
+ compile
+
+
+ javax.servlet
+ servlet-api
+ provided
+
+
+ org.slf4j
+ slf4j-api
+ compile
+
+
+ log4j
+ log4j
+ runtime
+
+
+ org.slf4j
+ slf4j-log4j12
+ runtime
+
+
+ io.netty
+ netty
+ 3.6.2.Final
+ compile
+
+
+ com.google.guava
+ guava
+ 11.0.2
+
+
+
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
new file mode 100644
index 00000000000..7ba37c98109
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+
+/**
+ * A simple client that registers an RPC program with portmap.
+ */
+public class RegistrationClient extends SimpleTcpClient {
+ public static final Log LOG = LogFactory.getLog(RegistrationClient.class);
+
+ public RegistrationClient(String host, int port, XDR request) {
+ super(host, port, request);
+ }
+
+ /**
+ * Handler to handle response from the server.
+ */
+ static class RegistrationClientHandler extends SimpleTcpClientHandler {
+ public RegistrationClientHandler(XDR request) {
+ super(request);
+ }
+
+ private boolean validMessageLength(int len) {
+ // 28 bytes is the minimal success response size (portmapV2)
+ if (len < 28) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Portmap mapping registration failed,"
+ + " the response size is less than 28 bytes:" + len);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
+ if (!validMessageLength(buf.readableBytes())) {
+ e.getChannel().close();
+ return;
+ }
+
+ // handling fragment header for TCP, 4 bytes.
+ byte[] fragmentHeader = Arrays.copyOfRange(buf.array(), 0, 4);
+ int fragmentSize = XDR.fragmentSize(fragmentHeader);
+ boolean isLast = XDR.isLastFragment(fragmentHeader);
+ assert (fragmentSize == 28 && isLast == true);
+
+ XDR xdr = new XDR();
+ xdr.writeFixedOpaque(Arrays.copyOfRange(buf.array(), 4,
+ buf.readableBytes()));
+
+ RpcReply reply = RpcReply.read(xdr);
+ if (reply.getState() == RpcReply.ReplyState.MSG_ACCEPTED) {
+ RpcAcceptedReply acceptedReply = (RpcAcceptedReply) reply;
+ handle(acceptedReply, xdr);
+ } else {
+ RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
+ handle(deniedReply);
+ }
+ e.getChannel().close(); // shutdown now that request is complete
+ }
+
+ private void handle(RpcDeniedReply deniedReply) {
+ LOG.warn("Portmap mapping registration request was denied , " +
+ deniedReply);
+ }
+
+ private void handle(RpcAcceptedReply acceptedReply, XDR xdr) {
+ AcceptState acceptState = acceptedReply.getAcceptState();
+ assert (acceptState == AcceptState.SUCCESS);
+ boolean answer = xdr.readBoolean();
+ if (answer != true) {
+ LOG.warn("Portmap mapping registration failed, accept state:"
+ + acceptState);
+ }
+ LOG.info("Portmap mapping registration succeeded");
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
new file mode 100644
index 00000000000..9ac223de155
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/**
+ * Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate success of the request.
+ */
+public class RpcAcceptedReply extends RpcReply {
+ public enum AcceptState {
+ SUCCESS(0), /* RPC executed successfully */
+ PROG_UNAVAIL(1), /* remote hasn't exported program */
+ PROG_MISMATCH(2), /* remote can't support version # */
+ PROC_UNAVAIL(3), /* program can't support procedure */
+ GARBAGE_ARGS(4), /* procedure can't decode params */
+ SYSTEM_ERR(5); /* e.g. memory allocation failure */
+
+ private final int value;
+
+ AcceptState(int value) {
+ this.value = value;
+ }
+
+ public static AcceptState fromValue(int value) {
+ return values()[value];
+ }
+
+ public int getValue() {
+ return value;
+ }
+ };
+
+ private final RpcAuthInfo verifier;
+ private final AcceptState acceptState;
+
+ RpcAcceptedReply(int xid, int messageType, ReplyState state,
+ RpcAuthInfo verifier, AcceptState acceptState) {
+ super(xid, messageType, state);
+ this.verifier = verifier;
+ this.acceptState = acceptState;
+ }
+
+ public static RpcAcceptedReply read(int xid, int messageType,
+ ReplyState replyState, XDR xdr) {
+ RpcAuthInfo verifier = RpcAuthInfo.read(xdr);
+ AcceptState acceptState = AcceptState.fromValue(xdr.readInt());
+ return new RpcAcceptedReply(xid, messageType, replyState, verifier,
+ acceptState);
+ }
+
+ public RpcAuthInfo getVerifier() {
+ return verifier;
+ }
+
+ public AcceptState getAcceptState() {
+ return acceptState;
+ }
+
+ public static XDR voidReply(XDR xdr, int xid) {
+ return voidReply(xdr, xid, AcceptState.SUCCESS);
+ }
+
+ public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) {
+ xdr.writeInt(xid);
+ xdr.writeInt(RpcMessage.RPC_REPLY);
+ xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue());
+ xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+ xdr.writeVariableOpaque(new byte[0]);
+ xdr.writeInt(acceptState.getValue());
+ return xdr;
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
new file mode 100644
index 00000000000..a507d0d20de
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+/**
+ * Authentication Info as defined in RFC 1831
+ */
+public class RpcAuthInfo {
+ /** Different types of authentication as defined in RFC 1831 */
+ public enum AuthFlavor {
+ AUTH_NONE(0),
+ AUTH_SYS(1),
+ AUTH_SHORT(2),
+ AUTH_DH(3),
+ RPCSEC_GSS(6);
+
+ private int value;
+
+ AuthFlavor(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ static AuthFlavor fromValue(int value) {
+ for (AuthFlavor v : values()) {
+ if (v.value == value) {
+ return v;
+ }
+ }
+ throw new IllegalArgumentException("Invalid AuthFlavor value " + value);
+ }
+ }
+
+ private final AuthFlavor flavor;
+ private final byte[] body;
+
+ protected RpcAuthInfo(AuthFlavor flavor, byte[] body) {
+ this.flavor = flavor;
+ this.body = body;
+ }
+
+ public static RpcAuthInfo read(XDR xdr) {
+ int type = xdr.readInt();
+ AuthFlavor flavor = AuthFlavor.fromValue(type);
+ byte[] body = xdr.readVariableOpaque();
+ return new RpcAuthInfo(flavor, body);
+ }
+
+ public AuthFlavor getFlavor() {
+ return flavor;
+ }
+
+ public byte[] getBody() {
+ return Arrays.copyOf(body, body.length);
+ }
+
+ @Override
+ public String toString() {
+ return "(AuthFlavor:" + flavor + ")";
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
new file mode 100644
index 00000000000..dbedb3649e4
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * AUTH_SYS as defined in RFC 1831
+ */
+public class RpcAuthSys {
+ private final int uid;
+ private final int gid;
+
+ public RpcAuthSys(int uid, int gid) {
+ this.uid = uid;
+ this.gid = gid;
+ }
+
+ public static RpcAuthSys from(byte[] credentials) {
+ XDR sys = new XDR(credentials);
+ sys.skip(4); // Stamp
+ sys.skipVariableOpaque(); // Machine name
+ return new RpcAuthSys(sys.readInt(), sys.readInt());
+ }
+
+ public int getUid() {
+ return uid;
+ }
+
+ public int getGid() {
+ return gid;
+ }
+
+ @Override
+ public String toString() {
+ return "(AuthSys: uid=" + uid + " gid=" + gid + ")";
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
new file mode 100644
index 00000000000..74e6373283f
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Represents an RPC message of type RPC call as defined in RFC 1831
+ */
+public class RpcCall extends RpcMessage {
+ public static final int RPC_VERSION = 2;
+ private static final Log LOG = LogFactory.getLog(RpcCall.class);
+ private final int rpcVersion;
+ private final int program;
+ private final int version;
+ private final int procedure;
+ private final RpcAuthInfo credential;
+ private final RpcAuthInfo verifier;
+
+ protected RpcCall(int xid, int messageType, int rpcVersion, int program,
+ int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) {
+ super(xid, messageType);
+ this.rpcVersion = rpcVersion;
+ this.program = program;
+ this.version = version;
+ this.procedure = procedure;
+ this.credential = credential;
+ this.verifier = verifier;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this);
+ }
+ validate();
+ }
+
+ private void validateRpcVersion() {
+ if (rpcVersion != RPC_VERSION) {
+ throw new IllegalArgumentException("RPC version is expected to be "
+ + RPC_VERSION + " but got " + rpcVersion);
+ }
+ }
+
+ public void validate() {
+ validateMessageType(RPC_CALL);
+ validateRpcVersion();
+ // Validate other members
+ // Throw exception if validation fails
+ }
+
+
+ public int getRpcVersion() {
+ return rpcVersion;
+ }
+
+ public int getProgram() {
+ return program;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public int getProcedure() {
+ return procedure;
+ }
+
+ public RpcAuthInfo getCredential() {
+ return credential;
+ }
+
+ public RpcAuthInfo getVerifier() {
+ return verifier;
+ }
+
+ public static RpcCall read(XDR xdr) {
+ return new RpcCall(xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(),
+ xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr),
+ RpcAuthInfo.read(xdr));
+ }
+
+ public static void write(XDR out, int xid, int program, int progVersion,
+ int procedure) {
+ out.writeInt(xid);
+ out.writeInt(RpcMessage.RPC_CALL);
+ out.writeInt(2);
+ out.writeInt(program);
+ out.writeInt(progVersion);
+ out.writeInt(procedure);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Xid:%d, messageType:%d, rpcVersion:%d, program:%d,"
+ + " version:%d, procedure:%d, credential:%s, verifier:%s", xid,
+ messageType, rpcVersion, program, version, procedure,
+ credential.toString(), verifier.toString());
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
new file mode 100644
index 00000000000..0862d4fb4ea
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for handling the duplicate non-idempotenty Rpc
+ * calls. A non-idempotent request is processed as follows:
+ *
+ * - If the request is being processed for the first time, its state is
+ * in-progress in cache.
+ * - If the request is retransimitted and is in-progress state, it is ignored.
+ *
+ * - If the request is retransimitted and is completed, the previous response
+ * from the cache is sent back to the client.
+ *
+ *
+ * A request is identified by the client ID (address of the client) and
+ * transaction ID (xid) from the Rpc call.
+ *
+ */
+public class RpcCallCache {
+
+ public static class CacheEntry {
+ private XDR response; // null if no response has been sent
+
+ public CacheEntry() {
+ response = null;
+ }
+
+ public boolean isInProgress() {
+ return response == null;
+ }
+
+ public boolean isCompleted() {
+ return response != null;
+ }
+
+ public XDR getResponse() {
+ return response;
+ }
+
+ public void setResponse(XDR response) {
+ this.response = response;
+ }
+ }
+
+ /**
+ * Call that is used to track a client in the {@link RpcCallCache}
+ */
+ public static class ClientRequest {
+ protected final InetAddress clientId;
+ protected final int xid;
+
+ public InetAddress getClientId() {
+ return clientId;
+ }
+
+ public ClientRequest(InetAddress clientId, int xid) {
+ this.clientId = clientId;
+ this.xid = xid;
+ }
+
+ @Override
+ public int hashCode() {
+ return xid + clientId.hashCode() * 31;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || !(obj instanceof ClientRequest)) {
+ return false;
+ }
+ ClientRequest other = (ClientRequest) obj;
+ return clientId.equals(other.clientId) && (xid == other.xid);
+ }
+ }
+
+ private final String program;
+
+ private final Map map;
+
+ public RpcCallCache(final String program, final int maxEntries) {
+ if (maxEntries <= 0) {
+ throw new IllegalArgumentException("Cache size is " + maxEntries
+ + ". Should be > 0");
+ }
+ this.program = program;
+ map = new LinkedHashMap() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(
+ java.util.Map.Entry eldest) {
+ return RpcCallCache.this.size() > maxEntries;
+ }
+ };
+ }
+
+ /** Return the program name */
+ public String getProgram() {
+ return program;
+ }
+
+ /** Mark a request as completed and add corresponding response to the cache */
+ public void callCompleted(InetAddress clientId, int xid, XDR response) {
+ ClientRequest req = new ClientRequest(clientId, xid);
+ CacheEntry e;
+ synchronized(map) {
+ e = map.get(req);
+ }
+ e.setResponse(response);
+ }
+
+ /**
+ * Check the cache for an entry. If it does not exist, add the request
+ * as in progress.
+ */
+ public CacheEntry checkOrAddToCache(InetAddress clientId, int xid) {
+ ClientRequest req = new ClientRequest(clientId, xid);
+ CacheEntry e;
+ synchronized(map) {
+ e = map.get(req);
+ if (e == null) {
+ // Add an inprogress cache entry
+ map.put(req, new CacheEntry());
+ }
+ }
+ return e;
+ }
+
+ /** Return number of cached entries */
+ public int size() {
+ return map.size();
+ }
+
+ /**
+ * Iterator to the cache entries
+ * @return iterator
+ */
+ @VisibleForTesting
+ public Iterator> iterator() {
+ return map.entrySet().iterator();
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
new file mode 100644
index 00000000000..26abd69d254
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/**
+ * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate failure of the request.
+ */
+public class RpcDeniedReply extends RpcReply {
+ public enum RejectState {
+ RPC_MISMATCH(0), AUTH_ERROR(1);
+
+ private final int value;
+
+ RejectState(int value) {
+ this.value = value;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ static RejectState fromValue(int value) {
+ return values()[value];
+ }
+ }
+
+ private final RejectState rejectState;
+
+ RpcDeniedReply(int xid, int messageType, ReplyState replyState,
+ RejectState rejectState) {
+ super(xid, messageType, replyState);
+ this.rejectState = rejectState;
+ }
+
+ public static RpcDeniedReply read(int xid, int messageType,
+ ReplyState replyState, XDR xdr) {
+ RejectState rejectState = RejectState.fromValue(xdr.readInt());
+ return new RpcDeniedReply(xid, messageType, replyState, rejectState);
+ }
+
+ public RejectState getRejectState() {
+ return rejectState;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuffer().append("xid:").append(xid)
+ .append(",messageType:").append(messageType).append("rejectState:")
+ .append(rejectState).toString();
+ }
+
+ public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted,
+ RejectState rejectState) {
+ xdr.writeInt(xid);
+ xdr.writeInt(RpcMessage.RPC_REPLY);
+ xdr.writeInt(msgAccepted.getValue());
+ xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+ xdr.writeVariableOpaque(new byte[0]);
+ xdr.writeInt(rejectState.getValue());
+ return xdr;
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
new file mode 100644
index 00000000000..45b6bb83f17
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * {@link FrameDecoder} for RPC messages.
+ */
+public class RpcFrameDecoder extends FrameDecoder {
+ public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+ private ChannelBuffer frame;
+
+ /**
+ * Decode an RPC message received on the socket.
+ * @return mpnull if incomplete message is received.
+ */
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buf) {
+
+ // Make sure if the length field was received.
+ if (buf.readableBytes() < 4) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Length field is not received yet");
+ }
+ return null;
+ }
+
+ // Note the index and go back to it when an incomplete message is received
+ buf.markReaderIndex();
+
+ // Read the record marking.
+ ChannelBuffer fragmentHeader = buf.readBytes(4);
+ int length = XDR.fragmentSize(fragmentHeader.array());
+ boolean isLast = XDR.isLastFragment(fragmentHeader.array());
+
+ // Make sure if there's enough bytes in the buffer.
+ if (buf.readableBytes() < length) {
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(length + " bytes are not received yet");
+ }
+ buf.resetReaderIndex(); // Go back to the right reader index
+ return null;
+ }
+
+ if (frame == null) {
+ frame = buf.readBytes(length);
+ } else {
+ ChannelBuffer tmp = ChannelBuffers.copiedBuffer(frame.array(), buf
+ .readBytes(length).array());
+ frame = tmp;
+ }
+
+ // Successfully decoded a frame. Return the decoded frame if the frame is
+ // the last one. Otherwise, wait for the next frame.
+ if (isLast) {
+ ChannelBuffer completeFrame = frame;
+ frame = null;
+ return completeFrame;
+ } else {
+ LOG.info("Wait for the next frame. This rarely happens.");
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
new file mode 100644
index 00000000000..3bd45c71599
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Represent an RPC message as defined in RFC 1831.
+ */
+public abstract class RpcMessage {
+ public static final int RPC_CALL = 0;
+ public static final int RPC_REPLY = 1;
+
+ protected final int xid;
+ protected final int messageType;
+
+ RpcMessage(int xid, int messageType) {
+ if (messageType != RPC_CALL && messageType != RPC_REPLY) {
+ throw new IllegalArgumentException("Invalid message type " + messageType);
+ }
+ this.xid = xid;
+ this.messageType = messageType;
+ }
+
+ public int getXid() {
+ return xid;
+ }
+
+ public int getMessageType() {
+ return messageType;
+ }
+
+ protected void validateMessageType(int expected) {
+ if (expected != messageType) {
+ throw new IllegalArgumentException("Message type is expected to be "
+ + expected + " but got " + messageType);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
new file mode 100644
index 00000000000..d82e624265a
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.portmap.PortmapMapping;
+import org.apache.hadoop.portmap.PortmapRequest;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Class for writing RPC server programs based on RFC 1050. Extend this class
+ * and implement {@link #handleInternal} to handle the requests received.
+ */
+public abstract class RpcProgram {
+ private static final Log LOG = LogFactory.getLog(RpcProgram.class);
+ public static final int RPCB_PORT = 111;
+ private final String program;
+ private final String host;
+ private final int port;
+ private final int progNumber;
+ private final int lowProgVersion;
+ private final int highProgVersion;
+ private final RpcCallCache rpcCallCache;
+
+ /**
+ * Constructor
+ *
+ * @param program program name
+ * @param host host where the Rpc server program is started
+ * @param port port where the Rpc server program is listening to
+ * @param progNumber program number as defined in RFC 1050
+ * @param lowProgVersion lowest version of the specification supported
+ * @param highProgVersion highest version of the specification supported
+ * @param cacheSize size of cache to handle duplciate requests. Size <= 0
+ * indicates no cache.
+ */
+ protected RpcProgram(String program, String host, int port, int progNumber,
+ int lowProgVersion, int highProgVersion, int cacheSize) {
+ this.program = program;
+ this.host = host;
+ this.port = port;
+ this.progNumber = progNumber;
+ this.lowProgVersion = lowProgVersion;
+ this.highProgVersion = highProgVersion;
+ this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
+ : null;
+ }
+
+ /**
+ * Register this program with the local portmapper.
+ */
+ public void register(int transport) {
+ // Register all the program versions with portmapper for a given transport
+ for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
+ register(vers, transport);
+ }
+ }
+
+ /**
+ * Register this program with the local portmapper.
+ */
+ private void register(int progVersion, int transport) {
+ PortmapMapping mapEntry = new PortmapMapping(progNumber, progVersion,
+ transport, port);
+ register(mapEntry);
+ }
+
+ /**
+ * Register the program with Portmap or Rpcbind
+ */
+ protected void register(PortmapMapping mapEntry) {
+ XDR mappingRequest = PortmapRequest.create(mapEntry);
+ SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
+ mappingRequest);
+ try {
+ registrationClient.run();
+ } catch (IOException e) {
+ LOG.error("Registration failure with " + host + ":" + port
+ + ", portmap entry: " + mapEntry);
+ throw new RuntimeException("Registration failure");
+ }
+ }
+
+ /**
+ * Handle an RPC request.
+ * @param rpcCall RPC call that is received
+ * @param in xdr with cursor at reading the remaining bytes of a method call
+ * @param out xdr output corresponding to Rpc reply
+ * @param client making the Rpc request
+ * @param channel connection over which Rpc request is received
+ * @return response xdr response
+ */
+ protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+ InetAddress client, Channel channel);
+
+ public XDR handle(XDR xdr, InetAddress client, Channel channel) {
+ XDR out = new XDR();
+ RpcCall rpcCall = RpcCall.read(xdr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(program + " procedure #" + rpcCall.getProcedure());
+ }
+
+ if (!checkProgram(rpcCall.getProgram())) {
+ return programMismatch(out, rpcCall);
+ }
+
+ if (!checkProgramVersion(rpcCall.getVersion())) {
+ return programVersionMismatch(out, rpcCall);
+ }
+
+ // Check for duplicate requests in the cache for non-idempotent requests
+ boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
+ if (idempotent) {
+ CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
+ if (entry != null) { // in ache
+ if (entry.isCompleted()) {
+ LOG.info("Sending the cached reply to retransmitted request "
+ + rpcCall.getXid());
+ return entry.getResponse();
+ } else { // else request is in progress
+ LOG.info("Retransmitted request, transaction still in progress "
+ + rpcCall.getXid());
+ // TODO: ignore the request?
+ }
+ }
+ }
+
+ XDR response = handleInternal(rpcCall, xdr, out, client, channel);
+ if (response.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No sync response, expect an async response for request XID="
+ + rpcCall.getXid());
+ }
+ }
+
+ // Add the request to the cache
+ if (idempotent) {
+ rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
+ }
+ return response;
+ }
+
+ private XDR programMismatch(XDR out, RpcCall call) {
+ LOG.warn("Invalid RPC call program " + call.getProgram());
+ RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL);
+ return out;
+ }
+
+ private XDR programVersionMismatch(XDR out, RpcCall call) {
+ LOG.warn("Invalid RPC call version " + call.getVersion());
+ RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH);
+ out.writeInt(lowProgVersion);
+ out.writeInt(highProgVersion);
+ return out;
+ }
+
+ private boolean checkProgram(int progNumber) {
+ return this.progNumber == progNumber;
+ }
+
+ /** Return true if a the program version in rpcCall is supported */
+ private boolean checkProgramVersion(int programVersion) {
+ return programVersion >= lowProgVersion
+ && programVersion <= highProgVersion;
+ }
+
+ @Override
+ public String toString() {
+ return "Rpc program: " + program + " at " + host + ":" + port;
+ }
+
+ protected abstract boolean isIdempotent(RpcCall call);
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
new file mode 100644
index 00000000000..4681d6d9698
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Represents an RPC message of type RPC reply as defined in RFC 1831
+ */
+public abstract class RpcReply extends RpcMessage {
+ /** RPC reply_stat as defined in RFC 1831 */
+ public enum ReplyState {
+ MSG_ACCEPTED(0),
+ MSG_DENIED(1);
+
+ private final int value;
+ ReplyState(int value) {
+ this.value = value;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ public static ReplyState fromValue(int value) {
+ return values()[value];
+ }
+ }
+
+ private final ReplyState state;
+
+ RpcReply(int xid, int messageType, ReplyState state) {
+ super(xid, messageType);
+ this.state = state;
+ validateMessageType(RPC_REPLY);
+ }
+
+ public static RpcReply read(XDR xdr) {
+ int xid = xdr.readInt();
+ int messageType = xdr.readInt();
+ ReplyState stat = ReplyState.fromValue(xdr.readInt());
+ switch (stat) {
+ case MSG_ACCEPTED:
+ return RpcAcceptedReply.read(xid, messageType, stat, xdr);
+ case MSG_DENIED:
+ return RpcDeniedReply.read(xid, messageType, stat, xdr);
+ }
+ return null;
+ }
+
+ public ReplyState getState() {
+ return state;
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
new file mode 100644
index 00000000000..7186dd1359e
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * The XID in RPC call. It is used for starting with new seed after each reboot.
+ */
+public class RpcUtil {
+ private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
+
+ public static int getNewXid(String caller) {
+ return xid = ++xid + caller.hashCode();
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
new file mode 100644
index 00000000000..287aa9fa8ad
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.oncrpc.RpcFrameDecoder;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * A simple TCP based RPC client which just sends a request to a server.
+ */
+public class SimpleTcpClient {
+ protected final String host;
+ protected final int port;
+ protected final XDR request;
+ protected ChannelPipelineFactory pipelineFactory;
+ protected final boolean oneShot;
+
+ public SimpleTcpClient(String host, int port, XDR request) {
+ this(host,port, request, true);
+ }
+
+ public SimpleTcpClient(String host, int port, XDR request, Boolean oneShot) {
+ this.host = host;
+ this.port = port;
+ this.request = request;
+ this.oneShot = oneShot;
+ }
+
+ protected ChannelPipelineFactory setPipelineFactory() {
+ this.pipelineFactory = new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ return Channels.pipeline(new RpcFrameDecoder(),
+ new SimpleTcpClientHandler(request));
+ }
+ };
+ return this.pipelineFactory;
+ }
+
+ public void run() {
+ // Configure the client.
+ ChannelFactory factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(setPipelineFactory());
+
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+
+ // Start the connection attempt.
+ ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+ if (oneShot) {
+ // Wait until the connection is closed or the connection attempt fails.
+ future.getChannel().getCloseFuture().awaitUninterruptibly();
+
+ // Shut down thread pools to exit.
+ bootstrap.releaseExternalResources();
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
new file mode 100644
index 00000000000..b72153a312c
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpClientHandler extends SimpleChannelHandler {
+ public static final Log LOG = LogFactory.getLog(SimpleTcpClient.class);
+ protected final XDR request;
+
+ public SimpleTcpClientHandler(XDR request) {
+ this.request = request;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ // Send the request
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sending PRC request");
+ }
+ ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
+ e.getChannel().write(outBuf);
+ }
+
+ /**
+ * Shutdown connection by default. Subclass can override this method to do
+ * more interaction with the server.
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ e.getChannel().close();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Unexpected exception from downstream: ", e.getCause());
+ e.getChannel().close();
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
new file mode 100644
index 00000000000..e168ef406b9
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * Simple UDP server implemented using netty.
+ */
+public class SimpleTcpServer {
+ public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
+ protected final int port;
+ protected final ChannelPipelineFactory pipelineFactory;
+ protected final RpcProgram rpcProgram;
+
+ /** The maximum number of I/O worker threads */
+ protected final int workerCount;
+
+ /**
+ * @param port TCP port where to start the server at
+ * @param program RPC program corresponding to the server
+ * @param workercount Number of worker threads
+ */
+ public SimpleTcpServer(int port, RpcProgram program, int workercount) {
+ this.port = port;
+ this.rpcProgram = program;
+ this.workerCount = workercount;
+ this.pipelineFactory = getPipelineFactory();
+ }
+
+ public ChannelPipelineFactory getPipelineFactory() {
+ return new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ return Channels.pipeline(new RpcFrameDecoder(),
+ new SimpleTcpServerHandler(rpcProgram));
+ }
+ };
+ }
+
+ public void run() {
+ // Configure the Server.
+ ChannelFactory factory;
+ if (workerCount == 0) {
+ // Use default workers: 2 * the number of available processors
+ factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ } else {
+ factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+ workerCount);
+ }
+
+ ServerBootstrap bootstrap = new ServerBootstrap(factory);
+ bootstrap.setPipelineFactory(pipelineFactory);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+
+ // Listen to TCP port
+ bootstrap.bind(new InetSocketAddress(port));
+
+ LOG.info("Started listening to TCP requests at port " + port + " for "
+ + rpcProgram + " with workerCount " + workerCount);
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
new file mode 100644
index 00000000000..71cce18f586
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpServerHandler extends SimpleChannelHandler {
+ public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class);
+
+ protected final RpcProgram rpcProgram;
+
+ public SimpleTcpServerHandler(RpcProgram rpcProgram) {
+ this.rpcProgram = rpcProgram;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ XDR request = new XDR(buf.array());
+
+ InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
+ .getRemoteAddress()).getAddress();
+ Channel outChannel = e.getChannel();
+ XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel);
+ if (response.size() > 0) {
+ outChannel.write(XDR.writeMessageTcp(response, true));
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Encountered ", e.getCause());
+ e.getChannel().close();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
new file mode 100644
index 00000000000..344dcd78018
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.Arrays;
+
+/**
+ * A simple UDP based RPC client which just sends one request to a server.
+ */
+public class SimpleUdpClient {
+ protected final String host;
+ protected final int port;
+ protected final XDR request;
+ protected final boolean oneShot;
+
+ public SimpleUdpClient(String host, int port, XDR request) {
+ this(host, port, request, true);
+ }
+
+ public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
+ this.host = host;
+ this.port = port;
+ this.request = request;
+ this.oneShot = oneShot;
+ }
+
+ public void run() throws IOException {
+ DatagramSocket clientSocket = new DatagramSocket();
+ InetAddress IPAddress = InetAddress.getByName(host);
+ byte[] sendData = request.getBytes();
+ byte[] receiveData = new byte[65535];
+
+ DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
+ IPAddress, port);
+ clientSocket.send(sendPacket);
+ DatagramPacket receivePacket = new DatagramPacket(receiveData,
+ receiveData.length);
+ clientSocket.receive(receivePacket);
+
+ // Check reply status
+ XDR xdr = new XDR();
+ xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
+ receivePacket.getLength()));
+ RpcReply reply = RpcReply.read(xdr);
+ if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
+ throw new IOException("Request failed: " + reply.getState());
+ }
+
+ clientSocket.close();
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
new file mode 100644
index 00000000000..70bffba66d6
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+
+/**
+ * Simple UDP server implemented based on netty.
+ */
+public class SimpleUdpServer {
+ public static final Log LOG = LogFactory.getLog(SimpleUdpServer.class);
+ private final int SEND_BUFFER_SIZE = 65536;
+ private final int RECEIVE_BUFFER_SIZE = 65536;
+
+ protected final int port;
+ protected final ChannelPipelineFactory pipelineFactory;
+ protected final RpcProgram rpcProgram;
+ protected final int workerCount;
+
+ public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+ this.port = port;
+ this.rpcProgram = program;
+ this.workerCount = workerCount;
+ this.pipelineFactory = new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
+ }
+ };
+ }
+
+ public void run() {
+ // Configure the client.
+ DatagramChannelFactory f = new NioDatagramChannelFactory(
+ Executors.newCachedThreadPool(), workerCount);
+
+ ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+ ChannelPipeline p = b.getPipeline();
+ p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+
+ b.setOption("broadcast", "false");
+ b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
+ b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
+
+ // Listen to the UDP port
+ b.bind(new InetSocketAddress(port));
+
+ LOG.info("Started listening to UDP requests at port " + port + " for "
+ + rpcProgram + " with workerCount " + workerCount);
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
new file mode 100644
index 00000000000..223b7794a2f
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleUdpServer}.
+ */
+public class SimpleUdpServerHandler extends SimpleChannelHandler {
+ public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class);
+ private final RpcProgram rpcProgram;
+
+ public SimpleUdpServerHandler(RpcProgram rpcProgram) {
+ this.rpcProgram = rpcProgram;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+
+ XDR request = new XDR();
+
+ request.writeFixedOpaque(buf.array());
+ InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
+ .getAddress();
+ XDR response = rpcProgram.handle(request, remoteInetAddr, null);
+ e.getChannel().write(XDR.writeMessageUdp(response), e.getRemoteAddress());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Encountered ", e.getCause());
+ e.getChannel().close();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
new file mode 100644
index 00000000000..40633e286d8
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utility class for building XDR messages based on RFC 4506.
+ *
+ * This class maintains a buffer into which java types are written as
+ * XDR types for building XDR messages. Similarly this class can
+ * be used to get java types from an XDR request or response.
+ *
+ * Currently only a subset of XDR types defined in RFC 4506 are supported.
+ */
+public class XDR {
+ private final static String HEXES = "0123456789abcdef";
+
+ /** Internal buffer for reading or writing to */
+ private byte[] bytearr;
+
+ /** Place to read from or write to */
+ private int cursor;
+
+ public XDR() {
+ this(new byte[0]);
+ }
+
+ public XDR(byte[] data) {
+ bytearr = Arrays.copyOf(data, data.length);
+ cursor = 0;
+ }
+
+ /**
+ * @param bytes bytes to be appended to internal buffer
+ */
+ private void append(byte[] bytesToAdd) {
+ bytearr = append(bytearr, bytesToAdd);
+ }
+
+ public int size() {
+ return bytearr.length;
+ }
+
+ /** Skip some bytes by moving the cursor */
+ public void skip(int size) {
+ cursor += size;
+ }
+
+ /**
+ * Write Java primitive integer as XDR signed integer.
+ *
+ * Definition of XDR signed integer from RFC 4506:
+ *
+ * An XDR signed integer is a 32-bit datum that encodes an integer in
+ * the range [-2147483648,2147483647]. The integer is represented in
+ * two's complement notation. The most and least significant bytes are
+ * 0 and 3, respectively. Integers are declared as follows:
+ *
+ * int identifier;
+ *
+ * (MSB) (LSB)
+ * +-------+-------+-------+-------+
+ * |byte 0 |byte 1 |byte 2 |byte 3 | INTEGER
+ * +-------+-------+-------+-------+
+ * <------------32 bits------------>
+ *
+ */
+ public void writeInt(int data) {
+ append(toBytes(data));
+ }
+
+ /**
+ * Read an XDR signed integer and return as Java primitive integer.
+ */
+ public int readInt() {
+ byte byte0 = bytearr[cursor++];
+ byte byte1 = bytearr[cursor++];
+ byte byte2 = bytearr[cursor++];
+ byte byte3 = bytearr[cursor++];
+ return (XDR.toShort(byte0) << 24) + (XDR.toShort(byte1) << 16)
+ + (XDR.toShort(byte2) << 8) + XDR.toShort(byte3);
+ }
+
+ /**
+ * Write Java primitive boolean as an XDR boolean.
+ *
+ * Definition of XDR boolean from RFC 4506:
+ *
+ * Booleans are important enough and occur frequently enough to warrant
+ * their own explicit type in the standard. Booleans are declared as
+ * follows:
+ *
+ * bool identifier;
+ *
+ * This is equivalent to:
+ *
+ * enum { FALSE = 0, TRUE = 1 } identifier;
+ *
+ */
+ public void writeBoolean(boolean data) {
+ this.writeInt(data ? 1 : 0);
+ }
+
+ /**
+ * Read an XDR boolean and return as Java primitive boolean.
+ */
+ public boolean readBoolean() {
+ return readInt() == 0 ? false : true;
+ }
+
+ /**
+ * Write Java primitive long to an XDR signed long.
+ *
+ * Definition of XDR signed long from RFC 4506:
+ *
+ * The standard also defines 64-bit (8-byte) numbers called hyper
+ * integers and unsigned hyper integers. Their representations are the
+ * obvious extensions of integer and unsigned integer defined above.
+ * They are represented in two's complement notation.The most and
+ * least significant bytes are 0 and 7, respectively. Their
+ * declarations:
+ *
+ * hyper identifier; unsigned hyper identifier;
+ *
+ * (MSB) (LSB)
+ * +-------+-------+-------+-------+-------+-------+-------+-------+
+ * |byte 0 |byte 1 |byte 2 |byte 3 |byte 4 |byte 5 |byte 6 |byte 7 |
+ * +-------+-------+-------+-------+-------+-------+-------+-------+
+ * <----------------------------64 bits---------------------------->
+ * HYPER INTEGER
+ * UNSIGNED HYPER INTEGER
+ *
+ */
+ public void writeLongAsHyper(long data) {
+ byte byte0 = (byte) ((data & 0xff00000000000000l) >> 56);
+ byte byte1 = (byte) ((data & 0x00ff000000000000l) >> 48);
+ byte byte2 = (byte) ((data & 0x0000ff0000000000l) >> 40);
+ byte byte3 = (byte) ((data & 0x000000ff00000000l) >> 32);
+ byte byte4 = (byte) ((data & 0x00000000ff000000l) >> 24);
+ byte byte5 = (byte) ((data & 0x0000000000ff0000l) >> 16);
+ byte byte6 = (byte) ((data & 0x000000000000ff00l) >> 8);
+ byte byte7 = (byte) ((data & 0x00000000000000ffl));
+ this.append(new byte[] { byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7 });
+ }
+
+ /**
+ * Read XDR signed hyper and return as java primitive long.
+ */
+ public long readHyper() {
+ byte byte0 = bytearr[cursor++];
+ byte byte1 = bytearr[cursor++];
+ byte byte2 = bytearr[cursor++];
+ byte byte3 = bytearr[cursor++];
+ byte byte4 = bytearr[cursor++];
+ byte byte5 = bytearr[cursor++];
+ byte byte6 = bytearr[cursor++];
+ byte byte7 = bytearr[cursor++];
+ return ((long) XDR.toShort(byte0) << 56)
+ + ((long) XDR.toShort(byte1) << 48) + ((long) XDR.toShort(byte2) << 40)
+ + ((long) XDR.toShort(byte3) << 32) + ((long) XDR.toShort(byte4) << 24)
+ + ((long) XDR.toShort(byte5) << 16) + ((long) XDR.toShort(byte6) << 8)
+ + XDR.toShort(byte7);
+ }
+
+ /**
+ * Write a Java primitive byte array to XDR fixed-length opaque data.
+ *
+ * Defintion of fixed-length opaque data from RFC 4506:
+ *
+ * At times, fixed-length uninterpreted data needs to be passed among
+ * machines. This data is called "opaque" and is declared as follows:
+ *
+ * opaque identifier[n];
+ *
+ * where the constant n is the (static) number of bytes necessary to
+ * contain the opaque data. If n is not a multiple of four, then the n
+ * bytes are followed by enough (0 to 3) residual zero bytes, r, to make
+ * the total byte count of the opaque object a multiple of four.
+ *
+ * 0 1 ...
+ * +--------+--------+...+--------+--------+...+--------+
+ * | byte 0 | byte 1 |...|byte n-1| 0 |...| 0 |
+ * +--------+--------+...+--------+--------+...+--------+
+ * |<-----------n bytes---------->|<------r bytes------>|
+ * |<-----------n+r (where (n+r) mod 4 = 0)------------>|
+ * FIXED-LENGTH OPAQUE
+ *
+ */
+ public void writeFixedOpaque(byte[] data) {
+ writeFixedOpaque(data, data.length);
+ }
+
+ public void writeFixedOpaque(byte[] data, int length) {
+ append(Arrays.copyOf(data, length + XDR.pad(length, 4)));
+ }
+
+ public byte[] readFixedOpaque(int size) {
+ byte[] ret = new byte[size];
+ for(int i = 0; i < size; i++) {
+ ret[i] = bytearr[cursor];
+ cursor++;
+ }
+
+ for(int i = 0; i < XDR.pad(size, 4); i++) {
+ cursor++;
+ }
+ return ret;
+ }
+
+ /**
+ * Write a Java primitive byte array as XDR variable-length opque data.
+ *
+ * Definition of XDR variable-length opaque data RFC 4506:
+ *
+ *
+ * The standard also provides for variable-length (counted) opaque data,
+ * defined as a sequence of n (numbered 0 through n-1) arbitrary bytes
+ * to be the number n encoded as an unsigned integer (as described
+ * below), and followed by the n bytes of the sequence.
+ *
+ * Byte m of the sequence always precedes byte m+1 of the sequence, and
+ * byte 0 of the sequence always follows the sequence's length (count).
+ * If n is not a multiple of four, then the n bytes are followed by
+ * enough (0 to 3) residual zero bytes, r, to make the total byte count
+ * a multiple of four. Variable-length opaque data is declared in the
+ * following way:
+ *
+ * opaque identifier;
+ * or
+ * opaque identifier<>;
+ *
+ * The constant m denotes an upper bound of the number of bytes that the
+ * sequence may contain. If m is not specified, as in the second
+ * declaration, it is assumed to be (2**32) - 1, the maximum length.
+ *
+ * The constant m would normally be found in a protocol specification.
+ * For example, a filing protocol may state that the maximum data
+ * transfer size is 8192 bytes, as follows:
+ *
+ * opaque filedata<8192>;
+ *
+ * 0 1 2 3 4 5 ...
+ * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+ * | length n |byte0|byte1|...| n-1 | 0 |...| 0 |
+ * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+ * |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
+ * |<----n+r (where (n+r) mod 4 = 0)---->|
+ * VARIABLE-LENGTH OPAQUE
+ *
+ * It is an error to encode a length greater than the maximum described
+ * in the specification.
+ *
+ */
+ public void writeVariableOpaque(byte[] data) {
+ this.writeInt(data.length);
+ this.writeFixedOpaque(data);
+ }
+
+ public byte[] readVariableOpaque() {
+ int size = this.readInt();
+ return size != 0 ? this.readFixedOpaque(size) : null;
+ }
+
+ public void skipVariableOpaque() {
+ int length= this.readInt();
+ this.skip(length+XDR.pad(length, 4));
+ }
+
+ /**
+ * Write Java String as XDR string.
+ *
+ * Definition of XDR string from RFC 4506:
+ *
+ *
+ * The standard defines a string of n (numbered 0 through n-1) ASCII
+ * bytes to be the number n encoded as an unsigned integer (as described
+ * above), and followed by the n bytes of the string. Byte m of the
+ * string always precedes byte m+1 of the string, and byte 0 of the
+ * string always follows the string's length. If n is not a multiple of
+ * four, then the n bytes are followed by enough (0 to 3) residual zero
+ * bytes, r, to make the total byte count a multiple of four. Counted
+ * byte strings are declared as follows:
+ *
+ * string object;
+ * or
+ * string object<>;
+ *
+ * The constant m denotes an upper bound of the number of bytes that a
+ * string may contain. If m is not specified, as in the second
+ * declaration, it is assumed to be (2**32) - 1, the maximum length.
+ * The constant m would normally be found in a protocol specification.
+ * For example, a filing protocol may state that a file name can be no
+ * longer than 255 bytes, as follows:
+ *
+ * string filename<255>;
+ *
+ * 0 1 2 3 4 5 ...
+ * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+ * | length n |byte0|byte1|...| n-1 | 0 |...| 0 |
+ * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+ * |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
+ * |<----n+r (where (n+r) mod 4 = 0)---->|
+ * STRING
+ * It is an error to encode a length greater than the maximum described
+ * in the specification.
+ *
+ */
+ public void writeString(String data) {
+ this.writeVariableOpaque(data.getBytes());
+ }
+
+ public String readString() {
+ return new String(this.readVariableOpaque());
+ }
+
+ public void dump(PrintStream out) {
+ for(int i = 0; i < bytearr.length; i += 4) {
+ out.println(hex(bytearr[i]) + " " + hex(bytearr[i + 1]) + " "
+ + hex(bytearr[i + 2]) + " " + hex(bytearr[i + 3]));
+ }
+ }
+
+ @VisibleForTesting
+ public byte[] getBytes() {
+ return Arrays.copyOf(bytearr, bytearr.length);
+ }
+
+ public static byte[] append(byte[] bytes, byte[] bytesToAdd) {
+ byte[] newByteArray = new byte[bytes.length + bytesToAdd.length];
+ System.arraycopy(bytes, 0, newByteArray, 0, bytes.length);
+ System.arraycopy(bytesToAdd, 0, newByteArray, bytes.length, bytesToAdd.length);
+ return newByteArray;
+ }
+
+ private static int pad(int x, int y) {
+ return x % y == 0 ? 0 : y - (x % y);
+ }
+
+ static byte[] toBytes(int n) {
+ byte[] ret = { (byte) ((n & 0xff000000) >> 24),
+ (byte) ((n & 0x00ff0000) >> 16), (byte) ((n & 0x0000ff00) >> 8),
+ (byte) (n & 0x000000ff) };
+ return ret;
+ }
+
+ private static short toShort(byte b) {
+ return b < 0 ? (short) (b + 256): (short) b;
+ }
+
+ private static String hex(byte b) {
+ return "" + HEXES.charAt((b & 0xF0) >> 4) + HEXES.charAt((b & 0x0F));
+ }
+
+ private static byte[] recordMark(int size, boolean last) {
+ return toBytes(!last ? size : size | 0x80000000);
+ }
+
+ public static byte[] getVariableOpque(byte[] data) {
+ byte[] bytes = toBytes(data.length);
+ return append(bytes, Arrays.copyOf(data, data.length + XDR.pad(data.length, 4)));
+ }
+
+ public static int fragmentSize(byte[] mark) {
+ int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
+ + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+ return n & 0x7fffffff;
+ }
+
+ public static boolean isLastFragment(byte[] mark) {
+ int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
+ + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+ return (n & 0x80000000) != 0;
+ }
+
+ /** check if the rest of data has more than bytes */
+ public static boolean verifyLength(XDR xdr, int len) {
+ return (xdr.bytearr.length - xdr.cursor) >= len;
+ }
+
+ /** Write an XDR message to a TCP ChannelBuffer */
+ public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+ byte[] fragmentHeader = XDR.recordMark(request.bytearr.length, last);
+ ChannelBuffer outBuf = ChannelBuffers.buffer(fragmentHeader.length
+ + request.bytearr.length);
+ outBuf.writeBytes(fragmentHeader);
+ outBuf.writeBytes(request.bytearr);
+ return outBuf;
+ }
+
+ /** Write an XDR message to a UDP ChannelBuffer */
+ public static ChannelBuffer writeMessageUdp(XDR response) {
+ ChannelBuffer outBuf = ChannelBuffers.buffer(response.bytearr.length);
+ outBuf.writeBytes(response.bytearr);
+ return outBuf;
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
new file mode 100644
index 00000000000..6a3e86c13da
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.SimpleTcpServer;
+import org.apache.hadoop.oncrpc.SimpleUdpServer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Portmap service for binding RPC protocols. See RFC 1833 for details.
+ */
+public class Portmap {
+ public static final Log LOG = LogFactory.getLog(Portmap.class);
+
+ private static void startUDPServer(RpcProgramPortmap rpcProgram) {
+ rpcProgram.register(PortmapMapping.TRANSPORT_UDP);
+ SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT,
+ rpcProgram, 1);
+ udpServer.run();
+ }
+
+ private static void startTCPServer(final RpcProgramPortmap rpcProgram) {
+ rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
+ SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT,
+ rpcProgram, 1);
+ tcpServer.run();
+ }
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(Portmap.class, args, LOG);
+ RpcProgramPortmap program = new RpcProgramPortmap();
+ try {
+ startUDPServer(program);
+ startTCPServer(program);
+ } catch (Throwable e) {
+ LOG.fatal("Start server failure");
+ System.exit(-1);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java
new file mode 100644
index 00000000000..d0f9af11e0a
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.hadoop.oncrpc.XDR;
+
+/**
+ * Methods that need to be implemented to provide Portmap RPC program.
+ * See RFC 1833 for details.
+ */
+public interface PortmapInterface {
+ public enum Procedure {
+ PMAPPROC_NULL(0),
+ PMAPPROC_SET(1),
+ PMAPPROC_UNSET(2),
+ PMAPPROC_GETPORT(3),
+ PMAPPROC_DUMP(4),
+ PMAPPROC_CALLIT(5),
+ PMAPPROC_GETTIME(6),
+ PMAPPROC_UADDR2TADDR(7),
+ PMAPPROC_TADDR2UADDR(8),
+ PMAPPROC_GETVERSADDR(9),
+ PMAPPROC_INDIRECT(10),
+ PMAPPROC_GETADDRLIST(11),
+ PMAPPROC_GETSTAT(12);
+
+ private final int value;
+
+ Procedure(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static Procedure fromValue(int value) {
+ return values()[value];
+ }
+ }
+
+ /**
+ * This procedure does no work. By convention, procedure zero of any protocol
+ * takes no parameters and returns no results.
+ */
+ public XDR nullOp(int xidd, XDR in, XDR out);
+
+ /**
+ * When a program first becomes available on a machine, it registers itself
+ * with the port mapper program on the same machine. The program passes its
+ * program number "prog", version number "vers", transport protocol number
+ * "prot", and the port "port" on which it awaits service request. The
+ * procedure returns a boolean reply whose value is "TRUE" if the procedure
+ * successfully established the mapping and "FALSE" otherwise. The procedure
+ * refuses to establish a mapping if one already exists for the tuple
+ * "(prog, vers, prot)".
+ */
+ public XDR set(int xid, XDR in, XDR out);
+
+ /**
+ * When a program becomes unavailable, it should unregister itself with the
+ * port mapper program on the same machine. The parameters and results have
+ * meanings identical to those of "PMAPPROC_SET". The protocol and port number
+ * fields of the argument are ignored.
+ */
+ public XDR unset(int xid, XDR in, XDR out);
+
+ /**
+ * Given a program number "prog", version number "vers", and transport
+ * protocol number "prot", this procedure returns the port number on which the
+ * program is awaiting call requests. A port value of zeros means the program
+ * has not been registered. The "port" field of the argument is ignored.
+ */
+ public XDR getport(int xid, XDR in, XDR out);
+
+ /**
+ * This procedure enumerates all entries in the port mapper's database. The
+ * procedure takes no parameters and returns a list of program, version,
+ * protocol, and port values.
+ */
+ public XDR dump(int xid, XDR in, XDR out);
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java
new file mode 100644
index 00000000000..f73ab02c60f
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.hadoop.oncrpc.XDR;
+
+/**
+ * Represents a mapping entry for in the Portmap service for binding RPC
+ * protocols. See RFC 1833 for details.
+ *
+ * This maps a program to a port number.
+ */
+public class PortmapMapping {
+ public static final int TRANSPORT_TCP = 6;
+ public static final int TRANSPORT_UDP = 17;
+
+ private final int program;
+ private final int version;
+ private final int transport;
+ private final int port;
+
+ public PortmapMapping(int program, int version, int transport, int port) {
+ this.program = program;
+ this.version = version;
+ this.transport = transport;
+ this.port = port;
+ }
+
+ public XDR serialize(XDR xdr) {
+ xdr.writeInt(program);
+ xdr.writeInt(version);
+ xdr.writeInt(transport);
+ xdr.writeInt(port);
+ return xdr;
+ }
+
+ public static PortmapMapping deserialize(XDR xdr) {
+ return new PortmapMapping(xdr.readInt(), xdr.readInt(), xdr.readInt(),
+ xdr.readInt());
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public static String key(PortmapMapping mapping) {
+ return mapping.program + " " + mapping.version + " " + mapping.transport;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(PortmapMapping-%d:%d:%d:%d)", program, version,
+ transport, port);
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
new file mode 100644
index 00000000000..11da7d44dc2
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcUtil;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.portmap.PortmapInterface.Procedure;
+
+/**
+ * Helper utility for building portmap request
+ */
+public class PortmapRequest {
+ public static PortmapMapping mapping(XDR xdr) {
+ return PortmapMapping.deserialize(xdr);
+ }
+
+ public static XDR create(PortmapMapping mapping) {
+ XDR request = new XDR();
+ RpcCall.write(request,
+ RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
+ RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
+ Procedure.PMAPPROC_SET.getValue());
+ request.writeInt(AuthFlavor.AUTH_NONE.getValue());
+ request.writeInt(0);
+ request.writeInt(0);
+ request.writeInt(0);
+ return mapping.serialize(request);
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
new file mode 100644
index 00000000000..f650a74994e
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.oncrpc.RpcAcceptedReply;
+import org.apache.hadoop.oncrpc.XDR;
+
+/**
+ * Helper utility for sending portmap response.
+ */
+public class PortmapResponse {
+ public static XDR voidReply(XDR xdr, int xid) {
+ RpcAcceptedReply.voidReply(xdr, xid);
+ return xdr;
+ }
+
+ public static XDR intReply(XDR xdr, int xid, int value) {
+ RpcAcceptedReply.voidReply(xdr, xid);
+ xdr.writeInt(value);
+ return xdr;
+ }
+
+ public static XDR booleanReply(XDR xdr, int xid, boolean value) {
+ RpcAcceptedReply.voidReply(xdr, xid);
+ xdr.writeBoolean(value);
+ return xdr;
+ }
+
+ public static XDR pmapList(XDR xdr, int xid, Collection list) {
+ RpcAcceptedReply.voidReply(xdr, xid);
+ for (PortmapMapping mapping : list) {
+ System.out.println(mapping);
+ xdr.writeBoolean(true); // Value follows
+ mapping.serialize(xdr);
+ }
+ xdr.writeBoolean(false); // No value follows
+ return xdr;
+ }
+
+ public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) {
+ return pmapList(xdr, xid, Arrays.asList(list));
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
new file mode 100644
index 00000000000..cbf381296a7
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply;
+import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * An rpcbind request handler.
+ */
+public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
+ public static final int PROGRAM = 100000;
+ public static final int VERSION = 2;
+
+ private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class);
+
+ /** Map synchronized usis monitor lock of this instance */
+ private final HashMap map;
+
+ public RpcProgramPortmap() {
+ super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+ map = new HashMap(256);
+ }
+
+ /** Dump all the register RPC services */
+ private synchronized void dumpRpcServices() {
+ Set> entrySet = map.entrySet();
+ for (Entry entry : entrySet) {
+ LOG.info("Service: " + entry.getKey() + " portmapping: "
+ + entry.getValue());
+ }
+ }
+
+ @Override
+ public XDR nullOp(int xid, XDR in, XDR out) {
+ return PortmapResponse.voidReply(out, xid);
+ }
+
+ @Override
+ public XDR set(int xid, XDR in, XDR out) {
+ PortmapMapping mapping = PortmapRequest.mapping(in);
+ String key = PortmapMapping.key(mapping);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Portmap set key=" + key);
+ }
+
+ PortmapMapping value = null;
+ synchronized(this) {
+ map.put(key, mapping);
+ dumpRpcServices();
+ value = map.get(key);
+ }
+ return PortmapResponse.intReply(out, xid, value.getPort());
+ }
+
+ @Override
+ public synchronized XDR unset(int xid, XDR in, XDR out) {
+ PortmapMapping mapping = PortmapRequest.mapping(in);
+ synchronized(this) {
+ map.remove(PortmapMapping.key(mapping));
+ }
+ return PortmapResponse.booleanReply(out, xid, true);
+ }
+
+ @Override
+ public synchronized XDR getport(int xid, XDR in, XDR out) {
+ PortmapMapping mapping = PortmapRequest.mapping(in);
+ String key = PortmapMapping.key(mapping);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Portmap GETPORT key=" + key + " " + mapping);
+ }
+ PortmapMapping value = null;
+ synchronized(this) {
+ value = map.get(key);
+ }
+ int res = 0;
+ if (value != null) {
+ res = value.getPort();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found mapping for key: " + key + " port:" + res);
+ }
+ } else {
+ LOG.warn("Warning, no mapping for key: " + key);
+ }
+ return PortmapResponse.intReply(out, xid, res);
+ }
+
+ @Override
+ public synchronized XDR dump(int xid, XDR in, XDR out) {
+ PortmapMapping[] pmapList = null;
+ synchronized(this) {
+ pmapList = new PortmapMapping[map.values().size()];
+ map.values().toArray(pmapList);
+ }
+ return PortmapResponse.pmapList(out, xid, pmapList);
+ }
+
+ @Override
+ public void register(PortmapMapping mapping) {
+ String key = PortmapMapping.key(mapping);
+ synchronized(this) {
+ map.put(key, mapping);
+ }
+ }
+
+ @Override
+ public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+ InetAddress client, Channel channel) {
+ Procedure procedure = Procedure.fromValue(rpcCall.getProcedure());
+ int xid = rpcCall.getXid();
+ switch (procedure) {
+ case PMAPPROC_NULL:
+ out = nullOp(xid, in, out);
+ break;
+ case PMAPPROC_SET:
+ out = set(xid, in, out);
+ break;
+ case PMAPPROC_UNSET:
+ out = unset(xid, in, out);
+ break;
+ case PMAPPROC_DUMP:
+ out = dump(xid, in, out);
+ break;
+ case PMAPPROC_GETPORT:
+ out = getport(xid, in, out);
+ break;
+ case PMAPPROC_GETVERSADDR:
+ out = getport(xid, in, out);
+ break;
+ default:
+ LOG.info("PortmapHandler unknown rpc procedure=" + procedure);
+ RpcAcceptedReply.voidReply(out, xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+ }
+ return out;
+ }
+
+ @Override
+ protected boolean isIdempotent(RpcCall call) {
+ return false;
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
new file mode 100644
index 00000000000..fab80ddcb65
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
+import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFrameDecoder {
+
+ private static int port = 12345; // some random server port
+ private static XDR result = null;
+
+ static void testRequest(XDR request) {
+ SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
+ true);
+ tcpClient.run();
+ }
+
+ static class TestRpcProgram extends RpcProgram {
+
+ protected TestRpcProgram(String program, String host, int port,
+ int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
+ super(program, host, port, progNumber, lowProgVersion, highProgVersion,
+ cacheSize);
+ }
+
+ @Override
+ public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+ InetAddress client, Channel channel) {
+ // Get the final complete request and return a void response.
+ result = in;
+ return RpcAcceptedReply.voidReply(out, 1234);
+ }
+
+ @Override
+ protected boolean isIdempotent(RpcCall call) {
+ return false;
+ }
+ }
+
+ @Test
+ public void testSingleFrame() {
+ RpcFrameDecoder decoder = new RpcFrameDecoder();
+
+ // Test "Length field is not received yet"
+ ByteBuffer buffer = ByteBuffer.allocate(1);
+ ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
+ ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+ buf);
+ assertTrue(channelBuffer == null);
+
+ // Test all bytes are not received yet
+ byte[] fragment = new byte[4 + 9];
+ fragment[0] = (byte) (1 << 7); // final fragment
+ fragment[1] = 0;
+ fragment[2] = 0;
+ fragment[3] = (byte) 10; // fragment size = 10 bytes
+ assertTrue(XDR.isLastFragment(fragment));
+ assertTrue(XDR.fragmentSize(fragment)==10);
+
+ buffer = ByteBuffer.allocate(4 + 9);
+ buffer.put(fragment);
+ buffer.flip();
+ buf = new ByteBufferBackedChannelBuffer(buffer);
+ channelBuffer = (ChannelBuffer) decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+ buf);
+ assertTrue(channelBuffer == null);
+ }
+
+ @Test
+ public void testMultipleFrames() {
+ RpcFrameDecoder decoder = new RpcFrameDecoder();
+
+ // Test multiple frames
+ byte[] fragment1 = new byte[4 + 10];
+ fragment1[0] = 0; // not final fragment
+ fragment1[1] = 0;
+ fragment1[2] = 0;
+ fragment1[3] = (byte) 10; // fragment size = 10 bytes
+ assertFalse(XDR.isLastFragment(fragment1));
+ assertTrue(XDR.fragmentSize(fragment1)==10);
+
+ // decoder should wait for the final fragment
+ ByteBuffer buffer = ByteBuffer.allocate(4 + 10);
+ buffer.put(fragment1);
+ buffer.flip();
+ ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
+ ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+ buf);
+ assertTrue(channelBuffer == null);
+
+ byte[] fragment2 = new byte[4 + 10];
+ fragment2[0] = (byte) (1 << 7); // final fragment
+ fragment2[1] = 0;
+ fragment2[2] = 0;
+ fragment2[3] = (byte) 10; // fragment size = 10 bytes
+ assertTrue(XDR.isLastFragment(fragment2));
+ assertTrue(XDR.fragmentSize(fragment2)==10);
+
+ buffer = ByteBuffer.allocate(4 + 10);
+ buffer.put(fragment2);
+ buffer.flip();
+ buf = new ByteBufferBackedChannelBuffer(buffer);
+ channelBuffer = (ChannelBuffer) decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+ buf);
+ assertTrue(channelBuffer != null);
+ // Complete frame should have to total size 10+10=20
+ assertTrue(channelBuffer.array().length == 20);
+ }
+
+ @Test
+ public void testFrames() {
+
+ RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
+ "localhost", port, 100000, 1, 2, 100);
+ SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
+ tcpServer.run();
+
+ XDR xdrOut = createGetportMount();
+ int bufsize = 2 * 1024 * 1024;
+ byte[] buffer = new byte[bufsize];
+ xdrOut.writeFixedOpaque(buffer);
+ int requestSize = xdrOut.size();
+
+ // Send the request to the server
+ testRequest(xdrOut);
+
+ // Verify the server got the request with right size
+ assertTrue(requestSize == result.size());
+ }
+
+ static void createPortmapXDRheader(XDR xdr_out, int procedure) {
+ // Make this a method
+ RpcCall.write(xdr_out, 0, 100000, 2, procedure);
+ }
+
+ static XDR createGetportMount() {
+ XDR xdr_out = new XDR();
+ createPortmapXDRheader(xdr_out, 3);
+ xdr_out.writeInt(0); // AUTH_NULL
+ xdr_out.writeInt(0); // cred len
+ xdr_out.writeInt(0); // verifier AUTH_NULL
+ xdr_out.writeInt(0); // verf len
+ return xdr_out;
+ }
+ /*
+ * static void testGetport() { XDR xdr_out = new XDR();
+ *
+ * createPortmapXDRheader(xdr_out, 3);
+ *
+ * xdr_out.writeInt(100003); xdr_out.writeInt(3); xdr_out.writeInt(6);
+ * xdr_out.writeInt(0);
+ *
+ * XDR request2 = new XDR();
+ *
+ * createPortmapXDRheader(xdr_out, 3); request2.writeInt(100003);
+ * request2.writeInt(3); request2.writeInt(6); request2.writeInt(0);
+ *
+ * testRequest(xdr_out); }
+ *
+ * static void testDump() { XDR xdr_out = new XDR();
+ * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
+ */
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
new file mode 100644
index 00000000000..fbbeb0744cc
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcAcceptedReply}
+ */
+public class TestRpcAcceptedReply {
+ @Test
+ public void testAcceptState() {
+ assertEquals(AcceptState.SUCCESS, AcceptState.fromValue(0));
+ assertEquals(AcceptState.PROG_UNAVAIL, AcceptState.fromValue(1));
+ assertEquals(AcceptState.PROG_MISMATCH, AcceptState.fromValue(2));
+ assertEquals(AcceptState.PROC_UNAVAIL, AcceptState.fromValue(3));
+ assertEquals(AcceptState.GARBAGE_ARGS, AcceptState.fromValue(4));
+ assertEquals(AcceptState.SYSTEM_ERR, AcceptState.fromValue(5));
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testAcceptStateFromInvalidValue() {
+ AcceptState.fromValue(6);
+ }
+
+ @Test
+ public void testConstructor() {
+ RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+ RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.RPC_REPLY,
+ ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS);
+ assertEquals(0, reply.getXid());
+ assertEquals(RpcMessage.RPC_REPLY, reply.getMessageType());
+ assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
+ assertEquals(verifier, reply.getVerifier());
+ assertEquals(AcceptState.SUCCESS, reply.getAcceptState());
+ }
+}
+
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java
new file mode 100644
index 00000000000..0b8240bc605
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RpcAuthInfo}
+ */
+public class TestRpcAuthInfo {
+ @Test
+ public void testAuthFlavor() {
+ assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(0));
+ assertEquals(AuthFlavor.AUTH_SYS, AuthFlavor.fromValue(1));
+ assertEquals(AuthFlavor.AUTH_SHORT, AuthFlavor.fromValue(2));
+ assertEquals(AuthFlavor.AUTH_DH, AuthFlavor.fromValue(3));
+ assertEquals(AuthFlavor.RPCSEC_GSS, AuthFlavor.fromValue(6));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidAuthFlavor() {
+ assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(4));
+ }
+
+ @Test
+ public void testConsturctor() {
+ byte[] body = new byte[0];
+ RpcAuthInfo auth = new RpcAuthInfo(AuthFlavor.AUTH_NONE, body);
+ assertEquals(AuthFlavor.AUTH_NONE, auth.getFlavor());
+ assertTrue(Arrays.equals(body, auth.getBody()));
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java
new file mode 100644
index 00000000000..474a1f73789
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcAuthSys}
+ */
+public class TestRpcAuthSys {
+ @Test
+ public void testConstructor() {
+ RpcAuthSys auth = new RpcAuthSys(0, 1);
+ assertEquals(0, auth.getUid());
+ assertEquals(1, auth.getGid());
+ }
+
+ @Test
+ public void testRead() {
+ byte[] bytes = {0, 1, 2, 3}; // 4 bytes Stamp
+ bytes = XDR.append(bytes, XDR.getVariableOpque(new byte[0]));
+ bytes = XDR.append(bytes, XDR.toBytes(0)); // gid
+ bytes = XDR.append(bytes, XDR.toBytes(1)); // uid
+ RpcAuthSys auth = RpcAuthSys.from(bytes);
+ assertEquals(0, auth.getUid());
+ assertEquals(1, auth.getGid());
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
new file mode 100644
index 00000000000..e3ae2edc40f
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RpcCall}
+ */
+public class TestRpcCall {
+
+ @Test
+ public void testConstructor() {
+ RpcAuthInfo credential = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+ RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+ int rpcVersion = RpcCall.RPC_VERSION;
+ int program = 2;
+ int version = 3;
+ int procedure = 4;
+ RpcCall call = new RpcCall(0, RpcMessage.RPC_CALL, rpcVersion, program, version, procedure, credential, verifier);
+ assertEquals(0, call.getXid());
+ assertEquals(RpcMessage.RPC_CALL, call.getMessageType());
+ assertEquals(rpcVersion, call.getRpcVersion());
+ assertEquals(program, call.getProgram());
+ assertEquals(version, call.getVersion());
+ assertEquals(procedure, call.getProcedure());
+ assertEquals(credential, call.getCredential());
+ assertEquals(verifier, call.getVerifier());
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidRpcVersion() {
+ int invalidRpcVersion = 3;
+ new RpcCall(0, RpcMessage.RPC_CALL, invalidRpcVersion, 2, 3, 4, null, null);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidRpcMessageType() {
+ int invalidMessageType = 3; // Message typ is not RpcMessage.RPC_CALL
+ new RpcCall(0, invalidMessageType, RpcCall.RPC_VERSION, 2, 3, 4, null, null);
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
new file mode 100644
index 00000000000..f605fc20540
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link RpcCallCache}
+ */
+public class TestRpcCallCache {
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testRpcCallCacheConstructorIllegalArgument0(){
+ new RpcCallCache("test", 0);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testRpcCallCacheConstructorIllegalArgumentNegative(){
+ new RpcCallCache("test", -1);
+ }
+
+ @Test
+ public void testRpcCallCacheConstructor(){
+ RpcCallCache cache = new RpcCallCache("test", 100);
+ assertEquals("test", cache.getProgram());
+ }
+
+ @Test
+ public void testAddRemoveEntries() throws UnknownHostException {
+ RpcCallCache cache = new RpcCallCache("test", 100);
+ InetAddress clientIp = InetAddress.getByName("1.1.1.1");
+ int xid = 100;
+
+ // Ensure null is returned when there is no entry in the cache
+ // An entry is added to indicate the request is in progress
+ CacheEntry e = cache.checkOrAddToCache(clientIp, xid);
+ assertNull(e);
+ e = cache.checkOrAddToCache(clientIp, xid);
+ validateInprogressCacheEntry(e);
+
+ // Set call as completed
+ XDR response = new XDR();
+ cache.callCompleted(clientIp, xid, response);
+ e = cache.checkOrAddToCache(clientIp, xid);
+ validateCompletedCacheEntry(e, response);
+ }
+
+ private void validateInprogressCacheEntry(CacheEntry c) {
+ assertTrue(c.isInProgress());
+ assertFalse(c.isCompleted());
+ assertNull(c.getResponse());
+ }
+
+ private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+ assertFalse(c.isInProgress());
+ assertTrue(c.isCompleted());
+ assertEquals(response, c.getResponse());
+ }
+
+ @Test
+ public void testCacheEntry() {
+ CacheEntry c = new CacheEntry();
+ validateInprogressCacheEntry(c);
+ assertTrue(c.isInProgress());
+ assertFalse(c.isCompleted());
+ assertNull(c.getResponse());
+
+ XDR response = new XDR();
+ c.setResponse(response);
+ validateCompletedCacheEntry(c, response);
+ }
+
+ @Test
+ public void testCacheFunctionality() throws UnknownHostException {
+ RpcCallCache cache = new RpcCallCache("Test", 10);
+
+ // Add 20 entries to the cache and only last 10 should be retained
+ int size = 0;
+ for (int clientId = 0; clientId < 20; clientId++) {
+ InetAddress clientIp = InetAddress.getByName("1.1.1."+clientId);
+ System.out.println("Adding " + clientIp);
+ cache.checkOrAddToCache(clientIp, 0);
+ size = Math.min(++size, 10);
+ System.out.println("Cache size " + cache.size());
+ assertEquals(size, cache.size()); // Ensure the cache size is correct
+
+ // Ensure the cache entries are correct
+ int startEntry = Math.max(clientId - 10 + 1, 0);
+ Iterator> iterator = cache.iterator();
+ for (int i = 0; i < size; i++) {
+ ClientRequest key = iterator.next().getKey();
+ System.out.println("Entry " + key.getClientId());
+ assertEquals(InetAddress.getByName("1.1.1." + (startEntry + i)),
+ key.getClientId());
+ }
+
+ // Ensure cache entries are returned as in progress.
+ for (int i = 0; i < size; i++) {
+ CacheEntry e = cache.checkOrAddToCache(
+ InetAddress.getByName("1.1.1." + (startEntry + i)), 0);
+ assertNotNull(e);
+ assertTrue(e.isInProgress());
+ assertFalse(e.isCompleted());
+ }
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
new file mode 100644
index 00000000000..669ec9ad5f4
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcDeniedReply.RejectState;
+import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcDeniedReply}
+ */
+public class TestRpcDeniedReply {
+ @Test
+ public void testRejectStateFromValue() {
+ Assert.assertEquals(RejectState.RPC_MISMATCH, RejectState.fromValue(0));
+ Assert.assertEquals(RejectState.AUTH_ERROR, RejectState.fromValue(1));
+ }
+
+ @Test(expected=IndexOutOfBoundsException.class)
+ public void testRejectStateFromInvalidValue1() {
+ RejectState.fromValue(2);
+ }
+
+ @Test
+ public void testConstructor() {
+ RpcDeniedReply reply = new RpcDeniedReply(0, RpcMessage.RPC_REPLY,
+ ReplyState.MSG_ACCEPTED, RejectState.AUTH_ERROR) {
+ // Anonymous class
+ };
+ Assert.assertEquals(0, reply.getXid());
+ Assert.assertEquals(RpcMessage.RPC_REPLY, reply.getMessageType());
+ Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
+ Assert.assertEquals(RejectState.AUTH_ERROR, reply.getRejectState());
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
new file mode 100644
index 00000000000..893df7786dd
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcMessage}
+ */
+public class TestRpcMessage {
+ private RpcMessage getRpcMessage(int xid, int msgType) {
+ return new RpcMessage(xid, msgType) {
+ // Anonymous class
+ };
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidMessageType() {
+ int invalidMsgType = 2; // valid values are 0 and 1
+ getRpcMessage(0, invalidMsgType);
+ }
+
+ @Test
+ public void testRpcMessage() {
+ RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL);
+ Assert.assertEquals(0, msg.getXid());
+ Assert.assertEquals(RpcMessage.RPC_CALL, msg.getMessageType());
+ }
+
+ @Test
+ public void testValidateMessage() {
+ RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL);
+ msg.validateMessageType(RpcMessage.RPC_CALL);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateMessageException() {
+ RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL);
+ msg.validateMessageType(RpcMessage.RPC_REPLY);
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
new file mode 100644
index 00000000000..21d38a47603
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+
+import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcReply}
+ */
+public class TestRpcReply {
+ @Test
+ public void testReplyStateFromValue() {
+ Assert.assertEquals(ReplyState.MSG_ACCEPTED, ReplyState.fromValue(0));
+ Assert.assertEquals(ReplyState.MSG_DENIED, ReplyState.fromValue(1));
+ }
+
+ @Test(expected=IndexOutOfBoundsException.class)
+ public void testReplyStateFromInvalidValue1() {
+ ReplyState.fromValue(2);
+ }
+
+ @Test
+ public void testRpcReply() {
+ RpcReply reply = new RpcReply(0, 1, ReplyState.MSG_ACCEPTED) {
+ // Anonymous class
+ };
+ Assert.assertEquals(0, reply.getXid());
+ Assert.assertEquals(1, reply.getMessageType());
+ Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
+ }
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
new file mode 100644
index 00000000000..1745a06bed7
--- /dev/null
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link XDR}
+ */
+public class TestXDR {
+ /**
+ * Test {@link XDR#append(byte[], byte[])}
+ */
+ @Test
+ public void testAppendBytes() {
+ byte[] arr1 = new byte[] {0, 1};
+ byte[] arr2 = new byte[] {2, 3};
+ assertTrue(Arrays.equals(new byte[]{0, 1, 2, 3}, XDR.append(arr1, arr2)));
+ }
+}
diff --git a/hadoop-common-project/pom.xml b/hadoop-common-project/pom.xml
index 78a44c3da2d..7533199e4c6 100644
--- a/hadoop-common-project/pom.xml
+++ b/hadoop-common-project/pom.xml
@@ -35,6 +35,7 @@
hadoop-auth-examples
hadoop-common
hadoop-annotations
+ hadoop-nfs