diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index d7ae3a741cf..a57bc01af18 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client.ConnectionId; @@ -38,7 +39,12 @@ import org.apache.hadoop.util.ThreadUtil; import com.google.common.annotations.VisibleForTesting; -class RetryInvocationHandler implements RpcInvocationHandler { +/** + * This class implements RpcInvocationHandler and supports retry on the client + * side. + */ +@InterfaceAudience.Private +public class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private final FailoverProxyProvider proxyProvider; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 0c6b765e390..fe987bfebd4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1161,7 +1161,7 @@ public class Client { CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); - this.clientId = StringUtils.getUuidBytes(); + this.clientId = ClientId.getClientId(); this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java new file mode 100644 index 00000000000..83df4f932d4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ipc; + +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.base.Preconditions; + +/** + * A class defining a set of static helper methods to provide conversion between + * bytes and string for UUID-based client Id. + */ +@InterfaceAudience.Private +public class ClientId { + + /** The byte array of a UUID should be 16 bytes long */ + public static final int BYTE_LENGTH = 16; + + /** + * Return a new random clientId as byte[] + */ + public static byte[] getClientId() { + UUID uuid = UUID.randomUUID(); + ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } + + /** Convert a clientId byte[] to string */ + public static String toString(byte[] clientId) { + // clientId can be null or an empty array + if (clientId == null || clientId.length == 0) { + return ""; + } + // otherwise should be 16 bytes + Preconditions.checkArgument(clientId.length == BYTE_LENGTH); + long msb = 0; + long lsb = 0; + for (int i = 0; i < 8; i++) { + msb = (msb << 8) | (clientId[i] & 0xff); + } + for (int i = 8; i < 16; i++) { + lsb = (lsb << 8) | (clientId[i] & 0xff); + } + return (new UUID(msb, lsb)).toString(); + } + + /** Convert a clientId string representation back to a clientId byte[] */ + public static byte[] toBytes(String id) { + if (id == null || "".equals(id)) { + return new byte[0]; + } + UUID uuid = UUID.fromString(id); + ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java index dab18db80be..173e7bce1bb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ipc; import java.util.Arrays; +import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +28,7 @@ import org.apache.hadoop.util.LightWeightCache; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.LightWeightGSet.LinkedElement; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -64,8 +66,9 @@ public class RetryCache { CacheEntry(byte[] clientId, int callId, long expirationTime) { // ClientId must be a UUID - that is 16 octets.
- Preconditions.checkArgument(clientId.length == 16, - "Invalid clientId - must be UUID of size 16 octets"); + Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH, + "Invalid clientId - length is " + clientId.length + + " expected length " + ClientId.BYTE_LENGTH); // Convert UUID bytes to two longs long tmp = 0; for (int i=0; i<8; i++) { @@ -131,6 +134,12 @@ public class RetryCache { public long getExpirationTime() { return expirationTime; } + + @Override + public String toString() { + return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":" + + this.callId + ":" + this.state; + } } /** @@ -186,6 +195,11 @@ public class RetryCache { return !Server.isRpcInvocation() || Server.getCallId() < 0 || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID); } + + @VisibleForTesting + public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() { + return set; + } /** * This method handles the following conditions: @@ -240,6 +254,26 @@ public class RetryCache { } return mapEntry; } + + /** + * Add a new cache entry into the retry cache. The cache entry consists of + * clientId and callId extracted from editlog. + */ + public void addCacheEntry(byte[] clientId, int callId) { + CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime() + + expirationTime); + newEntry.completed(true); + set.put(newEntry); + } + + public void addCacheEntryWithPayload(byte[] clientId, int callId, + Object payload) { + CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload, + System.nanoTime() + expirationTime); + // since the entry is loaded from editlog, we can assume it succeeded. + newEntry.completed(true); + set.put(newEntry); + } private static CacheEntry newEntry(long expirationTime) { return new CacheEntry(Server.getClientId(), Server.getCallId(), diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index 4672d5d957a..284a042e83b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -22,7 +22,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.text.DateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.StringTokenizer; -import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -42,7 +40,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.Shell; import com.google.common.net.InetAddresses; @@ -897,17 +894,6 @@ public class StringUtils { return sb.toString(); } - /** - * Return a new UUID as byte[] - */ - public static byte[] getUuidBytes() { - UUID uuid = UUID.randomUUID(); - ByteBuffer buf = ByteBuffer.wrap(new byte[16]); - buf.putLong(uuid.getMostSignificantBits()); - buf.putLong(uuid.getLeastSignificantBits()); - return buf.array(); - } - /** * Get stack trace for a given thread.
*/ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java index c4c10886dfd..64607deb908 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java @@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.util.StringUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -39,7 +37,7 @@ import org.junit.Test; * Tests for {@link RetryCache} */ public class TestRetryCache { - private static final byte[] CLIENT_ID = StringUtils.getUuidBytes(); + private static final byte[] CLIENT_ID = ClientId.getClientId(); private static int callId = 100; private static final Random r = new Random(); private static final TestServer testServer = new TestServer(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java index 5d55f89d7b6..ab891b8f200 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.util; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -26,6 +26,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.ipc.ClientId; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; @@ -78,7 +79,7 @@ public class TestProtoUtil { @Test public void testRpcClientId() { - byte[] uuid = StringUtils.getUuidBytes(); + byte[] uuid = ClientId.getClientId(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, RpcConstants.INVALID_RETRY_COUNT, uuid); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7904f80282b..9bf6881e38a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -343,6 +343,9 @@ Release 2.1.0-beta - 2013-07-02 protocol methods. (suresh) HDFS-4979. Implement retry cache on Namenode. (suresh) + + HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding + retry cache in case of HA failover. 
(Jing Zhao via suresh) IMPROVEMENTS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d6bf673cd97..0aded40b073 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -453,7 +453,8 @@ public class DFSClient implements java.io.Closeable { * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. * Exactly one of nameNodeUri or rpcNamenode must be null. */ - DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, + @VisibleForTesting + public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { // Copy only the required DFSClient configuration diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 7d47a4292c1..eb745b8bb7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -68,6 +68,7 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -307,7 +308,8 @@ public class NameNodeProxies { } /** Gets the configured Failover proxy provider's class */ - private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( + @VisibleForTesting + public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { if (nameNodeUri == null) { return null; } @@ -344,7 +346,8 @@ public class NameNodeProxies { } /** Creates the Failover proxy provider instance*/ - private static <T> FailoverProxyProvider<T> createFailoverProxyProvider( + @VisibleForTesting + public static <T> FailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass, Class<T> xface, URI nameNodeUri) throws IOException { Preconditions.checkArgument( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java index 14890c59ddb..d4c62c4c710 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java @@ -104,7 +104,9 @@ public class LayoutVersion { OPTIMIZE_SNAPSHOT_INODES(-45, -43, "Reduce snapshot inode memory footprint", false), SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " + - "block IDs in the edits log and image files"); + "block IDs in the edits log and image files"), + EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to " + + "enable rebuilding retry cache in case of HA failover"); final int lv; final int ancestorLV; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 432a194d46a..9523a50a47d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -381,12 +381,13 @@ public class FSDirectory implements Closeable { /** * Persist the block list for the inode. */ - void persistBlocks(String path, INodeFileUnderConstruction file) { + void persistBlocks(String path, INodeFileUnderConstruction file, + boolean logRetryCache) { waitForReady(); writeLock(); try { - fsImage.getEditLog().logUpdateBlocks(path, file); + fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: " +path+" with "+ file.getBlocks().length @@ -459,7 +460,7 @@ public class FSDirectory implements Closeable { * @deprecated Use {@link #renameTo(String, String, Rename...)} instead. */ @Deprecated - boolean renameTo(String src, String dst) + boolean renameTo(String src, String dst, boolean logRetryCache) throws QuotaExceededException, UnresolvedLinkException, FileAlreadyExistsException, SnapshotAccessControlException, IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -475,14 +476,15 @@ public class FSDirectory implements Closeable { } finally { writeUnlock(); } - fsImage.getEditLog().logRename(src, dst, now); + fsImage.getEditLog().logRename(src, dst, now, logRetryCache); return true; } /** * @see #unprotectedRenameTo(String, String, long, Options.Rename...) */ - void renameTo(String src, String dst, Options.Rename... options) + void renameTo(String src, String dst, boolean logRetryCache, + Options.Rename... options) throws FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, QuotaExceededException, UnresolvedLinkException, IOException { @@ -500,7 +502,7 @@ public class FSDirectory implements Closeable { } finally { writeUnlock(); } - fsImage.getEditLog().logRename(src, dst, now, options); + fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options); } /** @@ -1176,7 +1178,7 @@ public class FSDirectory implements Closeable { /** * Concat all the blocks from srcs to trg and delete the srcs files */ - void concat(String target, String [] srcs) + void concat(String target, String [] srcs, boolean supportRetryCache) throws UnresolvedLinkException, QuotaExceededException, SnapshotAccessControlException, SnapshotException { writeLock(); @@ -1186,7 +1188,8 @@ public class FSDirectory implements Closeable { long timestamp = now(); unprotectedConcat(target, srcs, timestamp); // do the commit - fsImage.getEditLog().logConcat(target, srcs, timestamp); + fsImage.getEditLog().logConcat(target, srcs, timestamp, + supportRetryCache); } finally { writeUnlock(); } @@ -1261,10 +1264,12 @@ public class FSDirectory implements Closeable { * @param src Path of a directory to delete * @param collectedBlocks Blocks under the deleted directory * @param removedINodes INodes that should be removed from {@link #inodeMap} + * @param logRetryCache Whether to record RPC IDs in editlog to support retry + * cache rebuilding. 
* @return true on successful deletion; else false */ boolean delete(String src, BlocksMapUpdateInfo collectedBlocks, - List<INode> removedINodes) throws IOException { + List<INode> removedINodes, boolean logRetryCache) throws IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src); } @@ -1299,7 +1304,7 @@ if (filesRemoved < 0) { return false; } - fsImage.getEditLog().logDelete(src, now); + fsImage.getEditLog().logDelete(src, now, logRetryCache); incrDeletedFileCount(filesRemoved); // Blocks/INodes will be handled later by the caller of this method getFSNamesystem().removePathAndBlocks(src, null, null); @@ -2522,7 +2527,7 @@ /** * Create FileStatus by file INode */ - private HdfsFileStatus createFileStatus(byte[] path, INode node, + HdfsFileStatus createFileStatus(byte[] path, INode node, Snapshot snapshot) { long size = 0; // length is zero for directories short replication = 0; @@ -2595,7 +2600,7 @@ * Add the given symbolic link to the fs. Record it in the edits log. */ INodeSymlink addSymlink(String path, String target, - PermissionStatus dirPerms, boolean createParent) + PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) throws UnresolvedLinkException, FileAlreadyExistsException, QuotaExceededException, SnapshotAccessControlException { waitForReady(); @@ -2621,7 +2626,8 @@ NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path); return null; } - fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode); + fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode, + logRetryCache); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 781fe5207ac..df90a8fc10f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.delegation.DelegationKey; import com.google.common.annotations.VisibleForTesting; @@ -661,11 +662,20 @@ public class FSEditLog implements LogsPurgeable { LOG.info(buf); } + /** Record the RPC IDs if necessary */ + private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) { + if (toLogRpcIds) { + op.setRpcClientId(Server.getClientId()); + op.setRpcCallId(Server.getCallId()); + } + } + /** * Add open lease record to edit log. * Records the block locations of the last block.
*/ - public void logOpenFile(String path, INodeFileUnderConstruction newNode) { + public void logOpenFile(String path, INodeFileUnderConstruction newNode, + boolean toLogRpcIds) { AddOp op = AddOp.getInstance(cache.get()) .setInodeId(newNode.getId()) .setPath(path) @@ -677,8 +687,8 @@ public class FSEditLog implements LogsPurgeable { .setPermissionStatus(newNode.getPermissionStatus()) .setClientName(newNode.getClientName()) .setClientMachine(newNode.getClientMachine()); - - logEdit(op); + logRpcIds(op, toLogRpcIds); + logEdit(op); } /** @@ -697,10 +707,12 @@ public class FSEditLog implements LogsPurgeable { logEdit(op); } - public void logUpdateBlocks(String path, INodeFileUnderConstruction file) { + public void logUpdateBlocks(String path, INodeFileUnderConstruction file, + boolean toLogRpcIds) { UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get()) .setPath(path) .setBlocks(file.getBlocks()); + logRpcIds(op, toLogRpcIds); logEdit(op); } @@ -720,23 +732,26 @@ public class FSEditLog implements LogsPurgeable { * Add rename record to edit log * TODO: use String parameters until just before writing to disk */ - void logRename(String src, String dst, long timestamp) { + void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) { RenameOldOp op = RenameOldOp.getInstance(cache.get()) .setSource(src) .setDestination(dst) .setTimestamp(timestamp); + logRpcIds(op, toLogRpcIds); logEdit(op); } /** * Add rename record to edit log */ - void logRename(String src, String dst, long timestamp, Options.Rename... options) { + void logRename(String src, String dst, long timestamp, boolean toLogRpcIds, + Options.Rename... options) { RenameOp op = RenameOp.getInstance(cache.get()) .setSource(src) .setDestination(dst) .setTimestamp(timestamp) .setOptions(options); + logRpcIds(op, toLogRpcIds); logEdit(op); } @@ -783,21 +798,23 @@ public class FSEditLog implements LogsPurgeable { /** * concat(trg,src..) log */ - void logConcat(String trg, String [] srcs, long timestamp) { + void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) { ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get()) .setTarget(trg) .setSources(srcs) .setTimestamp(timestamp); + logRpcIds(op, toLogRpcIds); logEdit(op); } /** * Add delete file record to edit log */ - void logDelete(String src, long timestamp) { + void logDelete(String src, long timestamp, boolean toLogRpcIds) { DeleteOp op = DeleteOp.getInstance(cache.get()) .setPath(src) .setTimestamp(timestamp); + logRpcIds(op, toLogRpcIds); logEdit(op); } @@ -842,8 +859,8 @@ public class FSEditLog implements LogsPurgeable { /** * Add a create symlink record. 
*/ - void logSymlink(String path, String value, long mtime, - long atime, INodeSymlink node) { + void logSymlink(String path, String value, long mtime, long atime, + INodeSymlink node, boolean toLogRpcIds) { SymlinkOp op = SymlinkOp.getInstance(cache.get()) .setId(node.getId()) .setPath(path) @@ -851,6 +868,7 @@ public class FSEditLog implements LogsPurgeable { .setModificationTime(mtime) .setAccessTime(atime) .setPermissionStatus(node.getPermissionStatus()); + logRpcIds(op, toLogRpcIds); logEdit(op); } @@ -895,22 +913,26 @@ public class FSEditLog implements LogsPurgeable { logEdit(op); } - void logCreateSnapshot(String snapRoot, String snapName) { + void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) { CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get()) .setSnapshotRoot(snapRoot).setSnapshotName(snapName); + logRpcIds(op, toLogRpcIds); logEdit(op); } - void logDeleteSnapshot(String snapRoot, String snapName) { + void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) { DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get()) .setSnapshotRoot(snapRoot).setSnapshotName(snapName); + logRpcIds(op, toLogRpcIds); logEdit(op); } - void logRenameSnapshot(String path, String snapOldName, String snapNewName) { + void logRenameSnapshot(String path, String snapOldName, String snapNewName, + boolean toLogRpcIds) { RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get()) .setSnapshotRoot(path).setSnapshotOldName(snapOldName) .setSnapshotNewName(snapNewName); + logRpcIds(op, toLogRpcIds); logEdit(op); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 6432e0aec94..f19b15cd9f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -33,8 +33,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.common.Storage; @@ -275,7 +277,8 @@ public class FSEditLogLoader { if (LOG.isTraceEnabled()) { LOG.trace("replaying edit log: " + op); } - + final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds(); + switch (op.opCode) { case OP_ADD: { AddCloseOp addCloseOp = (AddCloseOp)op; @@ -298,8 +301,8 @@ public class FSEditLogLoader { if (oldFile == null) { // this is OP_ADD on a new file (case 1) // versions > 0 support per file replication // get name and replication - final short replication = fsNamesys.getBlockManager( - ).adjustReplication(addCloseOp.replication); + final short replication = fsNamesys.getBlockManager() + .adjustReplication(addCloseOp.replication); assert addCloseOp.blocks.length == 0; // add to the file tree @@ -311,6 +314,13 @@ public class FSEditLogLoader { addCloseOp.clientName, 
addCloseOp.clientMachine); fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path); + // add the op into retry cache if necessary + if (toAddRetryCache) { + HdfsFileStatus stat = fsNamesys.dir.createFileStatus( + HdfsFileStatus.EMPTY_NAME, newFile, null); + fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId, + addCloseOp.rpcCallId, stat); + } } else { // This is OP_ADD on an existing file if (!oldFile.isUnderConstruction()) { // This is case 3: a call to append() on an already-closed file. @@ -318,11 +328,17 @@ FSNamesystem.LOG.debug("Reopening an already-closed file " + "for append"); } - fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile, - addCloseOp.clientName, addCloseOp.clientMachine, null, - false, iip.getLatestSnapshot()); + LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path, + oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null, + false, iip.getLatestSnapshot(), false); newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path), addCloseOp.path, true); + + // add the op into retry cache if necessary + if (toAddRetryCache) { + fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId, + addCloseOp.rpcCallId, lb); + } } } // Fall-through for case 2. @@ -382,6 +398,10 @@ updateOp.path); // Update in-memory data structures updateBlocks(fsDir, updateOp, oldFile); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId); + } break; } @@ -397,17 +417,30 @@ ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op; fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs, concatDeleteOp.timestamp); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId, + concatDeleteOp.rpcCallId); + } break; } case OP_RENAME_OLD: { RenameOldOp renameOp = (RenameOldOp)op; fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst, renameOp.timestamp); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId); + } break; } case OP_DELETE: { DeleteOp deleteOp = (DeleteOp)op; fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId); + } break; } case OP_MKDIR: { @@ -472,12 +505,20 @@ fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path, symlinkOp.value, symlinkOp.mtime, symlinkOp.atime, symlinkOp.permissionStatus); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId); + } break; } case OP_RENAME: { RenameOp renameOp = (RenameOp)op; fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst, renameOp.timestamp, renameOp.options); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId); + } break; } case OP_GET_DELEGATION_TOKEN: { @@ -530,8 +571,12 @@ } case OP_CREATE_SNAPSHOT: { CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op; - fsNamesys.getSnapshotManager().createSnapshot( + String path = fsNamesys.getSnapshotManager().createSnapshot( createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName); + if (toAddRetryCache) { + fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId, + createSnapshotOp.rpcCallId, path); + } break; } case OP_DELETE_SNAPSHOT: { @@ -545,6 +590,11 @@ collectedBlocks.clear(); fsNamesys.dir.removeFromInodeMap(removedINodes); removedINodes.clear(); + + if
(toAddRetryCache) { + fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId, + deleteSnapshotOp.rpcCallId); + } break; } case OP_RENAME_SNAPSHOT: { @@ -552,6 +602,11 @@ public class FSEditLogLoader { fsNamesys.getSnapshotManager().renameSnapshot( renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName, renameSnapshotOp.snapshotNewName); + + if (toAddRetryCache) { + fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId, + renameSnapshotOp.rpcCallId); + } break; } case OP_ALLOW_SNAPSHOT: { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 02a7bf7dc79..10432bfd8e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -17,55 +17,88 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import java.util.zip.CheckedInputStream; -import java.util.zip.Checksum; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION; +import static 
org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; import java.util.Arrays; import java.util.EnumMap; import java.util.List; +import java.util.zip.CheckedInputStream; +import java.util.zip.Checksum; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; -import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DeprecatedUTF8; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; -import org.apache.hadoop.util.PureJavaCrc32; - -import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*; -import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.util.XMLUtils; +import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException; +import org.apache.hadoop.hdfs.util.XMLUtils.Stanza; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactory; -import org.apache.hadoop.hdfs.util.XMLUtils; -import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException; -import org.apache.hadoop.hdfs.util.XMLUtils.Stanza; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DeprecatedUTF8; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.RpcConstants; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.PureJavaCrc32; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; import org.xml.sax.helpers.AttributesImpl; import com.google.common.base.Preconditions; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.EOFException; - /** * Helper classes for reading the ops from an InputStream. 
* All ops derive from FSEditLogOp and are only @@ -76,6 +109,8 @@ import java.io.EOFException; public abstract class FSEditLogOp { public final FSEditLogOpCodes opCode; long txid; + byte[] rpcClientId = RpcConstants.DUMMY_CLIENT_ID; + int rpcCallId = RpcConstants.INVALID_CALL_ID; @SuppressWarnings("deprecation") final public static class OpInstanceCache { @@ -150,6 +185,31 @@ public abstract class FSEditLogOp { public void setTransactionId(long txid) { this.txid = txid; } + + public boolean hasRpcIds() { + return rpcClientId != RpcConstants.DUMMY_CLIENT_ID + && rpcCallId != RpcConstants.INVALID_CALL_ID; + } + + /** this has to be called after calling {@link #hasRpcIds()} */ + public byte[] getClientId() { + Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID); + return rpcClientId; + } + + public void setRpcClientId(byte[] clientId) { + this.rpcClientId = clientId; + } + + /** this has to be called after calling {@link #hasRpcIds()} */ + public int getCallId() { + Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID); + return rpcCallId; + } + + public void setRpcCallId(int callId) { + this.rpcCallId = callId; + } abstract void readFields(DataInputStream in, int logVersion) throws IOException; @@ -163,6 +223,46 @@ public abstract class FSEditLogOp { boolean shouldCompleteLastBlock(); } + private static void writeRpcIds(final byte[] clientId, final int callId, + DataOutputStream out) throws IOException { + FSImageSerialization.writeBytes(clientId, out); + FSImageSerialization.writeInt(callId, out); + } + + void readRpcIds(DataInputStream in, int logVersion) + throws IOException { + if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE, + logVersion)) { + this.rpcClientId = FSImageSerialization.readBytes(in); + this.rpcCallId = FSImageSerialization.readInt(in); + } + } + + void readRpcIdsFromXml(Stanza st) { + this.rpcClientId = st.hasChildren("RPC_CLIENTID") ? + ClientId.toBytes(st.getValue("RPC_CLIENTID")) + : RpcConstants.DUMMY_CLIENT_ID; + this.rpcCallId = st.hasChildren("RPC_CALLID") ? 
+ Integer.valueOf(st.getValue("RPC_CALLID")) + : RpcConstants.INVALID_CALL_ID; + } + + private static void appendRpcIdsToString(final StringBuilder builder, + final byte[] clientId, final int callId) { + builder.append(", RpcClientId="); + builder.append(ClientId.toString(clientId)); + builder.append(", RpcCallId="); + builder.append(callId); + } + + private static void appendRpcIdsToXml(ContentHandler contentHandler, + final byte[] clientId, final int callId) throws SAXException { + XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID", + ClientId.toString(clientId)); + XMLUtils.addSaxString(contentHandler, "RPC_CALLID", + Integer.valueOf(callId).toString()); + } + @SuppressWarnings("unchecked") static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp { int length; @@ -176,7 +276,7 @@ public abstract class FSEditLogOp { PermissionStatus permissions; String clientName; String clientMachine; - + private AddCloseOp(FSEditLogOpCodes opCode) { super(opCode); assert(opCode == OP_ADD || opCode == OP_CLOSE); @@ -247,8 +347,7 @@ public abstract class FSEditLogOp { } @Override - public - void writeFields(DataOutputStream out) throws IOException { + public void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeLong(inodeId, out); FSImageSerialization.writeString(path, out); FSImageSerialization.writeShort(replication, out); @@ -261,6 +360,8 @@ public abstract class FSEditLogOp { if (this.opCode == OP_ADD) { FSImageSerialization.writeString(clientName,out); FSImageSerialization.writeString(clientMachine,out); + // write clientId and callId + writeRpcIds(rpcClientId, rpcCallId, out); } } @@ -317,6 +418,8 @@ public abstract class FSEditLogOp { if (this.opCode == OP_ADD) { this.clientName = FSImageSerialization.readString(in); this.clientMachine = FSImageSerialization.readString(in); + // read clientId and callId + readRpcIds(in, logVersion); } else { this.clientName = ""; this.clientMachine = ""; @@ -368,6 +471,9 @@ public abstract class FSEditLogOp { builder.append(clientName); builder.append(", clientMachine="); builder.append(clientMachine); + if (this.opCode == OP_ADD) { + appendRpcIdsToString(builder, rpcClientId, rpcCallId); + } builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); @@ -397,9 +503,13 @@ public abstract class FSEditLogOp { FSEditLogOp.blockToXml(contentHandler, b); } FSEditLogOp.permissionStatusToXml(contentHandler, permissions); + if (this.opCode == OP_ADD) { + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); + } } - @Override void fromXml(Stanza st) throws InvalidXmlException { + @Override + void fromXml(Stanza st) throws InvalidXmlException { this.length = Integer.valueOf(st.getValue("LENGTH")); this.inodeId = Long.valueOf(st.getValue("INODEID")); this.path = st.getValue("PATH"); @@ -420,9 +530,14 @@ public abstract class FSEditLogOp { } this.permissions = permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0)); + readRpcIdsFromXml(st); } } + /** + * {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and + * {@link ClientProtocol#appendFile} + */ static class AddOp extends AddCloseOp { private AddOp() { super(OP_ADD); @@ -446,6 +561,11 @@ public abstract class FSEditLogOp { } } + /** + * Although {@link ClientProtocol#appendFile} may also log a close op, we do + * not need to record the rpc ids here since a successful appendFile op will + * finally log an AddOp. 
+ */ static class CloseOp extends AddCloseOp { private CloseOp() { super(OP_CLOSE); @@ -469,6 +589,10 @@ public abstract class FSEditLogOp { } } + /** + * {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but + * {@literal @Idempotent} for some other ops. + */ static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp { String path; Block[] blocks; @@ -481,7 +605,6 @@ public abstract class FSEditLogOp { return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS); } - UpdateBlocksOp setPath(String path) { this.path = path; return this; @@ -507,6 +630,8 @@ public abstract class FSEditLogOp { void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeString(path, out); FSImageSerialization.writeCompactBlockArray(blocks, out); + // clientId and callId + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -514,6 +639,7 @@ public abstract class FSEditLogOp { path = FSImageSerialization.readString(in); this.blocks = FSImageSerialization.readCompactBlockArray( in, logVersion); + readRpcIds(in, logVersion); } @Override @@ -527,8 +653,9 @@ public abstract class FSEditLogOp { sb.append("UpdateBlocksOp [path=") .append(path) .append(", blocks=") - .append(Arrays.toString(blocks)) - .append("]"); + .append(Arrays.toString(blocks)); + appendRpcIdsToString(sb, rpcClientId, rpcCallId); + sb.append("]"); return sb.toString(); } @@ -538,6 +665,7 @@ public abstract class FSEditLogOp { for (Block b : blocks) { FSEditLogOp.blockToXml(contentHandler, b); } + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { @@ -547,9 +675,11 @@ public abstract class FSEditLogOp { for (int i = 0; i < blocks.size(); i++) { this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i)); } + readRpcIdsFromXml(st); } } + /** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */ static class SetReplicationOp extends FSEditLogOp { String path; short replication; @@ -618,6 +748,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */ static class ConcatDeleteOp extends FSEditLogOp { int length; String trg; @@ -654,8 +785,7 @@ public abstract class FSEditLogOp { } @Override - public - void writeFields(DataOutputStream out) throws IOException { + public void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeString(trg, out); DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length]; @@ -666,6 +796,9 @@ public abstract class FSEditLogOp { new ArrayWritable(DeprecatedUTF8.class, info).write(out); FSImageSerialization.writeLong(timestamp, out); + + // rpc ids + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -704,6 +837,8 @@ public abstract class FSEditLogOp { } else { this.timestamp = readLong(in); } + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override @@ -717,6 +852,7 @@ public abstract class FSEditLogOp { builder.append(Arrays.toString(srcs)); builder.append(", timestamp="); builder.append(timestamp); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); @@ -738,6 +874,7 @@ public abstract class FSEditLogOp { "SOURCE" + (i + 1), srcs[i]); } contentHandler.endElement("", "", "SOURCES"); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { @@ -755,9 +892,11 @@ public abstract class FSEditLogOp { for (i = 0; i < srcs.length; i++) { srcs[i] 
= sources.get(0).getValue("SOURCE" + (i + 1)); } + readRpcIdsFromXml(st); } } + /** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */ static class RenameOldOp extends FSEditLogOp { int length; String src; @@ -793,6 +932,7 @@ public abstract class FSEditLogOp { FSImageSerialization.writeString(src, out); FSImageSerialization.writeString(dst, out); FSImageSerialization.writeLong(timestamp, out); + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -812,6 +952,9 @@ public abstract class FSEditLogOp { } else { this.timestamp = readLong(in); } + + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override @@ -825,6 +968,7 @@ public abstract class FSEditLogOp { builder.append(dst); builder.append(", timestamp="); builder.append(timestamp); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); @@ -841,16 +985,21 @@ public abstract class FSEditLogOp { XMLUtils.addSaxString(contentHandler, "DST", dst); XMLUtils.addSaxString(contentHandler, "TIMESTAMP", Long.valueOf(timestamp).toString()); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } - @Override void fromXml(Stanza st) throws InvalidXmlException { + @Override + void fromXml(Stanza st) throws InvalidXmlException { this.length = Integer.valueOf(st.getValue("LENGTH")); this.src = st.getValue("SRC"); this.dst = st.getValue("DST"); this.timestamp = Long.valueOf(st.getValue("TIMESTAMP")); + + readRpcIdsFromXml(st); } } + /** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */ static class DeleteOp extends FSEditLogOp { int length; String path; @@ -879,6 +1028,7 @@ public abstract class FSEditLogOp { void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeString(path, out); FSImageSerialization.writeLong(timestamp, out); + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -896,6 +1046,8 @@ public abstract class FSEditLogOp { } else { this.timestamp = readLong(in); } + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override @@ -907,6 +1059,7 @@ public abstract class FSEditLogOp { builder.append(path); builder.append(", timestamp="); builder.append(timestamp); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); @@ -922,15 +1075,19 @@ public abstract class FSEditLogOp { XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "TIMESTAMP", Long.valueOf(timestamp).toString()); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { this.length = Integer.valueOf(st.getValue("LENGTH")); this.path = st.getValue("PATH"); this.timestamp = Long.valueOf(st.getValue("TIMESTAMP")); + + readRpcIdsFromXml(st); } } - + + /** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */ static class MkdirOp extends FSEditLogOp { int length; long inodeId; @@ -1056,6 +1213,13 @@ public abstract class FSEditLogOp { } } + /** + * The corresponding operations are either {@literal @Idempotent} ( + * {@link ClientProtocol#updateBlockForPipeline}, + * {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or + * already bound with other editlog op which records rpc ids ( + * {@link ClientProtocol#startFile}). Thus no need to record rpc ids here. 
+ */ static class SetGenstampV1Op extends FSEditLogOp { long genStampV1; @@ -1108,6 +1272,7 @@ public abstract class FSEditLogOp { } } + /** Similar with {@link SetGenstampV1Op} */ static class SetGenstampV2Op extends FSEditLogOp { long genStampV2; @@ -1160,6 +1325,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */ static class AllocateBlockIdOp extends FSEditLogOp { long blockId; @@ -1212,6 +1378,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */ static class SetPermissionsOp extends FSEditLogOp { String src; FsPermission permissions; @@ -1277,6 +1444,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */ static class SetOwnerOp extends FSEditLogOp { String src; String username; @@ -1357,7 +1525,7 @@ public abstract class FSEditLogOp { st.getValue("GROUPNAME") : null; } } - + static class SetNSQuotaOp extends FSEditLogOp { String src; long nsQuota; @@ -1457,6 +1625,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */ static class SetQuotaOp extends FSEditLogOp { String src; long nsQuota; @@ -1534,6 +1703,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */ static class TimesOp extends FSEditLogOp { int length; String path; @@ -1629,6 +1799,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */ static class SymlinkOp extends FSEditLogOp { int length; long inodeId; @@ -1677,14 +1848,14 @@ public abstract class FSEditLogOp { } @Override - public - void writeFields(DataOutputStream out) throws IOException { + public void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeLong(inodeId, out); FSImageSerialization.writeString(path, out); FSImageSerialization.writeString(value, out); FSImageSerialization.writeLong(mtime, out); FSImageSerialization.writeLong(atime, out); permissionStatus.write(out); + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -1714,6 +1885,9 @@ public abstract class FSEditLogOp { this.atime = readLong(in); } this.permissionStatus = PermissionStatus.read(in); + + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override @@ -1733,6 +1907,7 @@ public abstract class FSEditLogOp { builder.append(atime); builder.append(", permissionStatus="); builder.append(permissionStatus); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); @@ -1754,9 +1929,11 @@ public abstract class FSEditLogOp { XMLUtils.addSaxString(contentHandler, "ATIME", Long.valueOf(atime).toString()); FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } - @Override void fromXml(Stanza st) throws InvalidXmlException { + @Override + void fromXml(Stanza st) throws InvalidXmlException { this.length = Integer.valueOf(st.getValue("LENGTH")); this.inodeId = Long.valueOf(st.getValue("INODEID")); this.path = st.getValue("PATH"); @@ -1765,9 +1942,12 @@ public abstract class FSEditLogOp { this.atime = Long.valueOf(st.getValue("ATIME")); this.permissionStatus = permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0)); + + readRpcIdsFromXml(st); } } + /** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */ static class RenameOp extends 
FSEditLogOp { int length; String src; @@ -1810,6 +1990,7 @@ public abstract class FSEditLogOp { FSImageSerialization.writeString(dst, out); FSImageSerialization.writeLong(timestamp, out); toBytesWritable(options).write(out); + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -1830,6 +2011,9 @@ public abstract class FSEditLogOp { this.timestamp = readLong(in); } this.options = readRenameOptions(in); + + // read RPC ids if necessary + readRpcIds(in, logVersion); } private static Rename[] readRenameOptions(DataInputStream in) throws IOException { @@ -1866,6 +2050,7 @@ public abstract class FSEditLogOp { builder.append(timestamp); builder.append(", options="); builder.append(Arrays.toString(options)); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); @@ -1889,6 +2074,7 @@ public abstract class FSEditLogOp { prefix = "|"; } XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString()); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { @@ -1910,9 +2096,15 @@ public abstract class FSEditLogOp { } } } + readRpcIdsFromXml(st); } } - + + /** + * {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. In the + * meanwhile, startFile and appendFile both have their own corresponding + * editlog op. + */ static class ReassignLeaseOp extends FSEditLogOp { String leaseHolder; String path; @@ -1988,6 +2180,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */ static class GetDelegationTokenOp extends FSEditLogOp { DelegationTokenIdentifier token; long expiryTime; @@ -2059,6 +2252,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */ static class RenewDelegationTokenOp extends FSEditLogOp { DelegationTokenIdentifier token; long expiryTime; @@ -2130,6 +2324,7 @@ public abstract class FSEditLogOp { } } + /** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */ static class CancelDelegationTokenOp extends FSEditLogOp { DelegationTokenIdentifier token; @@ -2323,7 +2518,8 @@ public abstract class FSEditLogOp { } /** - * Operation corresponding to creating a snapshot + * Operation corresponding to creating a snapshot. + * {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}. 
*/ static class CreateSnapshotOp extends FSEditLogOp { String snapshotRoot; @@ -2351,24 +2547,31 @@ public abstract class FSEditLogOp { void readFields(DataInputStream in, int logVersion) throws IOException { snapshotRoot = FSImageSerialization.readString(in); snapshotName = FSImageSerialization.readString(in); + + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override public void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeString(snapshotRoot, out); FSImageSerialization.writeString(snapshotName, out); + writeRpcIds(rpcClientId, rpcCallId, out); } @Override protected void toXml(ContentHandler contentHandler) throws SAXException { XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot); XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { snapshotRoot = st.getValue("SNAPSHOTROOT"); snapshotName = st.getValue("SNAPSHOTNAME"); + + readRpcIdsFromXml(st); } @Override @@ -2378,13 +2581,15 @@ public abstract class FSEditLogOp { builder.append(snapshotRoot); builder.append(", snapshotName="); builder.append(snapshotName); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append("]"); return builder.toString(); } } /** - * Operation corresponding to delete a snapshot + * Operation corresponding to delete a snapshot. + * {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}. */ static class DeleteSnapshotOp extends FSEditLogOp { String snapshotRoot; @@ -2412,24 +2617,31 @@ public abstract class FSEditLogOp { void readFields(DataInputStream in, int logVersion) throws IOException { snapshotRoot = FSImageSerialization.readString(in); snapshotName = FSImageSerialization.readString(in); + + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override public void writeFields(DataOutputStream out) throws IOException { FSImageSerialization.writeString(snapshotRoot, out); FSImageSerialization.writeString(snapshotName, out); + writeRpcIds(rpcClientId, rpcCallId, out); } @Override protected void toXml(ContentHandler contentHandler) throws SAXException { XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot); XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { snapshotRoot = st.getValue("SNAPSHOTROOT"); snapshotName = st.getValue("SNAPSHOTNAME"); + + readRpcIdsFromXml(st); } @Override @@ -2439,13 +2651,15 @@ public abstract class FSEditLogOp { builder.append(snapshotRoot); builder.append(", snapshotName="); builder.append(snapshotName); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append("]"); return builder.toString(); } } /** - * Operation corresponding to rename a snapshot + * Operation corresponding to rename a snapshot. + * {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}. 
*/ static class RenameSnapshotOp extends FSEditLogOp { String snapshotRoot; @@ -2480,6 +2694,9 @@ public abstract class FSEditLogOp { snapshotRoot = FSImageSerialization.readString(in); snapshotOldName = FSImageSerialization.readString(in); snapshotNewName = FSImageSerialization.readString(in); + + // read RPC ids if necessary + readRpcIds(in, logVersion); } @Override @@ -2487,6 +2704,8 @@ public abstract class FSEditLogOp { FSImageSerialization.writeString(snapshotRoot, out); FSImageSerialization.writeString(snapshotOldName, out); FSImageSerialization.writeString(snapshotNewName, out); + + writeRpcIds(rpcClientId, rpcCallId, out); } @Override @@ -2494,6 +2713,7 @@ public abstract class FSEditLogOp { XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot); XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName); XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName); + appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override @@ -2501,6 +2721,8 @@ public abstract class FSEditLogOp { snapshotRoot = st.getValue("SNAPSHOTROOT"); snapshotOldName = st.getValue("SNAPSHOTOLDNAME"); snapshotNewName = st.getValue("SNAPSHOTNEWNAME"); + + readRpcIdsFromXml(st); } @Override @@ -2512,6 +2734,7 @@ public abstract class FSEditLogOp { builder.append(snapshotOldName); builder.append(", snapshotNewName="); builder.append(snapshotNewName); + appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append("]"); return builder.toString(); } @@ -2520,7 +2743,7 @@ public abstract class FSEditLogOp { /** * Operation corresponding to allow creating snapshot on a directory */ - static class AllowSnapshotOp extends FSEditLogOp { + static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent String snapshotRoot; public AllowSnapshotOp() { @@ -2574,7 +2797,7 @@ public abstract class FSEditLogOp { /** * Operation corresponding to disallow creating snapshot on a directory */ - static class DisallowSnapshotOp extends FSEditLogOp { + static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent String snapshotRoot; public DisallowSnapshotOp() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index 366bb18255b..51ab61b2171 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottab import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; @@ -80,6 +81,7 @@ public class FSImageSerialization { static private final class TLData { final DeprecatedUTF8 U_STR = new DeprecatedUTF8(); final ShortWritable U_SHORT = new ShortWritable(); + final IntWritable U_INT = new IntWritable(); final LongWritable U_LONG = new LongWritable(); final FsPermission FILE_PERM = new FsPermission((short) 0); } @@ -350,9 +352,9 @@ public class FSImageSerialization { 
/** read the long value */ static long readLong(DataInput in) throws IOException { - LongWritable ustr = TL_DATA.get().U_LONG; - ustr.readFields(in); - return ustr.get(); + LongWritable uLong = TL_DATA.get().U_LONG; + uLong.readFields(in); + return uLong.get(); } /** write the long value */ @@ -361,6 +363,20 @@ public class FSImageSerialization { uLong.set(value); uLong.write(out); } + + /** read the int value */ + static int readInt(DataInput in) throws IOException { + IntWritable uInt = TL_DATA.get().U_INT; + uInt.readFields(in); + return uInt.get(); + } + + /** write the int value */ + static void writeInt(int value, DataOutputStream out) throws IOException { + IntWritable uInt = TL_DATA.get().U_INT; + uInt.set(value); + uInt.write(out); + } /** read short value */ static short readShort(DataInput in) throws IOException { @@ -414,8 +430,13 @@ public class FSImageSerialization { private static void writeLocalName(INodeAttributes inode, DataOutput out) throws IOException { final byte[] name = inode.getLocalNameBytes(); - out.writeShort(name.length); - out.write(name); + writeBytes(name, out); + } + + public static void writeBytes(byte[] data, DataOutput out) + throws IOException { + out.writeShort(data.length); + out.write(data); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b75f552f133..ba62dad9aeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -564,7 +564,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FSImage fsImage = new FSImage(conf, FSNamesystem.getNamespaceDirs(conf), FSNamesystem.getNamespaceEditsDirs(conf)); - FSNamesystem namesystem = new FSNamesystem(conf, fsImage); + FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false); StartupOption startOpt = NameNode.getStartupOption(conf); if (startOpt == StartupOption.RECOVER) { namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); @@ -582,7 +582,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } return namesystem; } - + + FSNamesystem(Configuration conf, FSImage fsImage) throws IOException { + this(conf, fsImage, false); + } + /** * Create an FSNamesystem associated with the specified image. * @@ -591,9 +595,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * * @param conf configuration * @param fsImage The FSImage to associate with + * @param ignoreRetryCache Whether or not should ignore the retry cache setup + * step. For Secondary NN this should be set to true. * @throws IOException on bad configuration */ - FSNamesystem(Configuration conf, FSImage fsImage) throws IOException { + FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache) + throws IOException { try { resourceRecheckInterval = conf.getLong( DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY, @@ -684,7 +691,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.auditLoggers = initAuditLoggers(conf); this.isDefaultAuditLogger = auditLoggers.size() == 1 && auditLoggers.get(0) instanceof DefaultAuditLogger; - this.retryCache = initRetryCache(conf); + this.retryCache = ignoreRetryCache ? 
null : initRetryCache(conf); } catch(IOException e) { LOG.error(getClass().getSimpleName() + " initialization failed.", e); close(); @@ -696,6 +703,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } + @VisibleForTesting + public RetryCache getRetryCache() { + return retryCache; + } + + /** Whether or not retry cache is enabled */ + boolean hasRetryCache() { + return retryCache != null; + } + + void addCacheEntryWithPayload(byte[] clientId, int callId, Object payload) { + if (retryCache != null) { + retryCache.addCacheEntryWithPayload(clientId, callId, payload); + } + } + + void addCacheEntry(byte[] clientId, int callId) { + if (retryCache != null) { + retryCache.addCacheEntry(clientId, callId); + } + } + @VisibleForTesting static RetryCache initRetryCache(Configuration conf) { boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, @@ -1536,7 +1565,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean success = false; try { - concatInt(target, srcs); + concatInt(target, srcs, cacheEntry != null); success = true; } catch (AccessControlException e) { logAuditEvent(false, "concat", Arrays.toString(srcs), target, null); @@ -1546,8 +1575,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } - private void concatInt(String target, String [] srcs) - throws IOException, UnresolvedLinkException { + private void concatInt(String target, String [] srcs, + boolean logRetryCache) throws IOException, UnresolvedLinkException { // verify args if(target.isEmpty()) { throw new IllegalArgumentException("Target file name is empty"); @@ -1576,7 +1605,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (isInSafeMode()) { throw new SafeModeException("Cannot concat " + target, safeMode); } - concatInternal(pc, target, srcs); + concatInternal(pc, target, srcs, logRetryCache); resultingStat = getAuditFileInfo(target, false); } finally { writeUnlock(); @@ -1586,8 +1615,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } /** See {@link #concat(String, String[])} */ - private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) - throws IOException, UnresolvedLinkException { + private void concatInternal(FSPermissionChecker pc, String target, + String[] srcs, boolean logRetryCache) throws IOException, + UnresolvedLinkException { assert hasWriteLock(); // write permission for the target @@ -1691,7 +1721,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, Arrays.toString(srcs) + " to " + target); } - dir.concat(target,srcs); + dir.concat(target,srcs, logRetryCache); } /** @@ -1763,7 +1793,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } boolean success = false; try { - createSymlinkInt(target, link, dirPerms, createParent); + createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null); success = true; } catch (AccessControlException e) { logAuditEvent(false, "createSymlink", link, target, null); @@ -1774,7 +1804,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } private void createSymlinkInt(String target, String link, - PermissionStatus dirPerms, boolean createParent) + PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) throws IOException, UnresolvedLinkException { if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" @@ -1805,7 +1835,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, 
checkFsObjectLimit(); // add symbolic link to namespace - dir.addSymlink(link, target, dirPerms, createParent); + dir.addSymlink(link, target, dirPerms, createParent, logRetryCache); resultingStat = getAuditFileInfo(link, false); } finally { writeUnlock(); @@ -1935,7 +1965,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { status = startFileInt(src, permissions, holder, clientMachine, flag, - createParent, replication, blockSize); + createParent, replication, blockSize, cacheEntry != null); } catch (AccessControlException e) { logAuditEvent(false, "create", src); throw e; @@ -1947,8 +1977,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private HdfsFileStatus startFileInt(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, - boolean createParent, short replication, long blockSize) - throws AccessControlException, SafeModeException, + boolean createParent, short replication, long blockSize, + boolean logRetryCache) throws AccessControlException, SafeModeException, FileAlreadyExistsException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -1983,8 +2013,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new SafeModeException("Cannot create file" + src, safeMode); } src = FSDirectory.resolvePath(src, pathComponents, dir); - startFileInternal(pc, src, permissions, holder, clientMachine, - create, overwrite, createParent, replication, blockSize); + startFileInternal(pc, src, permissions, holder, clientMachine, create, + overwrite, createParent, replication, blockSize, logRetryCache); stat = dir.getFileInfo(src, false); } catch (StandbyException se) { skipSync = true; @@ -2014,8 +2044,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private void startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, boolean create, boolean overwrite, boolean createParent, - short replication, long blockSize) throws FileAlreadyExistsException, - AccessControlException, UnresolvedLinkException, FileNotFoundException, + short replication, long blockSize, boolean logRetryEntry) + throws FileAlreadyExistsException, AccessControlException, + UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, IOException { assert hasWriteLock(); // Verify that the destination does not exist as a directory already. 
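The FSNamesystem hunks above all repeat one caller pattern: the public RPC handler consults the retry cache, runs the internal variant with a logRetryCache flag derived from cacheEntry != null, and records the final outcome. A minimal sketch of that pattern, using the concat path from this patch; waitForCompletion() and isSuccess() are assumed from the RetryCache API rather than shown here (only setState and the cacheEntry != null flag appear in these hunks):

  void concat(String target, String[] srcs) throws IOException {
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); // assumed: null when the cache is disabled
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // duplicate retry: reuse the outcome recorded for this (clientId, callId)
    }
    boolean success = false;
    try {
      // the flag tells the edit log whether to persist the RPC ids with the op
      concatInt(target, srcs, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success); // assumed to tolerate a null entry
    }
  }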
@@ -2047,7 +2078,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } else { if (overwrite) { try { - deleteInt(src, true); // File exists - delete if overwrite + deleteInt(src, true, false); // File exists - delete if overwrite } catch (AccessControlException e) { logAuditEvent(false, "delete", src); throw e; @@ -2073,7 +2104,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, leaseManager.addLease(newNode.getClientName(), src); // record file record in log, record new generation stamp - getEditLog().logOpenFile(src, newNode); + getEditLog().logOpenFile(src, newNode, logRetryEntry); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " +"add "+src+" to namespace for "+holder); @@ -2102,8 +2133,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @return the last block locations if the block is partial or null otherwise */ private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src, - String holder, String clientMachine) throws AccessControlException, - UnresolvedLinkException, FileNotFoundException, IOException { + String holder, String clientMachine, boolean logRetryCache) + throws AccessControlException, UnresolvedLinkException, + FileNotFoundException, IOException { assert hasWriteLock(); // Verify that the destination does not exist as a directory already. final INodesInPath iip = dir.getINodesInPath4Write(src); @@ -2128,7 +2160,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final DatanodeDescriptor clientNode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine); return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode, - true, iip.getLatestSnapshot()); + true, iip.getLatestSnapshot(), logRetryCache); } catch (IOException ie) { NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage()); throw ie; @@ -2145,13 +2177,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @param clientMachine identifier of the client machine * @param clientNode if the client is collocated with a DN, that DN's descriptor * @param writeToEditLog whether to persist this change to the edit log + * @param logRetryCache whether to record RPC ids in editlog for retry cache + * rebuilding * @return the last block locations if the block is partial or null otherwise * @throws UnresolvedLinkException * @throws IOException */ LocatedBlock prepareFileForWrite(String src, INodeFile file, String leaseHolder, String clientMachine, DatanodeDescriptor clientNode, - boolean writeToEditLog, Snapshot latestSnapshot) throws IOException { + boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache) + throws IOException { file = file.recordModification(latestSnapshot, dir.getINodeMap()); final INodeFileUnderConstruction cons = file.toUnderConstruction( leaseHolder, clientMachine, clientNode); @@ -2161,7 +2196,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons); if (writeToEditLog) { - getEditLog().logOpenFile(src, cons); + getEditLog().logOpenFile(src, cons, logRetryCache); } return ret; } @@ -2309,7 +2344,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean success = false; try { - lb = appendFileInt(src, holder, clientMachine); + lb = appendFileInt(src, holder, clientMachine, cacheEntry != null); success = true; return lb; } catch (AccessControlException e) { @@ -2321,7 +2356,8 @@ public 
class FSNamesystem implements Namesystem, FSClusterStats, } private LocatedBlock appendFileInt(String src, String holder, - String clientMachine) throws AccessControlException, SafeModeException, + String clientMachine, boolean logRetryCache) + throws AccessControlException, SafeModeException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -2347,7 +2383,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new SafeModeException("Cannot append to file" + src, safeMode); } src = FSDirectory.resolvePath(src, pathComponents, dir); - lb = appendFileInternal(pc, src, holder, clientMachine); + lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache); } catch (StandbyException se) { skipSync = true; throw se; @@ -2471,7 +2507,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, newBlock = createNewBlock(); saveAllocatedBlock(src, inodesInPath, newBlock, targets); - dir.persistBlocks(src, pendingFile); + dir.persistBlocks(src, pendingFile, false); offset = pendingFile.computeFileSize(); } finally { writeUnlock(); @@ -2673,7 +2709,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b + " is removed from pendingCreates"); } - dir.persistBlocks(src, file); + dir.persistBlocks(src, file, false); } finally { writeUnlock(); } @@ -2890,7 +2926,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } boolean ret = false; try { - ret = renameToInt(src, dst); + ret = renameToInt(src, dst, cacheEntry != null); } catch (AccessControlException e) { logAuditEvent(false, "rename", src, dst, null); throw e; @@ -2900,7 +2936,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return ret; } - private boolean renameToInt(String src, String dst) + private boolean renameToInt(String src, String dst, boolean logRetryCache) throws IOException, UnresolvedLinkException { if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + @@ -2924,7 +2960,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, src = FSDirectory.resolvePath(src, srcComponents, dir); dst = FSDirectory.resolvePath(dst, dstComponents, dir); checkOperation(OperationCategory.WRITE); - status = renameToInternal(pc, src, dst); + status = renameToInternal(pc, src, dst, logRetryCache); if (status) { resultingStat = getAuditFileInfo(dst, false); } @@ -2940,8 +2976,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** @deprecated See {@link #renameTo(String, String)} */ @Deprecated - private boolean renameToInternal(FSPermissionChecker pc, String src, String dst) - throws IOException, UnresolvedLinkException { + private boolean renameToInternal(FSPermissionChecker pc, String src, + String dst, boolean logRetryCache) throws IOException, + UnresolvedLinkException { assert hasWriteLock(); if (isPermissionEnabled) { //We should not be doing this. This is move() not renameTo(). 
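The persisted ids only pay off when the edits are replayed, either after a NameNode restart or on a standby tailing the shared log, which is what the new tests below exercise. A hedged sketch of the replay-side hook, where hasRpcIds() is an assumed guard; addCacheEntry()/addCacheEntryWithPayload() are the helpers added to FSNamesystem above and are no-ops when the cache is disabled:

  // after an @AtMostOnce op has been applied successfully during edit-log replay
  if (op.hasRpcIds()) { // assumed: op carries valid rpcClientId/rpcCallId
    fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
  }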
@@ -2959,7 +2996,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, false); } - if (dir.renameTo(src, dst)) { + if (dir.renameTo(src, dst, logRetryCache)) { return true; } return false; @@ -2994,7 +3031,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } src = FSDirectory.resolvePath(src, srcComponents, dir); dst = FSDirectory.resolvePath(dst, dstComponents, dir); - renameToInternal(pc, src, dst, options); + renameToInternal(pc, src, dst, cacheEntry != null, options); resultingStat = getAuditFileInfo(dst, false); success = true; } finally { @@ -3012,7 +3049,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } private void renameToInternal(FSPermissionChecker pc, String src, String dst, - Options.Rename... options) throws IOException { + boolean logRetryCache, Options.Rename... options) throws IOException { assert hasWriteLock(); if (isPermissionEnabled) { // Rename does not operates on link targets @@ -3023,7 +3060,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false); } - dir.renameTo(src, dst, options); + dir.renameTo(src, dst, logRetryCache, options); } /** @@ -3041,7 +3078,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } boolean ret = false; try { - ret = deleteInt(src, recursive); + ret = deleteInt(src, recursive, cacheEntry != null); } catch (AccessControlException e) { logAuditEvent(false, "delete", src); throw e; @@ -3051,13 +3088,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return ret; } - private boolean deleteInt(String src, boolean recursive) + private boolean deleteInt(String src, boolean recursive, boolean logRetryCache) throws AccessControlException, SafeModeException, UnresolvedLinkException, IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src); } - boolean status = deleteInternal(src, recursive, true); + boolean status = deleteInternal(src, recursive, true, logRetryCache); if (status) { logAuditEvent(true, "delete", src); } @@ -3085,7 +3122,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @see ClientProtocol#delete(String, boolean) for description of exceptions */ private boolean deleteInternal(String src, boolean recursive, - boolean enforcePermission) + boolean enforcePermission, boolean logRetryCache) throws AccessControlException, SafeModeException, UnresolvedLinkException, IOException { BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); @@ -3109,7 +3146,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FsAction.ALL, false); } // Unlink the target directory from directory tree - if (!dir.delete(src, collectedBlocks, removedINodes)) { + if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) { return false; } ret = true; @@ -3438,7 +3475,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (lastBlockLength > 0) { pendingFile.updateLengthOfLastBlock(lastBlockLength); } - dir.persistBlocks(src, pendingFile); + dir.persistBlocks(src, pendingFile, false); } finally { writeUnlock(); } @@ -3735,7 +3772,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, src = closeFileCommitBlocks(pendingFile, storedBlock); } else { // If this commit does not want to close the file, persist blocks - src = persistBlocks(pendingFile); + src = persistBlocks(pendingFile, false); } } finally { writeUnlock(); @@ -3784,10 
+3821,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws IOException */ @VisibleForTesting - String persistBlocks(INodeFileUnderConstruction pendingFile) - throws IOException { + String persistBlocks(INodeFileUnderConstruction pendingFile, + boolean logRetryCache) throws IOException { String src = leaseManager.findPath(pendingFile); - dir.persistBlocks(src, pendingFile); + dir.persistBlocks(src, pendingFile, logRetryCache); return src; } @@ -5595,7 +5632,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and " + oldBlock + " has different block identifier"; - updatePipelineInternal(clientName, oldBlock, newBlock, newNodes); + updatePipelineInternal(clientName, oldBlock, newBlock, newNodes, + cacheEntry != null); success = true; } finally { writeUnlock(); @@ -5607,7 +5645,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) + ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache) throws IOException { assert hasWriteLock(); // check the vadility of the block and lease holder name @@ -5642,7 +5680,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, blockinfo.setExpectedLocations(descriptors); String src = leaseManager.findPath(pendingFile); - dir.persistBlocks(src, pendingFile); + dir.persistBlocks(src, pendingFile, logRetryCache); } // rename was successful. If any part of the renamed subtree had @@ -6510,7 +6548,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } finally { dir.writeUnlock(); } - getEditLog().logCreateSnapshot(snapshotRoot, snapshotName); + getEditLog().logCreateSnapshot(snapshotRoot, snapshotName, + cacheEntry != null); } finally { writeUnlock(); RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath); @@ -6552,7 +6591,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, dir.verifySnapshotName(snapshotNewName, path); snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName); - getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName); + getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName, + cacheEntry != null); success = true; } finally { writeUnlock(); @@ -6676,7 +6716,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, removedINodes.clear(); this.removeBlocks(collectedBlocks); collectedBlocks.clear(); - getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName); + getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName, + cacheEntry != null); success = true; } finally { writeUnlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index dccbc55db19..a933585523a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -733,7 +733,8 @@ public class NameNode implements NameNodeStatusMXBean { } /** get FSImage */ - FSImage getFSImage() { + @VisibleForTesting + public FSImage getFSImage() { return namesystem.dir.fsImage; } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index b370c3901af..47f7222b1b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -249,7 +249,7 @@ public class SecondaryNameNode implements Runnable { checkpointImage.recoverCreate(commandLineOpts.shouldFormat()); checkpointImage.deleteTempEdits(); - namesystem = new FSNamesystem(conf, checkpointImage); + namesystem = new FSNamesystem(conf, checkpointImage, true); // Initialize other scheduling parameters from the configuration checkpointConf = new CheckpointConf(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java index 6738be3abcb..d23b27b7d35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java @@ -126,7 +126,7 @@ class ImageLoaderCurrent implements ImageLoader { new SimpleDateFormat("yyyy-MM-dd HH:mm"); private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, - -40, -41, -42, -43, -44, -45, -46 }; + -40, -41, -42, -43, -44, -45, -46, -47 }; private int imageVersion = 0; private final Map subtreeMap = new HashMap(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index b39bfef10ea..f0c10b0a2fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -57,8 +57,11 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; @@ -922,4 +925,102 @@ public class DFSTestUtil { return new DFSTestUtil(nFiles, maxLevels, maxSize, minSize); } } + + /** + * Run a set of operations and generate all edit logs + */ + public static void runOperations(MiniDFSCluster cluster, + DistributedFileSystem filesystem, Configuration conf, long blockSize, + int nnIndex) throws IOException { + // create FileContext for rename2 + FileContext fc = FileContext.getFileContext(cluster.getURI(0), conf); + + // OP_ADD 0 + final Path pathFileCreate = new Path("/file_create"); + FSDataOutputStream s = filesystem.create(pathFileCreate); + // OP_CLOSE 9 + s.close(); + // OP_RENAME_OLD 1 + final Path pathFileMoved = new Path("/file_moved"); + 
filesystem.rename(pathFileCreate, pathFileMoved); + // OP_DELETE 2 + filesystem.delete(pathFileMoved, false); + // OP_MKDIR 3 + Path pathDirectoryMkdir = new Path("/directory_mkdir"); + filesystem.mkdirs(pathDirectoryMkdir); + // OP_ALLOW_SNAPSHOT 29 + filesystem.allowSnapshot(pathDirectoryMkdir); + // OP_DISALLOW_SNAPSHOT 30 + filesystem.disallowSnapshot(pathDirectoryMkdir); + // OP_CREATE_SNAPSHOT 26 + String ssName = "snapshot1"; + filesystem.allowSnapshot(pathDirectoryMkdir); + filesystem.createSnapshot(pathDirectoryMkdir, ssName); + // OP_RENAME_SNAPSHOT 28 + String ssNewName = "snapshot2"; + filesystem.renameSnapshot(pathDirectoryMkdir, ssName, ssNewName); + // OP_DELETE_SNAPSHOT 27 + filesystem.deleteSnapshot(pathDirectoryMkdir, ssNewName); + // OP_SET_REPLICATION 4 + s = filesystem.create(pathFileCreate); + s.close(); + filesystem.setReplication(pathFileCreate, (short)1); + // OP_SET_PERMISSIONS 7 + Short permission = 0777; + filesystem.setPermission(pathFileCreate, new FsPermission(permission)); + // OP_SET_OWNER 8 + filesystem.setOwner(pathFileCreate, new String("newOwner"), null); + // OP_CLOSE 9 see above + // OP_SET_GENSTAMP 10 see above + // OP_SET_NS_QUOTA 11 obsolete + // OP_CLEAR_NS_QUOTA 12 obsolete + // OP_TIMES 13 + long mtime = 1285195527000L; // Wed, 22 Sep 2010 22:45:27 GMT + long atime = mtime; + filesystem.setTimes(pathFileCreate, mtime, atime); + // OP_SET_QUOTA 14 + filesystem.setQuota(pathDirectoryMkdir, 1000L, + HdfsConstants.QUOTA_DONT_SET); + // OP_RENAME 15 + fc.rename(pathFileCreate, pathFileMoved, Rename.NONE); + // OP_CONCAT_DELETE 16 + Path pathConcatTarget = new Path("/file_concat_target"); + Path[] pathConcatFiles = new Path[2]; + pathConcatFiles[0] = new Path("/file_concat_0"); + pathConcatFiles[1] = new Path("/file_concat_1"); + + long length = blockSize * 3; // multiple of blocksize for concat + short replication = 1; + long seed = 1; + DFSTestUtil.createFile(filesystem, pathConcatTarget, length, replication, + seed); + DFSTestUtil.createFile(filesystem, pathConcatFiles[0], length, replication, + seed); + DFSTestUtil.createFile(filesystem, pathConcatFiles[1], length, replication, + seed); + filesystem.concat(pathConcatTarget, pathConcatFiles); + + // OP_SYMLINK 17 + Path pathSymlink = new Path("/file_symlink"); + fc.createSymlink(pathConcatTarget, pathSymlink, false); + + // OP_REASSIGN_LEASE 22 + String filePath = "/hard-lease-recovery-test"; + byte[] bytes = "foo-bar-baz".getBytes(); + DFSClientAdapter.stopLeaseRenewer(filesystem); + FSDataOutputStream leaseRecoveryPath = filesystem.create(new Path(filePath)); + leaseRecoveryPath.write(bytes); + leaseRecoveryPath.hflush(); + // Set the hard lease timeout to 1 second. 
+ cluster.setLeasePeriod(60 * 1000, 1000, nnIndex); + // wait for lease recovery to complete + LocatedBlocks locatedBlocks; + do { + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + locatedBlocks = DFSClientAdapter.callGetBlockLocations( + cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length); + } while (locatedBlocks.isUnderConstruction()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 2daa9cc9a02..7090d498444 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2040,6 +2040,10 @@ public class MiniDFSCluster { NameNodeAdapter.setLeasePeriod(getNamesystem(), soft, hard); } + public void setLeasePeriod(long soft, long hard, int nnIndex) { + NameNodeAdapter.setLeasePeriod(getNamesystem(nnIndex), soft, hard); + } + public void setWaitSafeMode(boolean wait) { this.waitSafeMode = wait; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java index 852a34a8da5..fd7c734c336 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.namenode.INodeId; /** * @@ -97,8 +96,9 @@ public class CreateEditsLog { dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L); editLog.logMkDir(currentDir, dirInode); } - editLog.logOpenFile(filePath, new INodeFileUnderConstruction( - inodeId.nextValue(), p, replication, 0, blockSize, "", "", null)); + editLog.logOpenFile(filePath, + new INodeFileUnderConstruction(inodeId.nextValue(), p, replication, + 0, blockSize, "", "", null), false); editLog.logCloseFile(filePath, inode); if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java index 34ce90e995c..53007196200 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hdfs.server.namenode; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -28,7 +28,9 @@ import static org.mockito.Mockito.spy; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; @@ -64,7 +66,7 @@ public class TestCommitBlockSynchronization { any(INodeFileUnderConstruction.class), any(BlockInfo.class)); doReturn("").when(namesystemSpy).persistBlocks( - any(INodeFileUnderConstruction.class)); + any(INodeFileUnderConstruction.class), anyBoolean()); doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog(); return namesystemSpy; @@ -127,7 +129,6 @@ public class TestCommitBlockSynchronization { INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); - DatanodeDescriptor[] targets = new DatanodeDescriptor[0]; DatanodeID[] newTargets = new DatanodeID[0]; ExtendedBlock lastBlock = new ExtendedBlock(); @@ -148,7 +149,6 @@ public class TestCommitBlockSynchronization { INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); - DatanodeDescriptor[] targets = new DatanodeDescriptor[0]; DatanodeID[] newTargets = new DatanodeID[0]; ExtendedBlock lastBlock = new ExtendedBlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index a4e9b7aa2ac..f83a531b463 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -155,7 +155,7 @@ public class TestEditLog { INodeFileUnderConstruction inode = new INodeFileUnderConstruction( namesystem.allocateNewInodeId(), p, replication, blockSize, 0, "", "", null); - editLog.logOpenFile("/filename" + (startIndex + i), inode); + editLog.logOpenFile("/filename" + (startIndex + i), inode, false); editLog.logCloseFile("/filename" + (startIndex + i), inode); editLog.logSync(); } @@ -912,14 +912,14 @@ public class TestEditLog { log.setMetricsForTests(mockMetrics); for (int i = 0; i < 400; i++) { - log.logDelete(oneKB, 1L); + log.logDelete(oneKB, 1L, false); } // After ~400KB, we're still within the 512KB buffer size Mockito.verify(mockMetrics, Mockito.times(0)).addSync(Mockito.anyLong()); // After ~400KB more, we should have done an automatic sync for (int i = 0; i < 400; i++) { - log.logDelete(oneKB, 1L); + log.logDelete(oneKB, 1L, false); } Mockito.verify(mockMetrics, Mockito.times(1)).addSync(Mockito.anyLong()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 79e893961ad..dd637a9212d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -292,7 +292,7 @@ public class 
TestFSEditLogLoader { long thisTxId = spyLog.getLastWrittenTxId() + 1; offsetToTxId.put(trueOffset, thisTxId); System.err.println("txid " + thisTxId + " at offset " + trueOffset); - spyLog.logDelete("path" + i, i); + spyLog.logDelete("path" + i, i, false); spyLog.logSync(); } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java index 299d49ec1bf..42f3f5764f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java @@ -30,8 +30,6 @@ import java.io.RandomAccessFile; import java.util.HashSet; import java.util.Set; -import junit.framework.Assert; - import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -276,7 +274,7 @@ public class TestNameNodeRecovery { } public int getMaxOpSize() { - return 30; + return 36; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java index b581c9ae206..03ee9fd5d1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -19,12 +19,17 @@ package org.apache.hadoop.hdfs.server.namenode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; @@ -32,19 +37,21 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.ipc.ClientId; import org.apache.hadoop.ipc.RPC.RpcKind; -import org.apache.hadoop.ipc.RetryCache; +import org.apache.hadoop.ipc.RetryCache.CacheEntry; import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.LightWeightCache; import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; /** @@ -61,19 +68,20 @@ import org.junit.Test; * request, a new callId is generated using {@link #newCall()}. 
*/ public class TestNamenodeRetryCache { - private static final byte[] CLIENT_ID = StringUtils.getUuidBytes(); + private static final byte[] CLIENT_ID = ClientId.getClientId(); private static MiniDFSCluster cluster; private static FSNamesystem namesystem; private static PermissionStatus perm = new PermissionStatus( "TestNamenodeRetryCache", null, FsPermission.getDefault()); - private static FileSystem filesystem; + private static DistributedFileSystem filesystem; private static int callId = 100; + private static Configuration conf = new HdfsConfiguration(); + private static final int BlockSize = 512; /** Start a cluster */ - @BeforeClass - public static void setup() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512"); + @Before + public void setup() throws Exception { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true); cluster = new MiniDFSCluster.Builder(conf).build(); cluster.waitActive(); @@ -109,8 +117,8 @@ public class TestNamenodeRetryCache { } private void concatSetup(String file1, String file2) throws Exception { - DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L); - DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L); + DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 0L); + DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 0L); } /** @@ -192,19 +200,19 @@ public class TestNamenodeRetryCache { // Two retried calls succeed newCall(); HdfsFileStatus status = namesystem.startFile(src, perm, "holder", - "clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512); + "clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize); Assert.assertEquals(status, namesystem.startFile(src, perm, "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE), - true, (short) 1, 512)); + true, (short) 1, BlockSize)); Assert.assertEquals(status, namesystem.startFile(src, perm, "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE), - true, (short) 1, 512)); + true, (short) 1, BlockSize)); // A non-retried call fails newCall(); try { namesystem.startFile(src, perm, "holder", "clientmachine", - EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512); + EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize); Assert.fail("testCreate - expected exception is not thrown"); } catch (IOException e) { // expected @@ -352,4 +360,41 @@ public class TestNamenodeRetryCache { conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false); Assert.assertNull(FSNamesystem.initRetryCache(conf)); } + + /** + * After running a set of operations, restart NN and check if the retry cache has + * been rebuilt based on the editlog. 
+ */ + @Test + public void testRetryCacheRebuild() throws Exception { + DFSTestUtil.runOperations(cluster, filesystem, conf, BlockSize, 0); + + LightWeightCache cacheSet = + (LightWeightCache) namesystem.getRetryCache().getCacheSet(); + assertEquals(14, cacheSet.size()); + + Map oldEntries = + new HashMap(); + Iterator iter = cacheSet.iterator(); + while (iter.hasNext()) { + CacheEntry entry = iter.next(); + oldEntries.put(entry, entry); + } + + // restart NameNode + cluster.restartNameNode(); + cluster.waitActive(); + namesystem = cluster.getNamesystem(); + + // check retry cache + assertTrue(namesystem.hasRetryCache()); + cacheSet = (LightWeightCache) namesystem + .getRetryCache().getCacheSet(); + assertEquals(14, cacheSet.size()); + iter = cacheSet.iterator(); + while (iter.hasNext()) { + CacheEntry entry = iter.next(); + assertTrue(oldEntries.containsKey(entry)); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java new file mode 100644 index 00000000000..7318a11e635 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryInvocationHandler; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.RetryCache.CacheEntry; +import org.apache.hadoop.util.LightWeightCache; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestRetryCacheWithHA { + private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class); + + private static MiniDFSCluster cluster; + private static DistributedFileSystem dfs; + private static Configuration conf = new HdfsConfiguration(); + + private static final int BlockSize = 1024; + private static final short DataNodes = 3; + private final static Map results = + new HashMap(); + + /** + * A dummy invocation handler extending RetryInvocationHandler. We can use + * a boolean flag to control whether the method invocation succeeds or not. + */ + private static class DummyRetryInvocationHandler extends + RetryInvocationHandler { + static AtomicBoolean block = new AtomicBoolean(false); + + DummyRetryInvocationHandler( + FailoverProxyProvider proxyProvider, + RetryPolicy retryPolicy) { + super(proxyProvider, retryPolicy); + } + + @Override + protected Object invokeMethod(Method method, Object[] args) + throws Throwable { + Object result = super.invokeMethod(method, args); + if (block.get()) { + throw new UnknownHostException("Fake Exception"); + } else { + return result; + } + } + } + + @Before + public void setup() throws Exception { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize); + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(DataNodes).build(); + cluster.waitActive(); + cluster.transitionToActive(0); + // setup the configuration + HATestUtil.setFailoverConfigurations(cluster, conf); + dfs = (DistributedFileSystem) HATestUtil.configureFailoverFs(cluster, conf); + } + + @After + public void cleanup() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * 1. Run a set of operations + * 2. Trigger the NN failover + * 3. Check the retry cache on the original standby NN + */ + @Test + public void testRetryCacheOnStandbyNN() throws Exception { + // 1. 
run operations + DFSTestUtil.runOperations(cluster, dfs, conf, BlockSize, 0); + + // check retry cache in NN1 + FSNamesystem fsn0 = cluster.getNamesystem(0); + LightWeightCache cacheSet = + (LightWeightCache) fsn0.getRetryCache().getCacheSet(); + assertEquals(14, cacheSet.size()); + + Map oldEntries = + new HashMap(); + Iterator iter = cacheSet.iterator(); + while (iter.hasNext()) { + CacheEntry entry = iter.next(); + oldEntries.put(entry, entry); + } + + // 2. Failover the current standby to active. + cluster.getNameNode(0).getRpcServer().rollEditLog(); + cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits(); + + cluster.shutdownNameNode(0); + cluster.transitionToActive(1); + + // 3. check the retry cache on the new active NN + FSNamesystem fsn1 = cluster.getNamesystem(1); + cacheSet = (LightWeightCache) fsn1 + .getRetryCache().getCacheSet(); + assertEquals(14, cacheSet.size()); + iter = cacheSet.iterator(); + while (iter.hasNext()) { + CacheEntry entry = iter.next(); + assertTrue(oldEntries.containsKey(entry)); + } + } + + private DFSClient genClientWithDummyHandler() throws IOException { + URI nnUri = dfs.getUri(); + Class> failoverProxyProviderClass = + NameNodeProxies.getFailoverProxyProviderClass(conf, nnUri, + ClientProtocol.class); + FailoverProxyProvider failoverProxyProvider = + NameNodeProxies.createFailoverProxyProvider(conf, + failoverProxyProviderClass, ClientProtocol.class, nnUri); + InvocationHandler dummyHandler = new DummyRetryInvocationHandler( + failoverProxyProvider, RetryPolicies + .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, + Integer.MAX_VALUE, + DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT, + DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT)); + ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( + failoverProxyProvider.getInterface().getClassLoader(), + new Class[] { ClientProtocol.class }, dummyHandler); + + DFSClient client = new DFSClient(null, proxy, conf, null); + return client; + } + + /** + * When NN failover happens, if the client does not receive the response and + * sends a retry request to the other NN, the same response should be received + * based on the retry cache. + * + * TODO: currently we only test createSnapshot from the client side. We + * may need to cover all the calls with "@AtMostOnce" annotation. 
+ */ + @Test + public void testClientRetryWithFailover() throws Exception { + final String dir = "/test"; + final Path dirPath = new Path(dir); + final String sName = "s1"; + final String dirSnapshot = dir + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR + + Path.SEPARATOR + sName; + + dfs.mkdirs(dirPath); + dfs.allowSnapshot(dirPath); + + final DFSClient client = genClientWithDummyHandler(); + // set DummyRetryInvocationHandler#block to true + DummyRetryInvocationHandler.block.set(true); + + new Thread() { + @Override + public void run() { + try { + final String snapshotPath = client.createSnapshot(dir, "s1"); + assertEquals(dirSnapshot, snapshotPath); + LOG.info("Created snapshot " + snapshotPath); + synchronized (TestRetryCacheWithHA.this) { + results.put("createSnapshot", snapshotPath); + TestRetryCacheWithHA.this.notifyAll(); + } + } catch (IOException e) { + LOG.info("Got IOException " + e + " while creating snapshot"); + } finally { + IOUtils.cleanup(null, client); + } + } + }.start(); + + // make sure the client's createSnapshot call has actually been handled by + // the active NN + boolean snapshotCreated = dfs.exists(new Path(dirSnapshot)); + while (!snapshotCreated) { + Thread.sleep(1000); + snapshotCreated = dfs.exists(new Path(dirSnapshot)); + } + + // force the failover + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + // disable the block in DummyHandler + LOG.info("Setting block to false"); + DummyRetryInvocationHandler.block.set(false); + + synchronized (this) { + while (!results.containsKey("createSnapshot")) { + this.wait(); + } + LOG.info("Got the result of createSnapshot: " + + results.get("createSnapshot")); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored index 93638fc1836..111cec0bef7 100644 Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml index 0ade7eeb0b2..dbb3498d46d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml @@ -1,6 +1,6 @@ - -46 + -47 OP_START_LOG_SEGMENT @@ -13,8 +13,8 @@ 2 1 - 1372798673941 - 247c47b8bf6b89ec + 1375509063810 + 4d47710649039b98 @@ -24,8 +24,8 @@ 3 2 - 1372798673944 - ef1a35da6b4fc327 + 1375509063812 + 38cbb1d8fd90fcb2 @@ -37,16 +37,18 @@ 16386 /file_create_u\0001;F431 1 - 1372107474972 - 1372107474972 + 1374817864805 + 1374817864805 512 - DFSClient_NONMAPREDUCE_-1834501254_1 + DFSClient_NONMAPREDUCE_-1676409172_1 127.0.0.1 - aagarwal + jing supergroup 420 + 5245793a-984b-4264-8d7c-7890775547a0 + 8 @@ -57,13 +59,13 @@ 0 /file_create_u\0001;F431 1 - 1372107474983 - 1372107474972 + 1374817864816 + 1374817864805 512 - aagarwal + jing supergroup 420 @@ -76,7 +78,9 @@ 0 /file_create_u\0001;F431 /file_moved - 1372107474986 + 1374817864818 + 5245793a-984b-4264-8d7c-7890775547a0 + 10 @@ -85,7 +89,9 @@ 7 0 /file_moved - 1372107474989 + 1374817864822 + 5245793a-984b-4264-8d7c-7890775547a0 + 11 @@ -95,9 +101,9 @@ 0 16387 /directory_mkdir - 1372107474991 + 1374817864825 - aagarwal + jing supergroup 493 @@ -130,6 +136,8 @@ 12 /directory_mkdir snapshot1 + 5245793a-984b-4264-8d7c-7890775547a0 + 16 @@ -139,6 +147,8 @@ /directory_mkdir snapshot1 snapshot2 + 
5245793a-984b-4264-8d7c-7890775547a0 + 17 @@ -147,6 +157,8 @@ 14 /directory_mkdir snapshot2 + 5245793a-984b-4264-8d7c-7890775547a0 + 18 @@ -157,16 +169,18 @@ 16388 /file_create_u\0001;F431 1 - 1372107475007 - 1372107475007 + 1374817864846 + 1374817864846 512 - DFSClient_NONMAPREDUCE_-1834501254_1 + DFSClient_NONMAPREDUCE_-1676409172_1 127.0.0.1 - aagarwal + jing supergroup 420 + 5245793a-984b-4264-8d7c-7890775547a0 + 19 @@ -177,13 +191,13 @@ 0 /file_create_u\0001;F431 1 - 1372107475009 - 1372107475007 + 1374817864848 + 1374817864846 512 - aagarwal + jing supergroup 420 @@ -239,8 +253,10 @@ 0 /file_create_u\0001;F431 /file_moved - 1372107475019 + 1374817864860 NONE + 5245793a-984b-4264-8d7c-7890775547a0 + 26 @@ -251,16 +267,18 @@ 16389 /file_concat_target 1 - 1372107475023 - 1372107475023 + 1374817864864 + 1374817864864 512 - DFSClient_NONMAPREDUCE_-1834501254_1 + DFSClient_NONMAPREDUCE_-1676409172_1 127.0.0.1 - aagarwal + jing supergroup 420 + 5245793a-984b-4264-8d7c-7890775547a0 + 28 @@ -287,6 +305,8 @@ 0 1001 + + -2 @@ -318,6 +338,8 @@ 0 1002 + + -2 @@ -354,6 +376,8 @@ 0 1003 + + -2 @@ -364,8 +388,8 @@ 0 /file_concat_target 1 - 1372107475091 - 1372107475023 + 1374817864927 + 1374817864864 512 @@ -385,7 +409,7 @@ 1003 - aagarwal + jing supergroup 420 @@ -399,16 +423,18 @@ 16390 /file_concat_0 1 - 1372107475093 - 1372107475093 + 1374817864929 + 1374817864929 512 - DFSClient_NONMAPREDUCE_-1834501254_1 + DFSClient_NONMAPREDUCE_-1676409172_1 127.0.0.1 - aagarwal + jing supergroup 420 + 5245793a-984b-4264-8d7c-7890775547a0 + 41 @@ -435,6 +461,8 @@ 0 1004 + + -2 @@ -466,6 +494,8 @@ 0 1005 + + -2 @@ -502,6 +532,8 @@ 0 1006 + + -2 @@ -512,8 +544,8 @@ 0 /file_concat_0 1 - 1372107475110 - 1372107475093 + 1374817864947 + 1374817864929 512 @@ -533,7 +565,7 @@ 1006 - aagarwal + jing supergroup 420 @@ -547,16 +579,18 @@ 16391 /file_concat_1 1 - 1372107475112 - 1372107475112 + 1374817864950 + 1374817864950 512 - DFSClient_NONMAPREDUCE_-1834501254_1 + DFSClient_NONMAPREDUCE_-1676409172_1 127.0.0.1 - aagarwal + jing supergroup 420 + 5245793a-984b-4264-8d7c-7890775547a0 + 53 @@ -583,6 +617,8 @@ 0 1007 + + -2 @@ -614,6 +650,8 @@ 0 1008 + + -2 @@ -650,6 +688,8 @@ 0 1009 + + -2 @@ -660,8 +700,8 @@ 0 /file_concat_1 1 - 1372107475131 - 1372107475112 + 1374817864966 + 1374817864950 512 @@ -681,7 +721,7 @@ 1009 - aagarwal + jing supergroup 420 @@ -693,11 +733,13 @@ 56 0 /file_concat_target - 1372107475133 + 1374817864967 /file_concat_0 /file_concat_1 + 5245793a-984b-4264-8d7c-7890775547a0 + 64 @@ -708,13 +750,15 @@ 16392 /file_symlink /file_concat_target - 1372107475137 - 1372107475137 + 1374817864971 + 1374817864971 - aagarwal + jing supergroup 511 + 5245793a-984b-4264-8d7c-7890775547a0 + 65 @@ -724,14 +768,14 @@ HDFS_DELEGATION_TOKEN 1 - aagarwal + jing JobTracker - 1372107475140 - 1372712275140 + 1374817864974 + 1375422664974 2 - 1372193875140 + 1374904264974 @@ -741,14 +785,14 @@ HDFS_DELEGATION_TOKEN 1 - aagarwal + jing JobTracker - 1372107475140 - 1372712275140 + 1374817864974 + 1375422664974 2 - 1372193875208 + 1374904265012 @@ -758,11 +802,11 @@ HDFS_DELEGATION_TOKEN 1 - aagarwal + jing JobTracker - 1372107475140 - 1372712275140 + 1374817864974 + 1375422664974 2 @@ -773,18 +817,20 @@ 61 0 16393 - /written_file + /hard-lease-recovery-test 1 - 1372107475214 - 1372107475214 + 1374817865017 + 1374817865017 512 - DFSClient_NONMAPREDUCE_-1834501254_1 + DFSClient_NONMAPREDUCE_-1676409172_1 127.0.0.1 - aagarwal + jing supergroup 420 + 5245793a-984b-4264-8d7c-7890775547a0 + 69 @@ -805,178 +851,42 @@ 
OP_UPDATE_BLOCKS 64 - /written_file + /hard-lease-recovery-test 1073741834 0 1010 + + -2 - OP_CLOSE + OP_UPDATE_BLOCKS 65 - 0 - 0 - /written_file - 1 - 1372107475221 - 1372107475214 - 512 - - + /hard-lease-recovery-test 1073741834 - 9 + 0 1010 - - aagarwal - supergroup - 420 - + + -2 - OP_ADD + OP_SET_GENSTAMP_V2 66 - 0 - 16393 - /written_file - 1 - 1372107475221 - 1372107475214 - 512 - DFSClient_NONMAPREDUCE_-1834501254_1 - 127.0.0.1 - - 1073741834 - 9 - 1010 - - - aagarwal - supergroup - 420 - - - - - OP_SET_GENSTAMP_V2 - - 67 1011 - - OP_UPDATE_BLOCKS - - 68 - /written_file - - 1073741834 - 9 - 1011 - - - - - OP_CLOSE - - 69 - 0 - 0 - /written_file - 1 - 1372107475272 - 1372107475221 - 512 - - - - 1073741834 - 26 - 1011 - - - aagarwal - supergroup - 420 - - - - - OP_ADD - - 70 - 0 - 16394 - /hard-lease-recovery-test - 1 - 1372107475275 - 1372107475275 - 512 - DFSClient_NONMAPREDUCE_-1834501254_1 - 127.0.0.1 - - aagarwal - supergroup - 420 - - - - - OP_ALLOCATE_BLOCK_ID - - 71 - 1073741835 - - - - OP_SET_GENSTAMP_V2 - - 72 - 1012 - - - - OP_UPDATE_BLOCKS - - 73 - /hard-lease-recovery-test - - 1073741835 - 0 - 1012 - - - - - OP_UPDATE_BLOCKS - - 74 - /hard-lease-recovery-test - - 1073741835 - 0 - 1012 - - - - - OP_SET_GENSTAMP_V2 - - 75 - 1013 - - OP_REASSIGN_LEASE - 76 - DFSClient_NONMAPREDUCE_-1834501254_1 + 67 + DFSClient_NONMAPREDUCE_-1676409172_1 /hard-lease-recovery-test HDFS_NameNode @@ -984,23 +894,23 @@ OP_CLOSE - 77 + 68 0 0 /hard-lease-recovery-test 1 - 1372107477870 - 1372107475275 + 1374817867688 + 1374817865017 512 - 1073741835 + 1073741834 11 - 1013 + 1011 - aagarwal + jing supergroup 420 @@ -1009,7 +919,7 @@ OP_END_LOG_SEGMENT - 78 + 69
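Note on the regenerated edits: each logged operation above now carries the issuing client's 16-byte UUID (for example 5245793a-984b-4264-8d7c-7890775547a0) together with a per-client call id, and those two values are what allow a retried RPC to be answered from the NameNode retry cache after a failover, which is exactly what TestRetryCacheWithHA exercises. The sketch below is illustrative only, assuming a hypothetical stand-alone SimpleRetryCache class; it is not Hadoop's RetryCache, which additionally tracks entry expiration, in-progress calls, and payload-carrying entries.

    // Illustrative sketch: a server-side cache keyed by (clientId, callId) so
    // that a retried request returns the result of the original attempt
    // instead of re-executing the operation.
    import java.util.Arrays;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SimpleRetryCache {
      /** Cache key: the 16-byte client UUID plus the per-client call id. */
      private static final class Key {
        private final byte[] clientId;
        private final int callId;

        Key(byte[] clientId, int callId) {
          this.clientId = clientId.clone();
          this.callId = callId;
        }

        @Override
        public int hashCode() {
          return 31 * Arrays.hashCode(clientId) + callId;
        }

        @Override
        public boolean equals(Object o) {
          if (!(o instanceof Key)) {
            return false;
          }
          Key other = (Key) o;
          return callId == other.callId && Arrays.equals(clientId, other.clientId);
        }
      }

      private final Map<Key, Object> cache = new ConcurrentHashMap<Key, Object>();

      /** Returns the payload recorded for a previous attempt, or null if unseen. */
      public Object getCached(byte[] clientId, int callId) {
        return cache.get(new Key(clientId, callId));
      }

      /** Records the outcome of a completed call so a later retry hits the cache. */
      public void addEntry(byte[] clientId, int callId, Object payload) {
        cache.put(new Key(clientId, callId), payload);
      }
    }

Because the client id and call id are persisted with each at-most-once operation in the edit log, the standby NameNode can rebuild an equivalent cache while tailing edits and serve the retried createSnapshot call once it becomes active, which is the behaviour checked by testRetryCacheOnStandbyNN and testClientRetryWithFailover.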