From a0a986dda77ea03dac9cfc7e0631bae611034ef4 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 24 Jul 2013 07:48:36 +0000 Subject: [PATCH] HADOOP-9762. RetryCache utility for implementing RPC retries. Contributed by Suresh Srinivas. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1506426 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/ipc/RetryCache.java | 293 ++++++++++++++++++ .../java/org/apache/hadoop/ipc/Server.java | 23 +- .../org/apache/hadoop/ipc/TestRetryCache.java | 217 +++++++++++++ 4 files changed, 527 insertions(+), 9 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 119abfe7b09..1cdecd3c7ba 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -372,6 +372,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9763. Extends LightWeightGSet to support eviction of expired elements. (Tsz Wo (Nicholas) SZE via jing9) + HADOOP-9762. RetryCache utility for implementing RPC retries. + (Suresh Srinivas via jing9) + IMPROVEMENTS HADOOP-9164. Print paths of loaded native libraries in diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java new file mode 100644 index 00000000000..3858a307ad7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + + +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.LightWeightCache; +import org.apache.hadoop.util.LightWeightGSet; +import org.apache.hadoop.util.LightWeightGSet.LinkedElement; + +import com.google.common.base.Preconditions; + +/** + * Maintains a cache of non-idempotent requests that have been successfully + * processed by the RPC server implementation, to handle the retries. A request + * is uniquely identified by the unique client ID + call ID of the RPC request. + * On receiving retried request, an entry will be found in the + * {@link RetryCache} and the previous response is sent back to the request. + *

+ * To look an implementation using this cache, see HDFS FSNamesystem class. + */ +@InterfaceAudience.Private +public class RetryCache { + public static final Log LOG = LogFactory.getLog(RetryCache.class); + /** + * CacheEntry is tracked using unique client ID and callId of the RPC request + */ + public static class CacheEntry implements LightWeightCache.Entry { + /** + * Processing state of the requests + */ + private static byte INPROGRESS = 0; + private static byte SUCCESS = 1; + private static byte FAILED = 2; + + private volatile byte state = INPROGRESS; + + // Store uuid as two long for better memory utilization + private final long clientIdMsb; // Most signficant bytes + private final long clientIdLsb; // Least significant bytes + + private final int callId; + private final long expirationTime; + private LightWeightGSet.LinkedElement next; + + CacheEntry(byte[] clientId, int callId, long expirationTime) { + Preconditions.checkArgument(clientId.length == 16, "Invalid clientId"); + // Conver UUID bytes to two longs + long tmp = 0; + for (int i=0; i<8; i++) { + tmp = (tmp << 8) | (clientId[i] & 0xff); + } + clientIdMsb = tmp; + tmp = 0; + for (int i=8; i<16; i++) { + tmp = (tmp << 8) | (clientId[i] & 0xff); + } + clientIdLsb = tmp; + this.callId = callId; + this.expirationTime = expirationTime; + } + + private static int hashCode(long value) { + return (int)(value ^ (value >>> 32)); + } + + @Override + public int hashCode() { + return (hashCode(clientIdMsb) * 31 + hashCode(clientIdLsb)) * 31 + callId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof CacheEntry)) { + return false; + } + CacheEntry other = (CacheEntry) obj; + return callId == other.callId && clientIdMsb == other.clientIdMsb + && clientIdLsb == other.clientIdLsb; + } + + @Override + public void setNext(LinkedElement next) { + this.next = next; + } + + @Override + public LinkedElement getNext() { + return next; + } + + synchronized void completed(boolean success) { + state = success ? SUCCESS : FAILED; + this.notifyAll(); + } + + public boolean isSuccess() { + return state == SUCCESS; + } + + @Override + public void setExpirationTime(long timeNano) { + // expiration time does not change + } + + @Override + public long getExpirationTime() { + return expirationTime; + } + } + + /** + * CacheEntry with payload that tracks the previous response or parts of + * previous response to be used for generating response for retried requests. + */ + public static class CacheEntryWithPayload extends CacheEntry { + private Object payload; + + CacheEntryWithPayload(byte[] clientId, int callId, Object payload, + long expirationTime) { + super(clientId, callId, expirationTime); + this.payload = payload; + } + + /** Override equals to avoid findbugs warnings */ + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + /** Override hashcode to avoid findbugs warnings */ + @Override + public int hashCode() { + return super.hashCode(); + } + + public Object getPayload() { + return payload; + } + } + + private final LightWeightGSet set; + private final long expirationTime; + + /** + * Constructor + * @param cacheName name to identify the cache by + * @param percentage percentage of total java heap space used by this cache + * @param expirationTime time for an entry to expire in nanoseconds + */ + public RetryCache(String cacheName, double percentage, long expirationTime) { + int capacity = LightWeightGSet.computeCapacity(percentage, cacheName); + capacity = capacity > 16 ? capacity : 16; + this.set = new LightWeightCache(capacity, capacity, + expirationTime, 0); + this.expirationTime = expirationTime; + } + + private static boolean skipRetryCache() { + // Do not track non RPC invocation or RPC requests with + // invalid callId or clientId in retry cache + return !Server.isRpcInvocation() || Server.getCallId() < 0 + || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID); + } + + /** + * This method handles the following conditions: + *