HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding retry cache in case of HA failover. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1508332 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-07-30 07:51:38 +00:00
parent 3be4036abe
commit 8c7a7e6196
31 changed files with 1207 additions and 419 deletions

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
@ -38,7 +39,12 @@ import org.apache.hadoop.util.ThreadUtil;
import com.google.common.annotations.VisibleForTesting;
class RetryInvocationHandler implements RpcInvocationHandler {
/**
* This class implements RpcInvocationHandler and supports retry on the client
* side.
*/
@InterfaceAudience.Private
public class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private final FailoverProxyProvider proxyProvider;

View File

@ -1161,7 +1161,7 @@ public class Client {
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = StringUtils.getUuidBytes();
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import com.google.common.base.Preconditions;
/**
* A class defining a set of static helper methods to provide conversion between
* bytes and string for UUID-based client Id.
*/
@InterfaceAudience.Private
public class ClientId {
/** The byte array of a UUID should be 16 */
public static final int BYTE_LENGTH = 16;
/**
* Return clientId as byte[]
*/
public static byte[] getClientId() {
UUID uuid = UUID.randomUUID();
ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
/** Convert a clientId byte[] to string */
public static String toString(byte[] clientId) {
// clientId can be null or an empty array
if (clientId == null || clientId.length == 0) {
return "";
}
// otherwise should be 16 bytes
Preconditions.checkArgument(clientId.length == BYTE_LENGTH);
long msb = 0;
long lsb = 0;
for (int i = 0; i < 8; i++) {
msb = (msb << 8) | (clientId[i] & 0xff);
}
for (int i = 8; i < 16; i++) {
lsb = (lsb << 8) | (clientId[i] & 0xff);
}
return (new UUID(msb, lsb)).toString();
}
/** Convert from clientId string byte[] representation of clientId */
public static byte[] toBytes(String id) {
if (id == null || "".equals(id)) {
return new byte[0];
}
UUID uuid = UUID.fromString(id);
ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ipc;
import java.util.Arrays;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -27,6 +28,7 @@ import org.apache.hadoop.util.LightWeightCache;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@ -64,8 +66,9 @@ public class RetryCache {
CacheEntry(byte[] clientId, int callId, long expirationTime) {
// ClientId must be a UUID - that is 16 octets.
Preconditions.checkArgument(clientId.length == 16,
"Invalid clientId - must be UUID of size 16 octets");
Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
"Invalid clientId - length is " + clientId.length
+ " expected length " + ClientId.BYTE_LENGTH);
// Convert UUID bytes to two longs
long tmp = 0;
for (int i=0; i<8; i++) {
@ -131,6 +134,12 @@ public class RetryCache {
public long getExpirationTime() {
return expirationTime;
}
@Override
public String toString() {
return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
+ this.callId + ":" + this.state;
}
}
/**
@ -186,6 +195,11 @@ public class RetryCache {
return !Server.isRpcInvocation() || Server.getCallId() < 0
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
}
@VisibleForTesting
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
return set;
}
/**
* This method handles the following conditions:
@ -240,6 +254,26 @@ public class RetryCache {
}
return mapEntry;
}
/**
* Add a new cache entry into the retry cache. The cache entry consists of
* clientId and callId extracted from editlog.
*/
public void addCacheEntry(byte[] clientId, int callId) {
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
+ expirationTime);
newEntry.completed(true);
set.put(newEntry);
}
public void addCacheEntryWithPayload(byte[] clientId, int callId,
Object payload) {
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
System.nanoTime() + expirationTime);
// since the entry is loaded from editlog, we can assume it succeeded.
newEntry.completed(true);
set.put(newEntry);
}
private static CacheEntry newEntry(long expirationTime) {
return new CacheEntry(Server.getClientId(), Server.getCallId(),

View File

@ -22,7 +22,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@ -33,7 +32,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -42,7 +40,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Shell;
import com.google.common.net.InetAddresses;
@ -897,17 +894,6 @@ public class StringUtils {
return sb.toString();
}
/**
* Return a new UUID as byte[]
*/
public static byte[] getUuidBytes() {
UUID uuid = UUID.randomUUID();
ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
/**
* Get stack trace for a given thread.
*/

View File

@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -39,7 +37,7 @@ import org.junit.Test;
* Tests for {@link RetryCache}
*/
public class TestRetryCache {
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
private static final byte[] CLIENT_ID = ClientId.getClientId();
private static int callId = 100;
private static final Random r = new Random();
private static final TestServer testServer = new TestServer();

View File

@ -17,8 +17,8 @@
*/
package org.apache.hadoop.util;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -26,6 +26,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@ -78,7 +79,7 @@ public class TestProtoUtil {
@Test
public void testRpcClientId() {
byte[] uuid = StringUtils.getUuidBytes();
byte[] uuid = ClientId.getClientId();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
RpcConstants.INVALID_RETRY_COUNT, uuid);

View File

@ -343,6 +343,9 @@ Release 2.1.0-beta - 2013-07-02
protocol methods. (suresh)
HDFS-4979. Implement retry cache on Namenode. (suresh)
HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding
retry cache in case of HA failover. (Jing Zhao via suresh)
IMPROVEMENTS

View File

@ -453,7 +453,8 @@ public class DFSClient implements java.io.Closeable {
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
* Exactly one of nameNodeUri or rpcNamenode must be null.
*/
DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@VisibleForTesting
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
// Copy only the required DFSClient configuration

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@ -307,7 +308,8 @@ public class NameNodeProxies {
}
/** Gets the configured Failover proxy provider's class */
private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
@VisibleForTesting
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
if (nameNodeUri == null) {
return null;
@ -344,7 +346,8 @@ public class NameNodeProxies {
}
/** Creates the Failover proxy provider instance*/
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
@VisibleForTesting
public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
Class<T> xface, URI nameNodeUri) throws IOException {
Preconditions.checkArgument(

View File

@ -104,7 +104,9 @@ public class LayoutVersion {
OPTIMIZE_SNAPSHOT_INODES(-45, -43,
"Reduce snapshot inode memory footprint", false),
SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
"block IDs in the edits log and image files");
"block IDs in the edits log and image files"),
EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to "
+ "enable rebuilding retry cache in case of HA failover");
final int lv;
final int ancestorLV;

View File

@ -381,12 +381,13 @@ public class FSDirectory implements Closeable {
/**
* Persist the block list for the inode.
*/
void persistBlocks(String path, INodeFileUnderConstruction file) {
void persistBlocks(String path, INodeFileUnderConstruction file,
boolean logRetryCache) {
waitForReady();
writeLock();
try {
fsImage.getEditLog().logUpdateBlocks(path, file);
fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
+path+" with "+ file.getBlocks().length
@ -459,7 +460,7 @@ public class FSDirectory implements Closeable {
* @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
*/
@Deprecated
boolean renameTo(String src, String dst)
boolean renameTo(String src, String dst, boolean logRetryCache)
throws QuotaExceededException, UnresolvedLinkException,
FileAlreadyExistsException, SnapshotAccessControlException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -475,14 +476,15 @@ public class FSDirectory implements Closeable {
} finally {
writeUnlock();
}
fsImage.getEditLog().logRename(src, dst, now);
fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
return true;
}
/**
* @see #unprotectedRenameTo(String, String, long, Options.Rename...)
*/
void renameTo(String src, String dst, Options.Rename... options)
void renameTo(String src, String dst, boolean logRetryCache,
Options.Rename... options)
throws FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, QuotaExceededException,
UnresolvedLinkException, IOException {
@ -500,7 +502,7 @@ public class FSDirectory implements Closeable {
} finally {
writeUnlock();
}
fsImage.getEditLog().logRename(src, dst, now, options);
fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
}
/**
@ -1176,7 +1178,7 @@ public class FSDirectory implements Closeable {
/**
* Concat all the blocks from srcs to trg and delete the srcs files
*/
void concat(String target, String [] srcs)
void concat(String target, String [] srcs, boolean supportRetryCache)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, SnapshotException {
writeLock();
@ -1186,7 +1188,8 @@ public class FSDirectory implements Closeable {
long timestamp = now();
unprotectedConcat(target, srcs, timestamp);
// do the commit
fsImage.getEditLog().logConcat(target, srcs, timestamp);
fsImage.getEditLog().logConcat(target, srcs, timestamp,
supportRetryCache);
} finally {
writeUnlock();
}
@ -1261,10 +1264,12 @@ public class FSDirectory implements Closeable {
* @param src Path of a directory to delete
* @param collectedBlocks Blocks under the deleted directory
* @param removedINodes INodes that should be removed from {@link #inodeMap}
* @param logRetryCache Whether to record RPC IDs in editlog to support retry
* cache rebuilding.
* @return true on successful deletion; else false
*/
boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
List<INode> removedINodes) throws IOException {
List<INode> removedINodes, boolean logRetryCache) throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
@ -1299,7 +1304,7 @@ public class FSDirectory implements Closeable {
if (filesRemoved < 0) {
return false;
}
fsImage.getEditLog().logDelete(src, now);
fsImage.getEditLog().logDelete(src, now, logRetryCache);
incrDeletedFileCount(filesRemoved);
// Blocks/INodes will be handled later by the caller of this method
getFSNamesystem().removePathAndBlocks(src, null, null);
@ -2522,7 +2527,7 @@ public class FSDirectory implements Closeable {
/**
* Create FileStatus by file INode
*/
private HdfsFileStatus createFileStatus(byte[] path, INode node,
HdfsFileStatus createFileStatus(byte[] path, INode node,
Snapshot snapshot) {
long size = 0; // length is zero for directories
short replication = 0;
@ -2595,7 +2600,7 @@ public class FSDirectory implements Closeable {
* Add the given symbolic link to the fs. Record it in the edits log.
*/
INodeSymlink addSymlink(String path, String target,
PermissionStatus dirPerms, boolean createParent)
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException {
waitForReady();
@ -2621,7 +2626,8 @@ public class FSDirectory implements Closeable {
NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
return null;
}
fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import com.google.common.annotations.VisibleForTesting;
@ -661,11 +662,20 @@ public class FSEditLog implements LogsPurgeable {
LOG.info(buf);
}
/** Record the RPC IDs if necessary */
private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
if (toLogRpcIds) {
op.setRpcClientId(Server.getClientId());
op.setRpcCallId(Server.getCallId());
}
}
/**
* Add open lease record to edit log.
* Records the block locations of the last block.
*/
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
public void logOpenFile(String path, INodeFileUnderConstruction newNode,
boolean toLogRpcIds) {
AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
@ -677,8 +687,8 @@ public class FSEditLog implements LogsPurgeable {
.setPermissionStatus(newNode.getPermissionStatus())
.setClientName(newNode.getClientName())
.setClientMachine(newNode.getClientMachine());
logEdit(op);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
@ -697,10 +707,12 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
boolean toLogRpcIds) {
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path)
.setBlocks(file.getBlocks());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@ -720,23 +732,26 @@ public class FSEditLog implements LogsPurgeable {
* Add rename record to edit log
* TODO: use String parameters until just before writing to disk
*/
void logRename(String src, String dst, long timestamp) {
void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) {
RenameOldOp op = RenameOldOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add rename record to edit log
*/
void logRename(String src, String dst, long timestamp, Options.Rename... options) {
void logRename(String src, String dst, long timestamp, boolean toLogRpcIds,
Options.Rename... options) {
RenameOp op = RenameOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp)
.setOptions(options);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@ -783,21 +798,23 @@ public class FSEditLog implements LogsPurgeable {
/**
* concat(trg,src..) log
*/
void logConcat(String trg, String [] srcs, long timestamp) {
void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) {
ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
.setTarget(trg)
.setSources(srcs)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add delete file record to edit log
*/
void logDelete(String src, long timestamp) {
void logDelete(String src, long timestamp, boolean toLogRpcIds) {
DeleteOp op = DeleteOp.getInstance(cache.get())
.setPath(src)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@ -842,8 +859,8 @@ public class FSEditLog implements LogsPurgeable {
/**
* Add a create symlink record.
*/
void logSymlink(String path, String value, long mtime,
long atime, INodeSymlink node) {
void logSymlink(String path, String value, long mtime, long atime,
INodeSymlink node, boolean toLogRpcIds) {
SymlinkOp op = SymlinkOp.getInstance(cache.get())
.setId(node.getId())
.setPath(path)
@ -851,6 +868,7 @@ public class FSEditLog implements LogsPurgeable {
.setModificationTime(mtime)
.setAccessTime(atime)
.setPermissionStatus(node.getPermissionStatus());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@ -895,22 +913,26 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
void logCreateSnapshot(String snapRoot, String snapName) {
void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logDeleteSnapshot(String snapRoot, String snapName) {
void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRenameSnapshot(String path, String snapOldName, String snapNewName) {
void logRenameSnapshot(String path, String snapOldName, String snapNewName,
boolean toLogRpcIds) {
RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(path).setSnapshotOldName(snapOldName)
.setSnapshotNewName(snapNewName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

View File

@ -33,8 +33,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
@ -275,7 +277,8 @@ public class FSEditLogLoader {
if (LOG.isTraceEnabled()) {
LOG.trace("replaying edit log: " + op);
}
final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
@ -298,8 +301,8 @@ public class FSEditLogLoader {
if (oldFile == null) { // this is OP_ADD on a new file (case 1)
// versions > 0 support per file replication
// get name and replication
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
final short replication = fsNamesys.getBlockManager()
.adjustReplication(addCloseOp.replication);
assert addCloseOp.blocks.length == 0;
// add to the file tree
@ -311,6 +314,13 @@ public class FSEditLogLoader {
addCloseOp.clientName, addCloseOp.clientMachine);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
// add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
HdfsFileStatus.EMPTY_NAME, newFile, null);
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);
}
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is case 3: a call to append() on an already-closed file.
@ -318,11 +328,17 @@ public class FSEditLogLoader {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
addCloseOp.clientName, addCloseOp.clientMachine, null,
false, iip.getLatestSnapshot());
LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path,
oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
false, iip.getLatestSnapshot(), false);
newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
addCloseOp.path, true);
// add the op into retry cache is necessary
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, lb);
}
}
}
// Fall-through for case 2.
@ -382,6 +398,10 @@ public class FSEditLogLoader {
updateOp.path);
// Update in-memory data structures
updateBlocks(fsDir, updateOp, oldFile);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
}
break;
}
@ -397,17 +417,30 @@ public class FSEditLogLoader {
ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
concatDeleteOp.rpcCallId);
}
break;
}
case OP_RENAME_OLD: {
RenameOldOp renameOp = (RenameOldOp)op;
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
}
break;
}
case OP_DELETE: {
DeleteOp deleteOp = (DeleteOp)op;
fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
}
break;
}
case OP_MKDIR: {
@ -472,12 +505,20 @@ public class FSEditLogLoader {
fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path,
symlinkOp.value, symlinkOp.mtime,
symlinkOp.atime, symlinkOp.permissionStatus);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
}
break;
}
case OP_RENAME: {
RenameOp renameOp = (RenameOp)op;
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp, renameOp.options);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
}
break;
}
case OP_GET_DELEGATION_TOKEN: {
@ -530,8 +571,12 @@ public class FSEditLogLoader {
}
case OP_CREATE_SNAPSHOT: {
CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
fsNamesys.getSnapshotManager().createSnapshot(
String path = fsNamesys.getSnapshotManager().createSnapshot(
createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
createSnapshotOp.rpcCallId, path);
}
break;
}
case OP_DELETE_SNAPSHOT: {
@ -545,6 +590,11 @@ public class FSEditLogLoader {
collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear();
if (toAddRetryCache) {
fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
deleteSnapshotOp.rpcCallId);
}
break;
}
case OP_RENAME_SNAPSHOT: {
@ -552,6 +602,11 @@ public class FSEditLogLoader {
fsNamesys.getSnapshotManager().renameSnapshot(
renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName,
renameSnapshotOp.snapshotNewName);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
renameSnapshotOp.rpcCallId);
}
break;
}
case OP_ALLOW_SNAPSHOT: {

View File

@ -17,55 +17,88 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.PureJavaCrc32;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;
import com.google.common.base.Preconditions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.EOFException;
/**
* Helper classes for reading the ops from an InputStream.
* All ops derive from FSEditLogOp and are only
@ -76,6 +109,8 @@ import java.io.EOFException;
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
byte[] rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
int rpcCallId = RpcConstants.INVALID_CALL_ID;
@SuppressWarnings("deprecation")
final public static class OpInstanceCache {
@ -150,6 +185,31 @@ public abstract class FSEditLogOp {
public void setTransactionId(long txid) {
this.txid = txid;
}
public boolean hasRpcIds() {
return rpcClientId != RpcConstants.DUMMY_CLIENT_ID
&& rpcCallId != RpcConstants.INVALID_CALL_ID;
}
/** this has to be called after calling {@link #hasRpcIds()} */
public byte[] getClientId() {
Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID);
return rpcClientId;
}
public void setRpcClientId(byte[] clientId) {
this.rpcClientId = clientId;
}
/** this has to be called after calling {@link #hasRpcIds()} */
public int getCallId() {
Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID);
return rpcCallId;
}
public void setRpcCallId(int callId) {
this.rpcCallId = callId;
}
abstract void readFields(DataInputStream in, int logVersion)
throws IOException;
@ -163,6 +223,46 @@ public abstract class FSEditLogOp {
boolean shouldCompleteLastBlock();
}
private static void writeRpcIds(final byte[] clientId, final int callId,
DataOutputStream out) throws IOException {
FSImageSerialization.writeBytes(clientId, out);
FSImageSerialization.writeInt(callId, out);
}
void readRpcIds(DataInputStream in, int logVersion)
throws IOException {
if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE,
logVersion)) {
this.rpcClientId = FSImageSerialization.readBytes(in);
this.rpcCallId = FSImageSerialization.readInt(in);
}
}
void readRpcIdsFromXml(Stanza st) {
this.rpcClientId = st.hasChildren("RPC_CLIENTID") ?
ClientId.toBytes(st.getValue("RPC_CLIENTID"))
: RpcConstants.DUMMY_CLIENT_ID;
this.rpcCallId = st.hasChildren("RPC_CALLID") ?
Integer.valueOf(st.getValue("RPC_CALLID"))
: RpcConstants.INVALID_CALL_ID;
}
private static void appendRpcIdsToString(final StringBuilder builder,
final byte[] clientId, final int callId) {
builder.append(", RpcClientId=");
builder.append(ClientId.toString(clientId));
builder.append(", RpcCallId=");
builder.append(callId);
}
private static void appendRpcIdsToXml(ContentHandler contentHandler,
final byte[] clientId, final int callId) throws SAXException {
XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID",
ClientId.toString(clientId));
XMLUtils.addSaxString(contentHandler, "RPC_CALLID",
Integer.valueOf(callId).toString());
}
@SuppressWarnings("unchecked")
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
int length;
@ -176,7 +276,7 @@ public abstract class FSEditLogOp {
PermissionStatus permissions;
String clientName;
String clientMachine;
private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
assert(opCode == OP_ADD || opCode == OP_CLOSE);
@ -247,8 +347,7 @@ public abstract class FSEditLogOp {
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
@ -261,6 +360,8 @@ public abstract class FSEditLogOp {
if (this.opCode == OP_ADD) {
FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out);
// write clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
}
}
@ -317,6 +418,8 @@ public abstract class FSEditLogOp {
if (this.opCode == OP_ADD) {
this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in);
// read clientId and callId
readRpcIds(in, logVersion);
} else {
this.clientName = "";
this.clientMachine = "";
@ -368,6 +471,9 @@ public abstract class FSEditLogOp {
builder.append(clientName);
builder.append(", clientMachine=");
builder.append(clientMachine);
if (this.opCode == OP_ADD) {
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
}
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@ -397,9 +503,13 @@ public abstract class FSEditLogOp {
FSEditLogOp.blockToXml(contentHandler, b);
}
FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
if (this.opCode == OP_ADD) {
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
@ -420,9 +530,14 @@ public abstract class FSEditLogOp {
}
this.permissions =
permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
readRpcIdsFromXml(st);
}
}
/**
* {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and
* {@link ClientProtocol#appendFile}
*/
static class AddOp extends AddCloseOp {
private AddOp() {
super(OP_ADD);
@ -446,6 +561,11 @@ public abstract class FSEditLogOp {
}
}
/**
* Although {@link ClientProtocol#appendFile} may also log a close op, we do
* not need to record the rpc ids here since a successful appendFile op will
* finally log an AddOp.
*/
static class CloseOp extends AddCloseOp {
private CloseOp() {
super(OP_CLOSE);
@ -469,6 +589,10 @@ public abstract class FSEditLogOp {
}
}
/**
* {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but
* {@literal @Idempotent} for some other ops.
*/
static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
String path;
Block[] blocks;
@ -481,7 +605,6 @@ public abstract class FSEditLogOp {
return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
}
UpdateBlocksOp setPath(String path) {
this.path = path;
return this;
@ -507,6 +630,8 @@ public abstract class FSEditLogOp {
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeCompactBlockArray(blocks, out);
// clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -514,6 +639,7 @@ public abstract class FSEditLogOp {
path = FSImageSerialization.readString(in);
this.blocks = FSImageSerialization.readCompactBlockArray(
in, logVersion);
readRpcIds(in, logVersion);
}
@Override
@ -527,8 +653,9 @@ public abstract class FSEditLogOp {
sb.append("UpdateBlocksOp [path=")
.append(path)
.append(", blocks=")
.append(Arrays.toString(blocks))
.append("]");
.append(Arrays.toString(blocks));
appendRpcIdsToString(sb, rpcClientId, rpcCallId);
sb.append("]");
return sb.toString();
}
@ -538,6 +665,7 @@ public abstract class FSEditLogOp {
for (Block b : blocks) {
FSEditLogOp.blockToXml(contentHandler, b);
}
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@ -547,9 +675,11 @@ public abstract class FSEditLogOp {
for (int i = 0; i < blocks.size(); i++) {
this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
}
readRpcIdsFromXml(st);
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */
static class SetReplicationOp extends FSEditLogOp {
String path;
short replication;
@ -618,6 +748,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */
static class ConcatDeleteOp extends FSEditLogOp {
int length;
String trg;
@ -654,8 +785,7 @@ public abstract class FSEditLogOp {
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(trg, out);
DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
@ -666,6 +796,9 @@ public abstract class FSEditLogOp {
new ArrayWritable(DeprecatedUTF8.class, info).write(out);
FSImageSerialization.writeLong(timestamp, out);
// rpc ids
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -704,6 +837,8 @@ public abstract class FSEditLogOp {
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@ -717,6 +852,7 @@ public abstract class FSEditLogOp {
builder.append(Arrays.toString(srcs));
builder.append(", timestamp=");
builder.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@ -738,6 +874,7 @@ public abstract class FSEditLogOp {
"SOURCE" + (i + 1), srcs[i]);
}
contentHandler.endElement("", "", "SOURCES");
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@ -755,9 +892,11 @@ public abstract class FSEditLogOp {
for (i = 0; i < srcs.length; i++) {
srcs[i] = sources.get(0).getValue("SOURCE" + (i + 1));
}
readRpcIdsFromXml(st);
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */
static class RenameOldOp extends FSEditLogOp {
int length;
String src;
@ -793,6 +932,7 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(src, out);
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -812,6 +952,9 @@ public abstract class FSEditLogOp {
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@ -825,6 +968,7 @@ public abstract class FSEditLogOp {
builder.append(dst);
builder.append(", timestamp=");
builder.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@ -841,16 +985,21 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "DST", dst);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.src = st.getValue("SRC");
this.dst = st.getValue("DST");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
readRpcIdsFromXml(st);
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */
static class DeleteOp extends FSEditLogOp {
int length;
String path;
@ -879,6 +1028,7 @@ public abstract class FSEditLogOp {
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeLong(timestamp, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -896,6 +1046,8 @@ public abstract class FSEditLogOp {
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@ -907,6 +1059,7 @@ public abstract class FSEditLogOp {
builder.append(path);
builder.append(", timestamp=");
builder.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@ -922,15 +1075,19 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.path = st.getValue("PATH");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
readRpcIdsFromXml(st);
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */
static class MkdirOp extends FSEditLogOp {
int length;
long inodeId;
@ -1056,6 +1213,13 @@ public abstract class FSEditLogOp {
}
}
/**
* The corresponding operations are either {@literal @Idempotent} (
* {@link ClientProtocol#updateBlockForPipeline},
* {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
* already bound with other editlog op which records rpc ids (
* {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
*/
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;
@ -1108,6 +1272,7 @@ public abstract class FSEditLogOp {
}
}
/** Similar with {@link SetGenstampV1Op} */
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;
@ -1160,6 +1325,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */
static class AllocateBlockIdOp extends FSEditLogOp {
long blockId;
@ -1212,6 +1378,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */
static class SetPermissionsOp extends FSEditLogOp {
String src;
FsPermission permissions;
@ -1277,6 +1444,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */
static class SetOwnerOp extends FSEditLogOp {
String src;
String username;
@ -1357,7 +1525,7 @@ public abstract class FSEditLogOp {
st.getValue("GROUPNAME") : null;
}
}
static class SetNSQuotaOp extends FSEditLogOp {
String src;
long nsQuota;
@ -1457,6 +1625,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
static class SetQuotaOp extends FSEditLogOp {
String src;
long nsQuota;
@ -1534,6 +1703,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
static class TimesOp extends FSEditLogOp {
int length;
String path;
@ -1629,6 +1799,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */
static class SymlinkOp extends FSEditLogOp {
int length;
long inodeId;
@ -1677,14 +1848,14 @@ public abstract class FSEditLogOp {
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeString(value, out);
FSImageSerialization.writeLong(mtime, out);
FSImageSerialization.writeLong(atime, out);
permissionStatus.write(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -1714,6 +1885,9 @@ public abstract class FSEditLogOp {
this.atime = readLong(in);
}
this.permissionStatus = PermissionStatus.read(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@ -1733,6 +1907,7 @@ public abstract class FSEditLogOp {
builder.append(atime);
builder.append(", permissionStatus=");
builder.append(permissionStatus);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@ -1754,9 +1929,11 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "ATIME",
Long.valueOf(atime).toString());
FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
@ -1765,9 +1942,12 @@ public abstract class FSEditLogOp {
this.atime = Long.valueOf(st.getValue("ATIME"));
this.permissionStatus =
permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
readRpcIdsFromXml(st);
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */
static class RenameOp extends FSEditLogOp {
int length;
String src;
@ -1810,6 +1990,7 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
toBytesWritable(options).write(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -1830,6 +2011,9 @@ public abstract class FSEditLogOp {
this.timestamp = readLong(in);
}
this.options = readRenameOptions(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
@ -1866,6 +2050,7 @@ public abstract class FSEditLogOp {
builder.append(timestamp);
builder.append(", options=");
builder.append(Arrays.toString(options));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@ -1889,6 +2074,7 @@ public abstract class FSEditLogOp {
prefix = "|";
}
XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@ -1910,9 +2096,15 @@ public abstract class FSEditLogOp {
}
}
}
readRpcIdsFromXml(st);
}
}
/**
* {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. In the
* meanwhile, startFile and appendFile both have their own corresponding
* editlog op.
*/
static class ReassignLeaseOp extends FSEditLogOp {
String leaseHolder;
String path;
@ -1988,6 +2180,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */
static class GetDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
@ -2059,6 +2252,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */
static class RenewDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
@ -2130,6 +2324,7 @@ public abstract class FSEditLogOp {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */
static class CancelDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
@ -2323,7 +2518,8 @@ public abstract class FSEditLogOp {
}
/**
* Operation corresponding to creating a snapshot
* Operation corresponding to creating a snapshot.
* {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}.
*/
static class CreateSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@ -2351,24 +2547,31 @@ public abstract class FSEditLogOp {
void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotName = FSImageSerialization.readString(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotName = st.getValue("SNAPSHOTNAME");
readRpcIdsFromXml(st);
}
@Override
@ -2378,13 +2581,15 @@ public abstract class FSEditLogOp {
builder.append(snapshotRoot);
builder.append(", snapshotName=");
builder.append(snapshotName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to delete a snapshot
* Operation corresponding to delete a snapshot.
* {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}.
*/
static class DeleteSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@ -2412,24 +2617,31 @@ public abstract class FSEditLogOp {
void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotName = FSImageSerialization.readString(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotName = st.getValue("SNAPSHOTNAME");
readRpcIdsFromXml(st);
}
@Override
@ -2439,13 +2651,15 @@ public abstract class FSEditLogOp {
builder.append(snapshotRoot);
builder.append(", snapshotName=");
builder.append(snapshotName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to rename a snapshot
* Operation corresponding to rename a snapshot.
* {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}.
*/
static class RenameSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@ -2480,6 +2694,9 @@ public abstract class FSEditLogOp {
snapshotRoot = FSImageSerialization.readString(in);
snapshotOldName = FSImageSerialization.readString(in);
snapshotNewName = FSImageSerialization.readString(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@ -2487,6 +2704,8 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotOldName, out);
FSImageSerialization.writeString(snapshotNewName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@ -2494,6 +2713,7 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
@ -2501,6 +2721,8 @@ public abstract class FSEditLogOp {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
readRpcIdsFromXml(st);
}
@Override
@ -2512,6 +2734,7 @@ public abstract class FSEditLogOp {
builder.append(snapshotOldName);
builder.append(", snapshotNewName=");
builder.append(snapshotNewName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
@ -2520,7 +2743,7 @@ public abstract class FSEditLogOp {
/**
* Operation corresponding to allow creating snapshot on a directory
*/
static class AllowSnapshotOp extends FSEditLogOp {
static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent
String snapshotRoot;
public AllowSnapshotOp() {
@ -2574,7 +2797,7 @@ public abstract class FSEditLogOp {
/**
* Operation corresponding to disallow creating snapshot on a directory
*/
static class DisallowSnapshotOp extends FSEditLogOp {
static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent
String snapshotRoot;
public DisallowSnapshotOp() {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottab
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
@ -80,6 +81,7 @@ public class FSImageSerialization {
static private final class TLData {
final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
final ShortWritable U_SHORT = new ShortWritable();
final IntWritable U_INT = new IntWritable();
final LongWritable U_LONG = new LongWritable();
final FsPermission FILE_PERM = new FsPermission((short) 0);
}
@ -350,9 +352,9 @@ public class FSImageSerialization {
/** read the long value */
static long readLong(DataInput in) throws IOException {
LongWritable ustr = TL_DATA.get().U_LONG;
ustr.readFields(in);
return ustr.get();
LongWritable uLong = TL_DATA.get().U_LONG;
uLong.readFields(in);
return uLong.get();
}
/** write the long value */
@ -361,6 +363,20 @@ public class FSImageSerialization {
uLong.set(value);
uLong.write(out);
}
/** read the int value */
static int readInt(DataInput in) throws IOException {
IntWritable uInt = TL_DATA.get().U_INT;
uInt.readFields(in);
return uInt.get();
}
/** write the int value */
static void writeInt(int value, DataOutputStream out) throws IOException {
IntWritable uInt = TL_DATA.get().U_INT;
uInt.set(value);
uInt.write(out);
}
/** read short value */
static short readShort(DataInput in) throws IOException {
@ -414,8 +430,13 @@ public class FSImageSerialization {
private static void writeLocalName(INodeAttributes inode, DataOutput out)
throws IOException {
final byte[] name = inode.getLocalNameBytes();
out.writeShort(name.length);
out.write(name);
writeBytes(name, out);
}
public static void writeBytes(byte[] data, DataOutput out)
throws IOException {
out.writeShort(data.length);
out.write(data);
}
/**

View File

@ -564,7 +564,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@ -582,7 +582,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
return namesystem;
}
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
this(conf, fsImage, false);
}
/**
* Create an FSNamesystem associated with the specified image.
*
@ -591,9 +595,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*
* @param conf configuration
* @param fsImage The FSImage to associate with
* @param ignoreRetryCache Whether or not should ignore the retry cache setup
* step. For Secondary NN this should be set to true.
* @throws IOException on bad configuration
*/
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
throws IOException {
try {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@ -684,7 +691,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = initRetryCache(conf);
this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@ -696,6 +703,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
@VisibleForTesting
public RetryCache getRetryCache() {
return retryCache;
}
/** Whether or not retry cache is enabled */
boolean hasRetryCache() {
return retryCache != null;
}
void addCacheEntryWithPayload(byte[] clientId, int callId, Object payload) {
if (retryCache != null) {
retryCache.addCacheEntryWithPayload(clientId, callId, payload);
}
}
void addCacheEntry(byte[] clientId, int callId) {
if (retryCache != null) {
retryCache.addCacheEntry(clientId, callId);
}
}
@VisibleForTesting
static RetryCache initRetryCache(Configuration conf) {
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
@ -1536,7 +1565,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean success = false;
try {
concatInt(target, srcs);
concatInt(target, srcs, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
@ -1546,8 +1575,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
private void concatInt(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
private void concatInt(String target, String [] srcs,
boolean logRetryCache) throws IOException, UnresolvedLinkException {
// verify args
if(target.isEmpty()) {
throw new IllegalArgumentException("Target file name is empty");
@ -1576,7 +1605,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (isInSafeMode()) {
throw new SafeModeException("Cannot concat " + target, safeMode);
}
concatInternal(pc, target, srcs);
concatInternal(pc, target, srcs, logRetryCache);
resultingStat = getAuditFileInfo(target, false);
} finally {
writeUnlock();
@ -1586,8 +1615,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/** See {@link #concat(String, String[])} */
private void concatInternal(FSPermissionChecker pc, String target, String [] srcs)
throws IOException, UnresolvedLinkException {
private void concatInternal(FSPermissionChecker pc, String target,
String[] srcs, boolean logRetryCache) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
// write permission for the target
@ -1691,7 +1721,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Arrays.toString(srcs) + " to " + target);
}
dir.concat(target,srcs);
dir.concat(target,srcs, logRetryCache);
}
/**
@ -1763,7 +1793,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
boolean success = false;
try {
createSymlinkInt(target, link, dirPerms, createParent);
createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
@ -1774,7 +1804,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
private void createSymlinkInt(String target, String link,
PermissionStatus dirPerms, boolean createParent)
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
@ -1805,7 +1835,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkFsObjectLimit();
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
resultingStat = getAuditFileInfo(link, false);
} finally {
writeUnlock();
@ -1935,7 +1965,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
createParent, replication, blockSize, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
@ -1947,8 +1977,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws AccessControlException, SafeModeException,
boolean createParent, short replication, long blockSize,
boolean logRetryCache) throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -1983,8 +2013,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot create file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
startFileInternal(pc, src, permissions, holder, clientMachine,
create, overwrite, createParent, replication, blockSize);
startFileInternal(pc, src, permissions, holder, clientMachine, create,
overwrite, createParent, replication, blockSize, logRetryCache);
stat = dir.getFileInfo(src, false);
} catch (StandbyException se) {
skipSync = true;
@ -2014,8 +2044,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void startFileInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
boolean create, boolean overwrite, boolean createParent,
short replication, long blockSize) throws FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
short replication, long blockSize, boolean logRetryEntry)
throws FileAlreadyExistsException, AccessControlException,
UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
@ -2047,7 +2078,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} else {
if (overwrite) {
try {
deleteInt(src, true); // File exists - delete if overwrite
deleteInt(src, true, false); // File exists - delete if overwrite
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
@ -2073,7 +2104,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
leaseManager.addLease(newNode.getClientName(), src);
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode);
getEditLog().logOpenFile(src, newNode, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@ -2102,8 +2133,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
String holder, String clientMachine) throws AccessControlException,
UnresolvedLinkException, FileNotFoundException, IOException {
String holder, String clientMachine, boolean logRetryCache)
throws AccessControlException, UnresolvedLinkException,
FileNotFoundException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INodesInPath iip = dir.getINodesInPath4Write(src);
@ -2128,7 +2160,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
true, iip.getLatestSnapshot());
true, iip.getLatestSnapshot(), logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
@ -2145,13 +2177,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param clientMachine identifier of the client machine
* @param clientNode if the client is collocated with a DN, that DN's descriptor
* @param writeToEditLog whether to persist this change to the edit log
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @return the last block locations if the block is partial or null otherwise
* @throws UnresolvedLinkException
* @throws IOException
*/
LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
throws IOException {
file = file.recordModification(latestSnapshot, dir.getINodeMap());
final INodeFileUnderConstruction cons = file.toUnderConstruction(
leaseHolder, clientMachine, clientNode);
@ -2161,7 +2196,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
if (writeToEditLog) {
getEditLog().logOpenFile(src, cons);
getEditLog().logOpenFile(src, cons, logRetryCache);
}
return ret;
}
@ -2309,7 +2344,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean success = false;
try {
lb = appendFileInt(src, holder, clientMachine);
lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
success = true;
return lb;
} catch (AccessControlException e) {
@ -2321,7 +2356,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
private LocatedBlock appendFileInt(String src, String holder,
String clientMachine) throws AccessControlException, SafeModeException,
String clientMachine, boolean logRetryCache)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -2347,7 +2383,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot append to file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
lb = appendFileInternal(pc, src, holder, clientMachine);
lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
} catch (StandbyException se) {
skipSync = true;
throw se;
@ -2471,7 +2507,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
newBlock = createNewBlock();
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, false);
offset = pendingFile.computeFileSize();
} finally {
writeUnlock();
@ -2673,7 +2709,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates");
}
dir.persistBlocks(src, file);
dir.persistBlocks(src, file, false);
} finally {
writeUnlock();
}
@ -2890,7 +2926,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
boolean ret = false;
try {
ret = renameToInt(src, dst);
ret = renameToInt(src, dst, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
@ -2900,7 +2936,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return ret;
}
private boolean renameToInt(String src, String dst)
private boolean renameToInt(String src, String dst, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
@ -2924,7 +2960,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
checkOperation(OperationCategory.WRITE);
status = renameToInternal(pc, src, dst);
status = renameToInternal(pc, src, dst, logRetryCache);
if (status) {
resultingStat = getAuditFileInfo(dst, false);
}
@ -2940,8 +2976,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** @deprecated See {@link #renameTo(String, String)} */
@Deprecated
private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
throws IOException, UnresolvedLinkException {
private boolean renameToInternal(FSPermissionChecker pc, String src,
String dst, boolean logRetryCache) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo().
@ -2959,7 +2996,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
false);
}
if (dir.renameTo(src, dst)) {
if (dir.renameTo(src, dst, logRetryCache)) {
return true;
}
return false;
@ -2994,7 +3031,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
renameToInternal(pc, src, dst, options);
renameToInternal(pc, src, dst, cacheEntry != null, options);
resultingStat = getAuditFileInfo(dst, false);
success = true;
} finally {
@ -3012,7 +3049,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
private void renameToInternal(FSPermissionChecker pc, String src, String dst,
Options.Rename... options) throws IOException {
boolean logRetryCache, Options.Rename... options) throws IOException {
assert hasWriteLock();
if (isPermissionEnabled) {
// Rename does not operates on link targets
@ -3023,7 +3060,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false);
}
dir.renameTo(src, dst, options);
dir.renameTo(src, dst, logRetryCache, options);
}
/**
@ -3041,7 +3078,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
boolean ret = false;
try {
ret = deleteInt(src, recursive);
ret = deleteInt(src, recursive, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
@ -3051,13 +3088,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return ret;
}
private boolean deleteInt(String src, boolean recursive)
private boolean deleteInt(String src, boolean recursive, boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
}
boolean status = deleteInternal(src, recursive, true);
boolean status = deleteInternal(src, recursive, true, logRetryCache);
if (status) {
logAuditEvent(true, "delete", src);
}
@ -3085,7 +3122,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @see ClientProtocol#delete(String, boolean) for description of exceptions
*/
private boolean deleteInternal(String src, boolean recursive,
boolean enforcePermission)
boolean enforcePermission, boolean logRetryCache)
throws AccessControlException, SafeModeException, UnresolvedLinkException,
IOException {
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
@ -3109,7 +3146,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FsAction.ALL, false);
}
// Unlink the target directory from directory tree
if (!dir.delete(src, collectedBlocks, removedINodes)) {
if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
return false;
}
ret = true;
@ -3438,7 +3475,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength);
}
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, false);
} finally {
writeUnlock();
}
@ -3735,7 +3772,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
src = closeFileCommitBlocks(pendingFile, storedBlock);
} else {
// If this commit does not want to close the file, persist blocks
src = persistBlocks(pendingFile);
src = persistBlocks(pendingFile, false);
}
} finally {
writeUnlock();
@ -3784,10 +3821,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException
*/
@VisibleForTesting
String persistBlocks(INodeFileUnderConstruction pendingFile)
throws IOException {
String persistBlocks(INodeFileUnderConstruction pendingFile,
boolean logRetryCache) throws IOException {
String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache);
return src;
}
@ -5595,7 +5632,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
cacheEntry != null);
success = true;
} finally {
writeUnlock();
@ -5607,7 +5645,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache)
throws IOException {
assert hasWriteLock();
// check the vadility of the block and lease holder name
@ -5642,7 +5680,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
blockinfo.setExpectedLocations(descriptors);
String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache);
}
// rename was successful. If any part of the renamed subtree had
@ -6510,7 +6548,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} finally {
dir.writeUnlock();
}
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
@ -6552,7 +6591,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dir.verifySnapshotName(snapshotNewName, path);
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
cacheEntry != null);
success = true;
} finally {
writeUnlock();
@ -6676,7 +6716,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
removedINodes.clear();
this.removeBlocks(collectedBlocks);
collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
success = true;
} finally {
writeUnlock();

View File

@ -733,7 +733,8 @@ public class NameNode implements NameNodeStatusMXBean {
}
/** get FSImage */
FSImage getFSImage() {
@VisibleForTesting
public FSImage getFSImage() {
return namesystem.dir.fsImage;
}

View File

@ -249,7 +249,7 @@ public class SecondaryNameNode implements Runnable {
checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
checkpointImage.deleteTempEdits();
namesystem = new FSNamesystem(conf, checkpointImage);
namesystem = new FSNamesystem(conf, checkpointImage, true);
// Initialize other scheduling parameters from the configuration
checkpointConf = new CheckpointConf(conf);

View File

@ -126,7 +126,7 @@ class ImageLoaderCurrent implements ImageLoader {
new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-40, -41, -42, -43, -44, -45, -46 };
-40, -41, -42, -43, -44, -45, -46, -47 };
private int imageVersion = 0;
private final Map<Long, String> subtreeMap = new HashMap<Long, String>();

View File

@ -57,8 +57,11 @@ import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@ -922,4 +925,102 @@ public class DFSTestUtil {
return new DFSTestUtil(nFiles, maxLevels, maxSize, minSize);
}
}
/**
* Run a set of operations and generate all edit logs
*/
public static void runOperations(MiniDFSCluster cluster,
DistributedFileSystem filesystem, Configuration conf, long blockSize,
int nnIndex) throws IOException {
// create FileContext for rename2
FileContext fc = FileContext.getFileContext(cluster.getURI(0), conf);
// OP_ADD 0
final Path pathFileCreate = new Path("/file_create");
FSDataOutputStream s = filesystem.create(pathFileCreate);
// OP_CLOSE 9
s.close();
// OP_RENAME_OLD 1
final Path pathFileMoved = new Path("/file_moved");
filesystem.rename(pathFileCreate, pathFileMoved);
// OP_DELETE 2
filesystem.delete(pathFileMoved, false);
// OP_MKDIR 3
Path pathDirectoryMkdir = new Path("/directory_mkdir");
filesystem.mkdirs(pathDirectoryMkdir);
// OP_ALLOW_SNAPSHOT 29
filesystem.allowSnapshot(pathDirectoryMkdir);
// OP_DISALLOW_SNAPSHOT 30
filesystem.disallowSnapshot(pathDirectoryMkdir);
// OP_CREATE_SNAPSHOT 26
String ssName = "snapshot1";
filesystem.allowSnapshot(pathDirectoryMkdir);
filesystem.createSnapshot(pathDirectoryMkdir, ssName);
// OP_RENAME_SNAPSHOT 28
String ssNewName = "snapshot2";
filesystem.renameSnapshot(pathDirectoryMkdir, ssName, ssNewName);
// OP_DELETE_SNAPSHOT 27
filesystem.deleteSnapshot(pathDirectoryMkdir, ssNewName);
// OP_SET_REPLICATION 4
s = filesystem.create(pathFileCreate);
s.close();
filesystem.setReplication(pathFileCreate, (short)1);
// OP_SET_PERMISSIONS 7
Short permission = 0777;
filesystem.setPermission(pathFileCreate, new FsPermission(permission));
// OP_SET_OWNER 8
filesystem.setOwner(pathFileCreate, new String("newOwner"), null);
// OP_CLOSE 9 see above
// OP_SET_GENSTAMP 10 see above
// OP_SET_NS_QUOTA 11 obsolete
// OP_CLEAR_NS_QUOTA 12 obsolete
// OP_TIMES 13
long mtime = 1285195527000L; // Wed, 22 Sep 2010 22:45:27 GMT
long atime = mtime;
filesystem.setTimes(pathFileCreate, mtime, atime);
// OP_SET_QUOTA 14
filesystem.setQuota(pathDirectoryMkdir, 1000L,
HdfsConstants.QUOTA_DONT_SET);
// OP_RENAME 15
fc.rename(pathFileCreate, pathFileMoved, Rename.NONE);
// OP_CONCAT_DELETE 16
Path pathConcatTarget = new Path("/file_concat_target");
Path[] pathConcatFiles = new Path[2];
pathConcatFiles[0] = new Path("/file_concat_0");
pathConcatFiles[1] = new Path("/file_concat_1");
long length = blockSize * 3; // multiple of blocksize for concat
short replication = 1;
long seed = 1;
DFSTestUtil.createFile(filesystem, pathConcatTarget, length, replication,
seed);
DFSTestUtil.createFile(filesystem, pathConcatFiles[0], length, replication,
seed);
DFSTestUtil.createFile(filesystem, pathConcatFiles[1], length, replication,
seed);
filesystem.concat(pathConcatTarget, pathConcatFiles);
// OP_SYMLINK 17
Path pathSymlink = new Path("/file_symlink");
fc.createSymlink(pathConcatTarget, pathSymlink, false);
// OP_REASSIGN_LEASE 22
String filePath = "/hard-lease-recovery-test";
byte[] bytes = "foo-bar-baz".getBytes();
DFSClientAdapter.stopLeaseRenewer(filesystem);
FSDataOutputStream leaseRecoveryPath = filesystem.create(new Path(filePath));
leaseRecoveryPath.write(bytes);
leaseRecoveryPath.hflush();
// Set the hard lease timeout to 1 second.
cluster.setLeasePeriod(60 * 1000, 1000, nnIndex);
// wait for lease recovery to complete
LocatedBlocks locatedBlocks;
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
}
}

View File

@ -2040,6 +2040,10 @@ public class MiniDFSCluster {
NameNodeAdapter.setLeasePeriod(getNamesystem(), soft, hard);
}
public void setLeasePeriod(long soft, long hard, int nnIndex) {
NameNodeAdapter.setLeasePeriod(getNamesystem(nnIndex), soft, hard);
}
public void setWaitSafeMode(boolean wait) {
this.waitSafeMode = wait;
}

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
/**
*
@ -97,8 +96,9 @@ public class CreateEditsLog {
dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L);
editLog.logMkDir(currentDir, dirInode);
}
editLog.logOpenFile(filePath, new INodeFileUnderConstruction(
inodeId.nextValue(), p, replication, 0, blockSize, "", "", null));
editLog.logOpenFile(filePath,
new INodeFileUnderConstruction(inodeId.nextValue(), p, replication,
0, blockSize, "", "", null), false);
editLog.logCloseFile(filePath, inode);
if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -28,7 +28,9 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@ -64,7 +66,7 @@ public class TestCommitBlockSynchronization {
any(INodeFileUnderConstruction.class),
any(BlockInfo.class));
doReturn("").when(namesystemSpy).persistBlocks(
any(INodeFileUnderConstruction.class));
any(INodeFileUnderConstruction.class), anyBoolean());
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
return namesystemSpy;
@ -127,7 +129,6 @@ public class TestCommitBlockSynchronization {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();
@ -148,7 +149,6 @@ public class TestCommitBlockSynchronization {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();

View File

@ -155,7 +155,7 @@ public class TestEditLog {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
namesystem.allocateNewInodeId(), p, replication, blockSize, 0, "",
"", null);
editLog.logOpenFile("/filename" + (startIndex + i), inode);
editLog.logOpenFile("/filename" + (startIndex + i), inode, false);
editLog.logCloseFile("/filename" + (startIndex + i), inode);
editLog.logSync();
}
@ -912,14 +912,14 @@ public class TestEditLog {
log.setMetricsForTests(mockMetrics);
for (int i = 0; i < 400; i++) {
log.logDelete(oneKB, 1L);
log.logDelete(oneKB, 1L, false);
}
// After ~400KB, we're still within the 512KB buffer size
Mockito.verify(mockMetrics, Mockito.times(0)).addSync(Mockito.anyLong());
// After ~400KB more, we should have done an automatic sync
for (int i = 0; i < 400; i++) {
log.logDelete(oneKB, 1L);
log.logDelete(oneKB, 1L, false);
}
Mockito.verify(mockMetrics, Mockito.times(1)).addSync(Mockito.anyLong());

View File

@ -292,7 +292,7 @@ public class TestFSEditLogLoader {
long thisTxId = spyLog.getLastWrittenTxId() + 1;
offsetToTxId.put(trueOffset, thisTxId);
System.err.println("txid " + thisTxId + " at offset " + trueOffset);
spyLog.logDelete("path" + i, i);
spyLog.logDelete("path" + i, i, false);
spyLog.logSync();
}
} finally {

View File

@ -30,8 +30,6 @@ import java.io.RandomAccessFile;
import java.util.HashSet;
import java.util.Set;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -276,7 +274,7 @@ public class TestNameNodeRecovery {
}
public int getMaxOpSize() {
return 30;
return 36;
}
}

View File

@ -19,12 +19,17 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
@ -32,19 +37,21 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
/**
@ -61,19 +68,20 @@ import org.junit.Test;
* request, a new callId is generated using {@link #newCall()}.
*/
public class TestNamenodeRetryCache {
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
private static final byte[] CLIENT_ID = ClientId.getClientId();
private static MiniDFSCluster cluster;
private static FSNamesystem namesystem;
private static PermissionStatus perm = new PermissionStatus(
"TestNamenodeRetryCache", null, FsPermission.getDefault());
private static FileSystem filesystem;
private static DistributedFileSystem filesystem;
private static int callId = 100;
private static Configuration conf = new HdfsConfiguration();
private static final int BlockSize = 512;
/** Start a cluster */
@BeforeClass
public static void setup() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
@Before
public void setup() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
@ -109,8 +117,8 @@ public class TestNamenodeRetryCache {
}
private void concatSetup(String file1, String file2) throws Exception {
DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 0L);
}
/**
@ -192,19 +200,19 @@ public class TestNamenodeRetryCache {
// Two retried calls succeed
newCall();
HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize);
Assert.assertEquals(status, namesystem.startFile(src, perm,
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
true, (short) 1, 512));
true, (short) 1, BlockSize));
Assert.assertEquals(status, namesystem.startFile(src, perm,
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
true, (short) 1, 512));
true, (short) 1, BlockSize));
// A non-retried call fails
newCall();
try {
namesystem.startFile(src, perm, "holder", "clientmachine",
EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize);
Assert.fail("testCreate - expected exception is not thrown");
} catch (IOException e) {
// expected
@ -352,4 +360,41 @@ public class TestNamenodeRetryCache {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
Assert.assertNull(FSNamesystem.initRetryCache(conf));
}
/**
* After run a set of operations, restart NN and check if the retry cache has
* been rebuilt based on the editlog.
*/
@Test
public void testRetryCacheRebuild() throws Exception {
DFSTestUtil.runOperations(cluster, filesystem, conf, BlockSize, 0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
Iterator<CacheEntry> iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
oldEntries.put(entry, entry);
}
// restart NameNode
cluster.restartNameNode();
cluster.waitActive();
namesystem = cluster.getNamesystem();
// check retry cache
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
assertTrue(oldEntries.containsKey(entry));
}
}
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRetryCacheWithHA {
private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class);
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
private static Configuration conf = new HdfsConfiguration();
private static final int BlockSize = 1024;
private static final short DataNodes = 3;
private final static Map<String, Object> results =
new HashMap<String, Object>();
/**
* A dummy invocation handler extending RetryInvocationHandler. We can use
* a boolean flag to control whether the method invocation succeeds or not.
*/
private static class DummyRetryInvocationHandler extends
RetryInvocationHandler {
static AtomicBoolean block = new AtomicBoolean(false);
DummyRetryInvocationHandler(
FailoverProxyProvider<ClientProtocol> proxyProvider,
RetryPolicy retryPolicy) {
super(proxyProvider, retryPolicy);
}
@Override
protected Object invokeMethod(Method method, Object[] args)
throws Throwable {
Object result = super.invokeMethod(method, args);
if (block.get()) {
throw new UnknownHostException("Fake Exception");
} else {
return result;
}
}
}
@Before
public void setup() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(DataNodes).build();
cluster.waitActive();
cluster.transitionToActive(0);
// setup the configuration
HATestUtil.setFailoverConfigurations(cluster, conf);
dfs = (DistributedFileSystem) HATestUtil.configureFailoverFs(cluster, conf);
}
@After
public void cleanup() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* 1. Run a set of operations
* 2. Trigger the NN failover
* 3. Check the retry cache on the original standby NN
*/
@Test
public void testRetryCacheOnStandbyNN() throws Exception {
// 1. run operations
DFSTestUtil.runOperations(cluster, dfs, conf, BlockSize, 0);
// check retry cache in NN1
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
Iterator<CacheEntry> iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
oldEntries.put(entry, entry);
}
// 2. Failover the current standby to active.
cluster.getNameNode(0).getRpcServer().rollEditLog();
cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
// 3. check the retry cache on the new active NN
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
assertTrue(oldEntries.containsKey(entry));
}
}
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
Class<FailoverProxyProvider<ClientProtocol>> failoverProxyProviderClass =
NameNodeProxies.getFailoverProxyProviderClass(conf, nnUri,
ClientProtocol.class);
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
failoverProxyProviderClass, ClientProtocol.class, nnUri);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
/**
* When NN failover happens, if the client did not receive the response and
* send a retry request to the other NN, the same response should be recieved
* based on the retry cache.
*
* TODO: currently we only test the createSnapshot from the client side. We
* may need to cover all the calls with "@AtMostOnce" annotation.
*/
@Test
public void testClientRetryWithFailover() throws Exception {
final String dir = "/test";
final Path dirPath = new Path(dir);
final String sName = "s1";
final String dirSnapshot = dir + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR
+ Path.SEPARATOR + sName;
dfs.mkdirs(dirPath);
dfs.allowSnapshot(dirPath);
final DFSClient client = genClientWithDummyHandler();
// set DummyRetryInvocationHandler#block to true
DummyRetryInvocationHandler.block.set(true);
new Thread() {
@Override
public void run() {
try {
final String snapshotPath = client.createSnapshot(dir, "s1");
assertEquals(dirSnapshot, snapshotPath);
LOG.info("Created snapshot " + snapshotPath);
synchronized (TestRetryCacheWithHA.this) {
results.put("createSnapshot", snapshotPath);
TestRetryCacheWithHA.this.notifyAll();
}
} catch (IOException e) {
LOG.info("Got IOException " + e + " while creating snapshot");
} finally {
IOUtils.cleanup(null, client);
}
}
}.start();
// make sure the client's createSnapshot call has actually been handled by
// the active NN
boolean snapshotCreated = dfs.exists(new Path(dirSnapshot));
while (!snapshotCreated) {
Thread.sleep(1000);
snapshotCreated = dfs.exists(new Path(dirSnapshot));
}
// force the failover
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// disable the block in DummyHandler
LOG.info("Setting block to false");
DummyRetryInvocationHandler.block.set(false);
synchronized (this) {
while (!results.containsKey("createSnapshot")) {
this.wait();
}
LOG.info("Got the result of createSnapshot: "
+ results.get("createSnapshot"));
}
}
}

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<EDITS>
<EDITS_VERSION>-46</EDITS_VERSION>
<EDITS_VERSION>-47</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1372798673941</EXPIRY_DATE>
<KEY>247c47b8bf6b89ec</KEY>
<EXPIRY_DATE>1375509063810</EXPIRY_DATE>
<KEY>4d47710649039b98</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1372798673944</EXPIRY_DATE>
<KEY>ef1a35da6b4fc327</KEY>
<EXPIRY_DATE>1375509063812</EXPIRY_DATE>
<KEY>38cbb1d8fd90fcb2</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -37,16 +37,18 @@
<INODEID>16386</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107474972</MTIME>
<ATIME>1372107474972</ATIME>
<MTIME>1374817864805</MTIME>
<ATIME>1374817864805</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>8</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -57,13 +59,13 @@
<INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107474983</MTIME>
<ATIME>1372107474972</ATIME>
<MTIME>1374817864816</MTIME>
<ATIME>1374817864805</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -76,7 +78,9 @@
<LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1372107474986</TIMESTAMP>
<TIMESTAMP>1374817864818</TIMESTAMP>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -85,7 +89,9 @@
<TXID>7</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
<TIMESTAMP>1372107474989</TIMESTAMP>
<TIMESTAMP>1374817864822</TIMESTAMP>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>11</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -95,9 +101,9 @@
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
<TIMESTAMP>1372107474991</TIMESTAMP>
<TIMESTAMP>1374817864825</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@ -130,6 +136,8 @@
<TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -139,6 +147,8 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -147,6 +157,8 @@
<TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -157,16 +169,18 @@
<INODEID>16388</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475007</MTIME>
<ATIME>1372107475007</ATIME>
<MTIME>1374817864846</MTIME>
<ATIME>1374817864846</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>19</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -177,13 +191,13 @@
<INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475009</MTIME>
<ATIME>1372107475007</ATIME>
<MTIME>1374817864848</MTIME>
<ATIME>1374817864846</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -239,8 +253,10 @@
<LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1372107475019</TIMESTAMP>
<TIMESTAMP>1374817864860</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>26</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -251,16 +267,18 @@
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475023</MTIME>
<ATIME>1372107475023</ATIME>
<MTIME>1374817864864</MTIME>
<ATIME>1374817864864</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>28</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -287,6 +305,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1001</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -318,6 +338,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1002</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -354,6 +376,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -364,8 +388,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475091</MTIME>
<ATIME>1372107475023</ATIME>
<MTIME>1374817864927</MTIME>
<ATIME>1374817864864</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -385,7 +409,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -399,16 +423,18 @@
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475093</MTIME>
<ATIME>1372107475093</ATIME>
<MTIME>1374817864929</MTIME>
<ATIME>1374817864929</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>41</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -435,6 +461,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1004</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -466,6 +494,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1005</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -502,6 +532,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -512,8 +544,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475110</MTIME>
<ATIME>1372107475093</ATIME>
<MTIME>1374817864947</MTIME>
<ATIME>1374817864929</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -533,7 +565,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -547,16 +579,18 @@
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475112</MTIME>
<ATIME>1372107475112</ATIME>
<MTIME>1374817864950</MTIME>
<ATIME>1374817864950</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>53</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -583,6 +617,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1007</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -614,6 +650,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1008</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -650,6 +688,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -660,8 +700,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475131</MTIME>
<ATIME>1372107475112</ATIME>
<MTIME>1374817864966</MTIME>
<ATIME>1374817864950</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -681,7 +721,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -693,11 +733,13 @@
<TXID>56</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
<TIMESTAMP>1372107475133</TIMESTAMP>
<TIMESTAMP>1374817864967</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -708,13 +750,15 @@
<INODEID>16392</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
<MTIME>1372107475137</MTIME>
<ATIME>1372107475137</ATIME>
<MTIME>1374817864971</MTIME>
<ATIME>1374817864971</ATIME>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>65</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -724,14 +768,14 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>jing</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1372107475140</ISSUE_DATE>
<MAX_DATE>1372712275140</MAX_DATE>
<ISSUE_DATE>1374817864974</ISSUE_DATE>
<MAX_DATE>1375422664974</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1372193875140</EXPIRY_TIME>
<EXPIRY_TIME>1374904264974</EXPIRY_TIME>
</DATA>
</RECORD>
<RECORD>
@ -741,14 +785,14 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>jing</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1372107475140</ISSUE_DATE>
<MAX_DATE>1372712275140</MAX_DATE>
<ISSUE_DATE>1374817864974</ISSUE_DATE>
<MAX_DATE>1375422664974</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1372193875208</EXPIRY_TIME>
<EXPIRY_TIME>1374904265012</EXPIRY_TIME>
</DATA>
</RECORD>
<RECORD>
@ -758,11 +802,11 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>jing</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1372107475140</ISSUE_DATE>
<MAX_DATE>1372712275140</MAX_DATE>
<ISSUE_DATE>1374817864974</ISSUE_DATE>
<MAX_DATE>1375422664974</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
</DATA>
@ -773,18 +817,20 @@
<TXID>61</TXID>
<LENGTH>0</LENGTH>
<INODEID>16393</INODEID>
<PATH>/written_file</PATH>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475214</MTIME>
<ATIME>1372107475214</ATIME>
<MTIME>1374817865017</MTIME>
<ATIME>1374817865017</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>69</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -805,178 +851,42 @@
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>64</TXID>
<PATH>/written_file</PATH>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>65</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/written_file</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475221</MTIME>
<ATIME>1372107475214</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>9</NUM_BYTES>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>66</TXID>
<LENGTH>0</LENGTH>
<INODEID>16393</INODEID>
<PATH>/written_file</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475221</MTIME>
<ATIME>1372107475214</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>9</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>67</TXID>
<GENSTAMPV2>1011</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>68</TXID>
<PATH>/written_file</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>9</NUM_BYTES>
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
<TXID>69</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/written_file</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475272</MTIME>
<ATIME>1372107475221</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>26</NUM_BYTES>
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
<TXID>70</TXID>
<LENGTH>0</LENGTH>
<INODEID>16394</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475275</MTIME>
<ATIME>1372107475275</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
<TXID>71</TXID>
<BLOCK_ID>1073741835</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>72</TXID>
<GENSTAMPV2>1012</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>73</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741835</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1012</GENSTAMP>
</BLOCK>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>74</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741835</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1012</GENSTAMP>
</BLOCK>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>75</TXID>
<GENSTAMPV2>1013</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>76</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-1834501254_1</LEASEHOLDER>
<TXID>67</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-1676409172_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@ -984,23 +894,23 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
<TXID>77</TXID>
<TXID>68</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107477870</MTIME>
<ATIME>1372107475275</ATIME>
<MTIME>1374817867688</MTIME>
<ATIME>1374817865017</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<BLOCK>
<BLOCK_ID>1073741835</BLOCK_ID>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>11</NUM_BYTES>
<GENSTAMP>1013</GENSTAMP>
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -1009,7 +919,7 @@
<RECORD>
<OPCODE>OP_END_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>78</TXID>
<TXID>69</TXID>
</DATA>
</RECORD>
</EDITS>