HDFS-5025. Merge 1508332 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508335 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-07-30 07:56:23 +00:00
parent 1b8522d03d
commit 9cec392a45
31 changed files with 1207 additions and 419 deletions

View File

@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
@@ -39,7 +40,12 @@
import com.google.common.annotations.VisibleForTesting;
class RetryInvocationHandler implements RpcInvocationHandler {
/**
* This class implements RpcInvocationHandler and supports retry on the client
* side.
*/
@InterfaceAudience.Private
public class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private final FailoverProxyProvider proxyProvider;

View File

@@ -1159,7 +1159,7 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = StringUtils.getUuidBytes();
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}

View File

@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import com.google.common.base.Preconditions;
/**
* A class defining a set of static helper methods to provide conversion between
* bytes and string for a UUID-based client ID.
*/
@InterfaceAudience.Private
public class ClientId {
/** The byte array of a UUID should be 16 bytes */
public static final int BYTE_LENGTH = 16;
/**
* Return clientId as byte[]
*/
public static byte[] getClientId() {
UUID uuid = UUID.randomUUID();
ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
/** Convert a clientId byte[] to string */
public static String toString(byte[] clientId) {
// clientId can be null or an empty array
if (clientId == null || clientId.length == 0) {
return "";
}
// otherwise should be 16 bytes
Preconditions.checkArgument(clientId.length == BYTE_LENGTH);
long msb = 0;
long lsb = 0;
for (int i = 0; i < 8; i++) {
msb = (msb << 8) | (clientId[i] & 0xff);
}
for (int i = 8; i < 16; i++) {
lsb = (lsb << 8) | (clientId[i] & 0xff);
}
return (new UUID(msb, lsb)).toString();
}
/** Convert a clientId string to its byte[] representation */
public static byte[] toBytes(String id) {
if (id == null || "".equals(id)) {
return new byte[0];
}
UUID uuid = UUID.fromString(id);
ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
}
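
For reference, a minimal round-trip of the new helper; the demo class name is hypothetical, the three ClientId methods are the ones added above:

import java.util.Arrays;
import org.apache.hadoop.ipc.ClientId;

public class ClientIdRoundTrip { // hypothetical demo class
  public static void main(String[] args) {
    byte[] id = ClientId.getClientId();    // fresh 16-byte UUID
    String text = ClientId.toString(id);   // canonical UUID string form
    byte[] back = ClientId.toBytes(text);  // parse the string back
    System.out.println(text + " round-trips: " + Arrays.equals(id, back)); // true
  }
}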

View File

@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,6 +28,7 @@
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -64,8 +66,9 @@ public static class CacheEntry implements LightWeightCache.Entry {
CacheEntry(byte[] clientId, int callId, long expirationTime) {
// ClientId must be a UUID - that is 16 octets.
Preconditions.checkArgument(clientId.length == 16,
"Invalid clientId - must be UUID of size 16 octets");
Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
"Invalid clientId - length is " + clientId.length
+ " expected length " + ClientId.BYTE_LENGTH);
// Convert UUID bytes to two longs
long tmp = 0;
for (int i=0; i<8; i++) {
@@ -131,6 +134,12 @@ public void setExpirationTime(long timeNano) {
public long getExpirationTime() {
return expirationTime;
}
@Override
public String toString() {
return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
+ this.callId + ":" + this.state;
}
}
/**
@@ -187,6 +196,11 @@ private static boolean skipRetryCache() {
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
}
@VisibleForTesting
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
return set;
}
/**
* This method handles the following conditions:
* <ul>
@@ -241,6 +255,26 @@ private CacheEntry waitForCompletion(CacheEntry newEntry) {
return mapEntry;
}
/**
* Add a new cache entry into the retry cache. The cache entry consists of
* clientId and callId extracted from editlog.
*/
public void addCacheEntry(byte[] clientId, int callId) {
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
+ expirationTime);
newEntry.completed(true);
set.put(newEntry);
}
public void addCacheEntryWithPayload(byte[] clientId, int callId,
Object payload) {
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
System.nanoTime() + expirationTime);
// since the entry is loaded from editlog, we can assume it succeeded.
newEntry.completed(true);
set.put(newEntry);
}
private static CacheEntry newEntry(long expirationTime) {
return new CacheEntry(Server.getClientId(), Server.getCallId(),
System.nanoTime() + expirationTime);

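The two new addCacheEntry methods exist so that an editlog replayer can repopulate the cache. A sketch of that call pattern, with retryCache and op as placeholders; the real wiring is in the FSEditLogLoader changes further down:

// Ops replayed from the editlog carry rpc ids only if they were client-issued
// @AtMostOnce operations logged on a new-enough layout version. Re-inserting
// them as completed entries lets a retry after HA failover be answered from
// the cache instead of being executed a second time.
if (op.hasRpcIds()) {
  retryCache.addCacheEntry(op.getClientId(), op.getCallId());
}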
View File

@@ -22,7 +22,6 @@
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,7 +32,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -42,7 +40,6 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Shell;
import com.google.common.net.InetAddresses;
@@ -897,17 +894,6 @@ public static String replaceTokens(String template, Pattern pattern,
return sb.toString();
}
/**
* Return a new UUID as byte[]
*/
public static byte[] getUuidBytes() {
UUID uuid = UUID.randomUUID();
ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
/**
* Get stack trace for a given thread.
*/

View File

@@ -29,8 +29,6 @@
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -39,7 +37,7 @@
* Tests for {@link RetryCache}
*/
public class TestRetryCache {
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
private static final byte[] CLIENT_ID = ClientId.getClientId();
private static int callId = 100;
private static final Random r = new Random();
private static final TestServer testServer = new TestServer();

View File

@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.util;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -78,7 +79,7 @@ private void doVarIntTest(int value) throws IOException {
@Test
public void testRpcClientId() {
byte[] uuid = StringUtils.getUuidBytes();
byte[] uuid = ClientId.getClientId();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
RpcConstants.INVALID_RETRY_COUNT, uuid);

View File

@@ -120,6 +120,9 @@ Release 2.1.0-beta - 2013-07-02
HDFS-4979. Implement retry cache on Namenode. (suresh)
HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding
retry cache in case of HA failover. (Jing Zhao via suresh)
IMPROVEMENTS
HDFS-4461. DirectoryScanner: volume path prefix takes up memory for every

View File

@@ -455,7 +455,8 @@ public DFSClient(URI nameNodeUri, Configuration conf,
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
* Exactly one of nameNodeUri or rpcNamenode must be null.
*/
DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@VisibleForTesting
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
// Copy only the required DFSClient configuration

View File

@@ -68,6 +68,7 @@
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -307,7 +308,8 @@ private static Object createNameNodeProxy(InetSocketAddress address,
}
/** Gets the configured Failover proxy provider's class */
private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
@VisibleForTesting
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
if (nameNodeUri == null) {
return null;
@@ -344,7 +346,8 @@ private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass
}
/** Creates the Failover proxy provider instance */
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
@VisibleForTesting
public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
Class<T> xface, URI nameNodeUri) throws IOException {
Preconditions.checkArgument(

View File

@@ -104,7 +104,9 @@ public static enum Feature {
OPTIMIZE_SNAPSHOT_INODES(-45, -43,
"Reduce snapshot inode memory footprint", false),
SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
"block IDs in the edits log and image files");
"block IDs in the edits log and image files"),
EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to "
+ "enable rebuilding retry cache in case of HA failover");
final int lv;
final int ancestorLV;

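The new feature constant is what readers check a segment's layout version against before consuming the extra fields. A sketch of the gating pattern, mirroring the readRpcIds helper added to FSEditLogOp later in this commit:

if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE, logVersion)) {
  // Layout -47 or newer: clientId/callId follow the op's own fields.
  rpcClientId = FSImageSerialization.readBytes(in);
  rpcCallId = FSImageSerialization.readInt(in);
}
// Older segments keep the DUMMY_CLIENT_ID / INVALID_CALL_ID defaults.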
View File

@@ -381,12 +381,13 @@ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
/**
* Persist the block list for the inode.
*/
void persistBlocks(String path, INodeFileUnderConstruction file) {
void persistBlocks(String path, INodeFileUnderConstruction file,
boolean logRetryCache) {
waitForReady();
writeLock();
try {
fsImage.getEditLog().logUpdateBlocks(path, file);
fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
+path+" with "+ file.getBlocks().length
@@ -459,7 +460,7 @@ boolean unprotectedRemoveBlock(String path,
* @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
*/
@Deprecated
boolean renameTo(String src, String dst)
boolean renameTo(String src, String dst, boolean logRetryCache)
throws QuotaExceededException, UnresolvedLinkException,
FileAlreadyExistsException, SnapshotAccessControlException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -475,14 +476,15 @@ boolean renameTo(String src, String dst)
} finally {
writeUnlock();
}
fsImage.getEditLog().logRename(src, dst, now);
fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
return true;
}
/**
* @see #unprotectedRenameTo(String, String, long, Options.Rename...)
*/
void renameTo(String src, String dst, Options.Rename... options)
void renameTo(String src, String dst, boolean logRetryCache,
Options.Rename... options)
throws FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, QuotaExceededException,
UnresolvedLinkException, IOException {
@@ -500,7 +502,7 @@ void renameTo(String src, String dst, Options.Rename... options)
} finally {
writeUnlock();
}
fsImage.getEditLog().logRename(src, dst, now, options);
fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
}
/**
@@ -1176,7 +1178,7 @@ void unprotectedSetOwner(String src, String username, String groupname)
/**
* Concat all the blocks from srcs to trg and delete the srcs files
*/
void concat(String target, String [] srcs)
void concat(String target, String [] srcs, boolean supportRetryCache)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, SnapshotException {
writeLock();
@@ -1186,7 +1188,8 @@ void concat(String target, String [] srcs)
long timestamp = now();
unprotectedConcat(target, srcs, timestamp);
// do the commit
fsImage.getEditLog().logConcat(target, srcs, timestamp);
fsImage.getEditLog().logConcat(target, srcs, timestamp,
supportRetryCache);
} finally {
writeUnlock();
}
@@ -1261,10 +1264,12 @@ void unprotectedConcat(String target, String [] srcs, long timestamp)
* @param src Path of a directory to delete
* @param collectedBlocks Blocks under the deleted directory
* @param removedINodes INodes that should be removed from {@link #inodeMap}
* @param logRetryCache Whether to record RPC IDs in editlog to support retry
* cache rebuilding.
* @return true on successful deletion; else false
*/
boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
List<INode> removedINodes) throws IOException {
List<INode> removedINodes, boolean logRetryCache) throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
@@ -1299,7 +1304,7 @@ boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
if (filesRemoved < 0) {
return false;
}
fsImage.getEditLog().logDelete(src, now);
fsImage.getEditLog().logDelete(src, now, logRetryCache);
incrDeletedFileCount(filesRemoved);
// Blocks/INodes will be handled later by the caller of this method
getFSNamesystem().removePathAndBlocks(src, null, null);
@@ -2523,7 +2528,7 @@ private HdfsFileStatus createFileStatus(byte[] path, INode node,
/**
* Create FileStatus by file INode
*/
private HdfsFileStatus createFileStatus(byte[] path, INode node,
HdfsFileStatus createFileStatus(byte[] path, INode node,
Snapshot snapshot) {
long size = 0; // length is zero for directories
short replication = 0;
@@ -2596,7 +2601,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
* Add the given symbolic link to the fs. Record it in the edits log.
*/
INodeSymlink addSymlink(String path, String target,
PermissionStatus dirPerms, boolean createParent)
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException {
waitForReady();
@@ -2622,7 +2627,8 @@ INodeSymlink addSymlink(String path, String target,
NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
return null;
}
fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");

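All of these signature changes thread a single new boolean down to the edit log. A condensed sketch of the delete flow; the shape is an assumed reading of the code, and the stamping itself happens in FSEditLog.logRpcIds in the next file:

// logRetryCache is true when the current call is a client-issued @AtMostOnce
// RPC whose ids should be persisted (per the javadoc above).
fsDir.delete(src, collectedBlocks, removedINodes, logRetryCache);
//   -> fsImage.getEditLog().logDelete(src, now, logRetryCache)
//   -> logRpcIds(op, logRetryCache)  // stamps Server.getClientId()/getCallId()
//   -> logEdit(op)                   // the op is journaled with the ids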
View File

@@ -78,6 +78,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.conf.Configuration;
@@ -662,11 +663,20 @@ private void printStatistics(boolean force) {
LOG.info(buf);
}
/** Record the RPC IDs if necessary */
private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
if (toLogRpcIds) {
op.setRpcClientId(Server.getClientId());
op.setRpcCallId(Server.getCallId());
}
}
/**
* Add open lease record to edit log.
* Records the block locations of the last block.
*/
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
public void logOpenFile(String path, INodeFileUnderConstruction newNode,
boolean toLogRpcIds) {
AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
@@ -678,8 +688,8 @@ public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
.setPermissionStatus(newNode.getPermissionStatus())
.setClientName(newNode.getClientName())
.setClientMachine(newNode.getClientMachine());
logEdit(op);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
@@ -698,10 +708,12 @@ public void logCloseFile(String path, INodeFile newNode) {
logEdit(op);
}
public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
boolean toLogRpcIds) {
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path)
.setBlocks(file.getBlocks());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -721,23 +733,26 @@ public void logMkDir(String path, INode newNode) {
* Add rename record to edit log
* TODO: use String parameters until just before writing to disk
*/
void logRename(String src, String dst, long timestamp) {
void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) {
RenameOldOp op = RenameOldOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add rename record to edit log
*/
void logRename(String src, String dst, long timestamp, Options.Rename... options) {
void logRename(String src, String dst, long timestamp, boolean toLogRpcIds,
Options.Rename... options) {
RenameOp op = RenameOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp)
.setOptions(options);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -784,21 +799,23 @@ void logSetOwner(String src, String username, String groupname) {
/**
* concat(trg,src..) log
*/
void logConcat(String trg, String [] srcs, long timestamp) {
void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) {
ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
.setTarget(trg)
.setSources(srcs)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add delete file record to edit log
*/
void logDelete(String src, long timestamp) {
void logDelete(String src, long timestamp, boolean toLogRpcIds) {
DeleteOp op = DeleteOp.getInstance(cache.get())
.setPath(src)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -843,8 +860,8 @@ void logTimes(String src, long mtime, long atime) {
/**
* Add a create symlink record.
*/
void logSymlink(String path, String value, long mtime,
long atime, INodeSymlink node) {
void logSymlink(String path, String value, long mtime, long atime,
INodeSymlink node, boolean toLogRpcIds) {
SymlinkOp op = SymlinkOp.getInstance(cache.get())
.setId(node.getId())
.setPath(path)
@@ -852,6 +869,7 @@ void logSymlink(String path, String value, long mtime,
.setModificationTime(mtime)
.setAccessTime(atime)
.setPermissionStatus(node.getPermissionStatus());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -896,22 +914,26 @@ void logReassignLease(String leaseHolder, String src, String newHolder) {
logEdit(op);
}
void logCreateSnapshot(String snapRoot, String snapName) {
void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logDeleteSnapshot(String snapRoot, String snapName) {
void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRenameSnapshot(String path, String snapOldName, String snapNewName) {
void logRenameSnapshot(String path, String snapOldName, String snapNewName,
boolean toLogRpcIds) {
RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(path).setSnapshotOldName(snapOldName)
.setSnapshotNewName(snapNewName);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
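
Every log* call site above follows the same two-step shape: stamp the ids, then journal the op. A condensed sketch; the wrapper name is illustrative, and logRpcIds must run before logEdit because logEdit serializes the op:

void logWithRpcIds(FSEditLogOp op, boolean toLogRpcIds) { // illustrative wrapper
  if (toLogRpcIds) {
    // Thread-locals of the RPC handler thread executing the client call.
    op.setRpcClientId(Server.getClientId());
    op.setRpcCallId(Server.getCallId());
  }
  logEdit(op); // serializes the op, including any rpc ids, into the journal
}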

View File

@@ -34,8 +34,10 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
@@ -277,6 +279,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
if (LOG.isTraceEnabled()) {
LOG.trace("replaying edit log: " + op);
}
final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
switch (op.opCode) {
case OP_ADD: {
@@ -300,8 +303,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
if (oldFile == null) { // this is OP_ADD on a new file (case 1)
// versions > 0 support per file replication
// get name and replication
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
final short replication = fsNamesys.getBlockManager()
.adjustReplication(addCloseOp.replication);
assert addCloseOp.blocks.length == 0;
// add to the file tree
@@ -313,6 +316,13 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
addCloseOp.clientName, addCloseOp.clientMachine);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
// add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
HdfsFileStatus.EMPTY_NAME, newFile, null);
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);
}
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is case 3: a call to append() on an already-closed file.
@@ -320,11 +330,17 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
addCloseOp.clientName, addCloseOp.clientMachine, null,
false, iip.getLatestSnapshot());
LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path,
oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
false, iip.getLatestSnapshot(), false);
newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
addCloseOp.path, true);
// add the op into retry cache if necessary
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, lb);
}
}
}
// Fall-through for case 2.
@@ -384,6 +400,10 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
updateOp.path);
// Update in-memory data structures
updateBlocks(fsDir, updateOp, oldFile);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
}
break;
}
@@ -399,17 +419,30 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
concatDeleteOp.rpcCallId);
}
break;
}
case OP_RENAME_OLD: {
RenameOldOp renameOp = (RenameOldOp)op;
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
}
break;
}
case OP_DELETE: {
DeleteOp deleteOp = (DeleteOp)op;
fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
}
break;
}
case OP_MKDIR: {
@@ -474,12 +507,20 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path,
symlinkOp.value, symlinkOp.mtime,
symlinkOp.atime, symlinkOp.permissionStatus);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
}
break;
}
case OP_RENAME: {
RenameOp renameOp = (RenameOp)op;
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp, renameOp.options);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
}
break;
}
case OP_GET_DELEGATION_TOKEN: {
@@ -532,8 +573,12 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
}
case OP_CREATE_SNAPSHOT: {
CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
fsNamesys.getSnapshotManager().createSnapshot(
String path = fsNamesys.getSnapshotManager().createSnapshot(
createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
createSnapshotOp.rpcCallId, path);
}
break;
}
case OP_DELETE_SNAPSHOT: {
@@ -547,6 +592,11 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear();
if (toAddRetryCache) {
fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
deleteSnapshotOp.rpcCallId);
}
break;
}
case OP_RENAME_SNAPSHOT: {
@@ -554,6 +604,11 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.getSnapshotManager().renameSnapshot(
renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName,
renameSnapshotOp.snapshotNewName);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
renameSnapshotOp.rpcCallId);
}
break;
}
case OP_ALLOW_SNAPSHOT: {

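Condensing the OP_CREATE_SNAPSHOT branch above into the failover scenario it addresses; the numbered comments are an assumed reading of the flow, the calls are the ones in the hunk:

// 1. A client calls createSnapshot; the active NN logs the op with rpc ids,
//    then fails before the client receives the reply.
// 2. The standby replays the op, redoing the namespace change:
String path = fsNamesys.getSnapshotManager().createSnapshot(
    createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
// 3. ...and caches the original result keyed by (clientId, callId):
if (toAddRetryCache) {
  fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
      createSnapshotOp.rpcCallId, path);
}
// 4. The client's retry against the new active NN hits the cache and gets the
//    same snapshot path back, so no duplicate snapshot is created.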
View File

@@ -17,55 +17,88 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.PureJavaCrc32;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;
import com.google.common.base.Preconditions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.EOFException;
/**
* Helper classes for reading the ops from an InputStream.
* All ops derive from FSEditLogOp and are only
@@ -76,6 +109,8 @@
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
byte[] rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
int rpcCallId = RpcConstants.INVALID_CALL_ID;
@SuppressWarnings("deprecation")
final public static class OpInstanceCache {
@@ -151,6 +186,31 @@ public void setTransactionId(long txid) {
this.txid = txid;
}
public boolean hasRpcIds() {
return rpcClientId != RpcConstants.DUMMY_CLIENT_ID
&& rpcCallId != RpcConstants.INVALID_CALL_ID;
}
/** This should only be called after {@link #hasRpcIds()} returns true */
public byte[] getClientId() {
Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID);
return rpcClientId;
}
public void setRpcClientId(byte[] clientId) {
this.rpcClientId = clientId;
}
/** This should only be called after {@link #hasRpcIds()} returns true */
public int getCallId() {
Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID);
return rpcCallId;
}
public void setRpcCallId(int callId) {
this.rpcCallId = callId;
}
abstract void readFields(DataInputStream in, int logVersion)
throws IOException;
@@ -163,6 +223,46 @@ static interface BlockListUpdatingOp {
boolean shouldCompleteLastBlock();
}
private static void writeRpcIds(final byte[] clientId, final int callId,
DataOutputStream out) throws IOException {
FSImageSerialization.writeBytes(clientId, out);
FSImageSerialization.writeInt(callId, out);
}
void readRpcIds(DataInputStream in, int logVersion)
throws IOException {
if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE,
logVersion)) {
this.rpcClientId = FSImageSerialization.readBytes(in);
this.rpcCallId = FSImageSerialization.readInt(in);
}
}
void readRpcIdsFromXml(Stanza st) {
this.rpcClientId = st.hasChildren("RPC_CLIENTID") ?
ClientId.toBytes(st.getValue("RPC_CLIENTID"))
: RpcConstants.DUMMY_CLIENT_ID;
this.rpcCallId = st.hasChildren("RPC_CALLID") ?
Integer.valueOf(st.getValue("RPC_CALLID"))
: RpcConstants.INVALID_CALL_ID;
}
private static void appendRpcIdsToString(final StringBuilder builder,
final byte[] clientId, final int callId) {
builder.append(", RpcClientId=");
builder.append(ClientId.toString(clientId));
builder.append(", RpcCallId=");
builder.append(callId);
}
private static void appendRpcIdsToXml(ContentHandler contentHandler,
final byte[] clientId, final int callId) throws SAXException {
XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID",
ClientId.toString(clientId));
XMLUtils.addSaxString(contentHandler, "RPC_CALLID",
Integer.valueOf(callId).toString());
}
@SuppressWarnings("unchecked")
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
int length;
@@ -247,8 +347,7 @@ <T extends AddCloseOp> T setClientMachine(String clientMachine) {
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
@@ -261,6 +360,8 @@ void writeFields(DataOutputStream out) throws IOException {
if (this.opCode == OP_ADD) {
FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out);
// write clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
}
}
@@ -317,6 +418,8 @@ void readFields(DataInputStream in, int logVersion)
if (this.opCode == OP_ADD) {
this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in);
// read clientId and callId
readRpcIds(in, logVersion);
} else {
this.clientName = "";
this.clientMachine = "";
@@ -368,6 +471,9 @@ public String stringifyMembers() {
builder.append(clientName);
builder.append(", clientMachine=");
builder.append(clientMachine);
if (this.opCode == OP_ADD) {
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
}
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -397,9 +503,13 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
FSEditLogOp.blockToXml(contentHandler, b);
}
FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
if (this.opCode == OP_ADD) {
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
@@ -420,9 +530,14 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
this.permissions =
permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
readRpcIdsFromXml(st);
}
}
/**
* {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and
* {@link ClientProtocol#appendFile}
*/
static class AddOp extends AddCloseOp {
private AddOp() {
super(OP_ADD);
@@ -446,6 +561,11 @@ public String toString() {
}
}
/**
* Although {@link ClientProtocol#appendFile} may also log a close op, we do
* not need to record the rpc ids here, since a successful appendFile op
* ultimately logs an AddOp.
*/
static class CloseOp extends AddCloseOp {
private CloseOp() {
super(OP_CLOSE);
@@ -469,6 +589,10 @@ public String toString() {
}
}
/**
* {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but
* {@literal @Idempotent} for some other ops.
*/
static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
String path;
Block[] blocks;
@@ -481,7 +605,6 @@ static UpdateBlocksOp getInstance(OpInstanceCache cache) {
return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
}
UpdateBlocksOp setPath(String path) {
this.path = path;
return this;
@@ -507,6 +630,8 @@ public Block[] getBlocks() {
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeCompactBlockArray(blocks, out);
// write clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -514,6 +639,7 @@ void readFields(DataInputStream in, int logVersion) throws IOException {
path = FSImageSerialization.readString(in);
this.blocks = FSImageSerialization.readCompactBlockArray(
in, logVersion);
readRpcIds(in, logVersion);
}
@Override
@@ -527,8 +653,9 @@ public String toString() {
sb.append("UpdateBlocksOp [path=")
.append(path)
.append(", blocks=")
.append(Arrays.toString(blocks))
.append("]");
.append(Arrays.toString(blocks));
appendRpcIdsToString(sb, rpcClientId, rpcCallId);
sb.append("]");
return sb.toString();
}
@@ -538,6 +665,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
for (Block b : blocks) {
FSEditLogOp.blockToXml(contentHandler, b);
}
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -547,9 +675,11 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
for (int i = 0; i < blocks.size(); i++) {
this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
}
readRpcIdsFromXml(st);
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */
static class SetReplicationOp extends FSEditLogOp {
String path;
short replication;
@@ -618,6 +748,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */
static class ConcatDeleteOp extends FSEditLogOp {
int length;
String trg;
@@ -654,8 +785,7 @@ ConcatDeleteOp setTimestamp(long timestamp) {
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(trg, out);
DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
@@ -666,6 +796,9 @@ void writeFields(DataOutputStream out) throws IOException {
new ArrayWritable(DeprecatedUTF8.class, info).write(out);
FSImageSerialization.writeLong(timestamp, out);
// rpc ids
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -704,6 +837,8 @@ void readFields(DataInputStream in, int logVersion)
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@@ -717,6 +852,7 @@ public String toString() {
builder.append(Arrays.toString(srcs));
builder.append(", timestamp=");
builder.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -738,6 +874,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
"SOURCE" + (i + 1), srcs[i]);
}
contentHandler.endElement("", "", "SOURCES");
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -755,9 +892,11 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
for (i = 0; i < srcs.length; i++) {
srcs[i] = sources.get(0).getValue("SOURCE" + (i + 1));
}
readRpcIdsFromXml(st);
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */
static class RenameOldOp extends FSEditLogOp {
int length;
String src;
@@ -793,6 +932,7 @@ void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(src, out);
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -812,6 +952,9 @@ void readFields(DataInputStream in, int logVersion)
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@@ -825,6 +968,7 @@ public String toString() {
builder.append(dst);
builder.append(", timestamp=");
builder.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -841,16 +985,21 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "DST", dst);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.src = st.getValue("SRC");
this.dst = st.getValue("DST");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
readRpcIdsFromXml(st);
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */
static class DeleteOp extends FSEditLogOp {
int length;
String path;
@@ -879,6 +1028,7 @@ DeleteOp setTimestamp(long timestamp) {
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeLong(timestamp, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -896,6 +1046,8 @@ void readFields(DataInputStream in, int logVersion)
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@@ -907,6 +1059,7 @@ public String toString() {
builder.append(path);
builder.append(", timestamp=");
builder.append(timestamp);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -922,15 +1075,19 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.path = st.getValue("PATH");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
readRpcIdsFromXml(st);
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */
static class MkdirOp extends FSEditLogOp {
int length;
long inodeId;
@@ -1056,6 +1213,13 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/**
* The corresponding operations are either {@literal @Idempotent} (
* {@link ClientProtocol#updateBlockForPipeline},
* {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
* already bound to another editlog op that records rpc ids (
* {@link ClientProtocol#startFile}). Thus there is no need to record rpc ids here.
*/
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;
@@ -1108,6 +1272,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** Similar to {@link SetGenstampV1Op} */
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;
@@ -1160,6 +1325,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */
static class AllocateBlockIdOp extends FSEditLogOp {
long blockId;
@@ -1212,6 +1378,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */
static class SetPermissionsOp extends FSEditLogOp {
String src;
FsPermission permissions;
@@ -1277,6 +1444,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */
static class SetOwnerOp extends FSEditLogOp {
String src;
String username;
@@ -1457,6 +1625,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
static class SetQuotaOp extends FSEditLogOp {
String src;
long nsQuota;
@@ -1534,6 +1703,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
static class TimesOp extends FSEditLogOp {
int length;
String path;
@@ -1629,6 +1799,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */
static class SymlinkOp extends FSEditLogOp {
int length;
long inodeId;
@@ -1677,14 +1848,14 @@ SymlinkOp setPermissionStatus(PermissionStatus permissionStatus) {
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeString(value, out);
FSImageSerialization.writeLong(mtime, out);
FSImageSerialization.writeLong(atime, out);
permissionStatus.write(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -1714,6 +1885,9 @@ void readFields(DataInputStream in, int logVersion)
this.atime = readLong(in);
}
this.permissionStatus = PermissionStatus.read(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@@ -1733,6 +1907,7 @@ public String toString() {
builder.append(atime);
builder.append(", permissionStatus=");
builder.append(permissionStatus);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -1754,9 +1929,11 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ATIME",
Long.valueOf(atime).toString());
FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
@@ -1765,9 +1942,12 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
this.atime = Long.valueOf(st.getValue("ATIME"));
this.permissionStatus =
permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
readRpcIdsFromXml(st);
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */
static class RenameOp extends FSEditLogOp {
int length;
String src;
@@ -1810,6 +1990,7 @@ void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
toBytesWritable(options).write(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -1830,6 +2011,9 @@ void readFields(DataInputStream in, int logVersion)
this.timestamp = readLong(in);
}
this.options = readRenameOptions(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
@@ -1866,6 +2050,7 @@ public String toString() {
builder.append(timestamp);
builder.append(", options=");
builder.append(Arrays.toString(options));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -1889,6 +2074,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
prefix = "|";
}
XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -1910,9 +2096,15 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
}
readRpcIdsFromXml(st);
}
}
/**
* {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. Meanwhile,
* startFile and appendFile each have their own corresponding
* editlog op.
*/
static class ReassignLeaseOp extends FSEditLogOp {
String leaseHolder;
String path;
@@ -1988,6 +2180,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */
static class GetDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
@@ -2059,6 +2252,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */
static class RenewDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
@@ -2130,6 +2324,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
/** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */
static class CancelDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
@@ -2323,7 +2518,8 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
/**
* Operation corresponding to creating a snapshot
* Operation corresponding to creating a snapshot.
* {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}.
*/
static class CreateSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@@ -2351,24 +2547,31 @@ public CreateSnapshotOp setSnapshotRoot(String snapRoot) {
void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotName = FSImageSerialization.readString(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotName = st.getValue("SNAPSHOTNAME");
readRpcIdsFromXml(st);
}
@Override
@@ -2378,13 +2581,15 @@ public String toString() {
builder.append(snapshotRoot);
builder.append(", snapshotName=");
builder.append(snapshotName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to delete a snapshot
* Operation corresponding to deleting a snapshot.
* {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}.
*/
static class DeleteSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@@ -2412,24 +2617,31 @@ DeleteSnapshotOp setSnapshotRoot(String snapRoot) {
void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotName = FSImageSerialization.readString(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotName = st.getValue("SNAPSHOTNAME");
readRpcIdsFromXml(st);
}
@Override
@@ -2439,13 +2651,15 @@ public String toString() {
builder.append(snapshotRoot);
builder.append(", snapshotName=");
builder.append(snapshotName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* Operation corresponding to rename a snapshot
* Operation corresponding to renaming a snapshot.
* {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}.
*/
static class RenameSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@@ -2480,6 +2694,9 @@ void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotOldName = FSImageSerialization.readString(in);
snapshotNewName = FSImageSerialization.readString(in);
// read RPC ids if necessary
readRpcIds(in, logVersion);
}
@Override
@@ -2487,6 +2704,8 @@ public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotOldName, out);
FSImageSerialization.writeString(snapshotNewName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -2494,6 +2713,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
@@ -2501,6 +2721,8 @@ void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
readRpcIdsFromXml(st);
}
@Override
@@ -2512,6 +2734,7 @@ public String toString() {
builder.append(snapshotOldName);
builder.append(", snapshotNewName=");
builder.append(snapshotNewName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
@@ -2520,7 +2743,7 @@ public String toString() {
/**
* Operation corresponding to allowing snapshot creation on a directory
*/
static class AllowSnapshotOp extends FSEditLogOp {
static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent
String snapshotRoot;
public AllowSnapshotOp() {
@@ -2574,7 +2797,7 @@ public String toString() {
/**
* Operation corresponding to disallowing snapshot creation on a directory
*/
static class DisallowSnapshotOp extends FSEditLogOp {
static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent
String snapshotRoot;
public DisallowSnapshotOp() {

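With the XML round-trip added above, offline-edits-viewer output for an at-most-once op gains two elements; the record below is illustrative (values invented, sibling elements such as LENGTH omitted):

<RECORD>
  <OPCODE>OP_DELETE</OPCODE>
  <DATA>
    <PATH>/some/file</PATH>
    <TIMESTAMP>1375167000000</TIMESTAMP>
    <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
    <RPC_CALLID>42</RPC_CALLID>
  </DATA>
</RECORD>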
View File

@@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
@ -80,6 +81,7 @@ protected TLData initialValue() {
static private final class TLData {
final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
final ShortWritable U_SHORT = new ShortWritable();
final IntWritable U_INT = new IntWritable();
final LongWritable U_LONG = new LongWritable();
final FsPermission FILE_PERM = new FsPermission((short) 0);
}
@ -350,9 +352,9 @@ public static void writeString(String str, DataOutput out) throws IOException {
/** read the long value */
static long readLong(DataInput in) throws IOException {
LongWritable ustr = TL_DATA.get().U_LONG;
ustr.readFields(in);
return ustr.get();
LongWritable uLong = TL_DATA.get().U_LONG;
uLong.readFields(in);
return uLong.get();
}
/** write the long value */
@ -362,6 +364,20 @@ static void writeLong(long value, DataOutputStream out) throws IOException {
uLong.write(out);
}
/** read the int value */
static int readInt(DataInput in) throws IOException {
IntWritable uInt = TL_DATA.get().U_INT;
uInt.readFields(in);
return uInt.get();
}
/** write the int value */
static void writeInt(int value, DataOutputStream out) throws IOException {
IntWritable uInt = TL_DATA.get().U_INT;
uInt.set(value);
uInt.write(out);
}
/** read short value */
static short readShort(DataInput in) throws IOException {
ShortWritable uShort = TL_DATA.get().U_SHORT;
@ -414,8 +430,13 @@ public static byte[] readLocalName(DataInput in) throws IOException {
private static void writeLocalName(INodeAttributes inode, DataOutput out)
throws IOException {
final byte[] name = inode.getLocalNameBytes();
out.writeShort(name.length);
out.write(name);
writeBytes(name, out);
}
public static void writeBytes(byte[] data, DataOutput out)
throws IOException {
out.writeShort(data.length);
out.write(data);
}
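The new int and byte[] helpers follow the style of the existing long/short ones: a thread-local Writable for ints, and a short length prefix followed by the raw bytes for arrays. A quick round-trip sketch of that wire format (assumes same-package access, since writeInt/readInt are package-private; only these three helpers come from this patch, the rest is plain java.io):
  static void roundTripExample() throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    FSImageSerialization.writeInt(42, out);
    FSImageSerialization.writeBytes(
        "inode-name".getBytes(StandardCharsets.UTF_8), out);
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    int v = FSImageSerialization.readInt(in);  // == 42
    int len = in.readShort();                  // length prefix from writeBytes
    byte[] name = new byte[len];
    in.readFully(name);                        // == the "inode-name" bytes
  }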
/**

View File

@ -549,7 +549,7 @@ public static FSNamesystem loadFromDisk(Configuration conf)
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@ -568,6 +568,10 @@ public static FSNamesystem loadFromDisk(Configuration conf)
return namesystem;
}
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
this(conf, fsImage, false);
}
/**
* Create an FSNamesystem associated with the specified image.
*
@ -576,9 +580,12 @@ public static FSNamesystem loadFromDisk(Configuration conf)
*
* @param conf configuration
* @param fsImage The FSImage to associate with
* @param ignoreRetryCache Whether or not to skip the retry cache setup
* step. For the Secondary NN this should be set to true.
* @throws IOException on bad configuration
*/
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
throws IOException {
try {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@ -669,7 +676,7 @@ public static FSNamesystem loadFromDisk(Configuration conf)
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = initRetryCache(conf);
this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@ -681,6 +688,28 @@ public static FSNamesystem loadFromDisk(Configuration conf)
}
}
@VisibleForTesting
public RetryCache getRetryCache() {
return retryCache;
}
/** Whether or not the retry cache is enabled */
boolean hasRetryCache() {
return retryCache != null;
}
void addCacheEntryWithPayload(byte[] clientId, int callId, Object payload) {
if (retryCache != null) {
retryCache.addCacheEntryWithPayload(clientId, callId, payload);
}
}
void addCacheEntry(byte[] clientId, int callId) {
if (retryCache != null) {
retryCache.addCacheEntry(clientId, callId);
}
}
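These null guards let callers stay oblivious to whether the retry cache is enabled (the Secondary NN disables it entirely). Presumably the edit-log loader calls them while replaying an op that carries RPC ids; roughly, with hypothetical accessor names:
  // Inside the edit-log replay loop, after decoding one op:
  if (op.hasRpcIds()) {  // hypothetical accessor
    fsNamesys.addCacheEntry(op.getClientId(), op.getCallId());
  }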
@VisibleForTesting
static RetryCache initRetryCache(Configuration conf) {
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
@ -1520,7 +1549,7 @@ void concat(String target, String [] srcs)
boolean success = false;
try {
concatInt(target, srcs);
concatInt(target, srcs, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
@ -1530,8 +1559,8 @@ void concat(String target, String [] srcs)
}
}
private void concatInt(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
private void concatInt(String target, String [] srcs,
boolean logRetryCache) throws IOException, UnresolvedLinkException {
// verify args
if(target.isEmpty()) {
throw new IllegalArgumentException("Target file name is empty");
@ -1560,7 +1589,7 @@ private void concatInt(String target, String [] srcs)
if (isInSafeMode()) {
throw new SafeModeException("Cannot concat " + target, safeMode);
}
concatInternal(pc, target, srcs);
concatInternal(pc, target, srcs, logRetryCache);
resultingStat = getAuditFileInfo(target, false);
} finally {
writeUnlock();
@ -1570,8 +1599,9 @@ private void concatInt(String target, String [] srcs)
}
/** See {@link #concat(String, String[])} */
private void concatInternal(FSPermissionChecker pc, String target, String [] srcs)
throws IOException, UnresolvedLinkException {
private void concatInternal(FSPermissionChecker pc, String target,
String[] srcs, boolean logRetryCache) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
// write permission for the target
@ -1675,7 +1705,7 @@ private void concatInternal(FSPermissionChecker pc, String target, String [] src
Arrays.toString(srcs) + " to " + target);
}
dir.concat(target,srcs);
dir.concat(target,srcs, logRetryCache);
}
/**
@ -1747,7 +1777,7 @@ void createSymlink(String target, String link,
}
boolean success = false;
try {
createSymlinkInt(target, link, dirPerms, createParent);
createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
@ -1758,7 +1788,7 @@ void createSymlink(String target, String link,
}
private void createSymlinkInt(String target, String link,
PermissionStatus dirPerms, boolean createParent)
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
@ -1789,7 +1819,7 @@ private void createSymlinkInt(String target, String link,
checkFsObjectLimit();
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
resultingStat = getAuditFileInfo(link, false);
} finally {
writeUnlock();
@ -1919,7 +1949,7 @@ HdfsFileStatus startFile(String src, PermissionStatus permissions,
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
createParent, replication, blockSize, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
@ -1931,8 +1961,8 @@ HdfsFileStatus startFile(String src, PermissionStatus permissions,
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws AccessControlException, SafeModeException,
boolean createParent, short replication, long blockSize,
boolean logRetryCache) throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -1967,8 +1997,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
throw new SafeModeException("Cannot create file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
startFileInternal(pc, src, permissions, holder, clientMachine,
create, overwrite, createParent, replication, blockSize);
startFileInternal(pc, src, permissions, holder, clientMachine, create,
overwrite, createParent, replication, blockSize, logRetryCache);
stat = dir.getFileInfo(src, false);
} catch (StandbyException se) {
skipSync = true;
@ -1997,8 +2027,9 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
private void startFileInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
boolean create, boolean overwrite, boolean createParent,
short replication, long blockSize) throws FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
short replication, long blockSize, boolean logRetryEntry)
throws FileAlreadyExistsException, AccessControlException,
UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
@ -2030,7 +2061,7 @@ private void startFileInternal(FSPermissionChecker pc, String src,
} else {
if (overwrite) {
try {
deleteInt(src, true); // File exists - delete if overwrite
deleteInt(src, true, false); // File exists - delete if overwrite
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
@ -2056,7 +2087,7 @@ private void startFileInternal(FSPermissionChecker pc, String src,
leaseManager.addLease(newNode.getClientName(), src);
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode);
getEditLog().logOpenFile(src, newNode, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@ -2085,8 +2116,9 @@ private void startFileInternal(FSPermissionChecker pc, String src,
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
String holder, String clientMachine) throws AccessControlException,
UnresolvedLinkException, FileNotFoundException, IOException {
String holder, String clientMachine, boolean logRetryCache)
throws AccessControlException, UnresolvedLinkException,
FileNotFoundException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INodesInPath iip = dir.getINodesInPath4Write(src);
@ -2111,7 +2143,7 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
true, iip.getLatestSnapshot());
true, iip.getLatestSnapshot(), logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
@ -2128,13 +2160,16 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
* @param clientMachine identifier of the client machine
* @param clientNode if the client is collocated with a DN, that DN's descriptor
* @param writeToEditLog whether to persist this change to the edit log
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
* @return the last block locations if the block is partial or null otherwise
* @throws UnresolvedLinkException
* @throws IOException
*/
LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
throws IOException {
file = file.recordModification(latestSnapshot, dir.getINodeMap());
final INodeFileUnderConstruction cons = file.toUnderConstruction(
leaseHolder, clientMachine, clientNode);
@ -2144,7 +2179,7 @@ LocatedBlock prepareFileForWrite(String src, INodeFile file,
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
if (writeToEditLog) {
getEditLog().logOpenFile(src, cons);
getEditLog().logOpenFile(src, cons, logRetryCache);
}
return ret;
}
@ -2292,7 +2327,7 @@ LocatedBlock appendFile(String src, String holder, String clientMachine)
boolean success = false;
try {
lb = appendFileInt(src, holder, clientMachine);
lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
success = true;
return lb;
} catch (AccessControlException e) {
@ -2304,7 +2339,8 @@ LocatedBlock appendFile(String src, String holder, String clientMachine)
}
private LocatedBlock appendFileInt(String src, String holder,
String clientMachine) throws AccessControlException, SafeModeException,
String clientMachine, boolean logRetryCache)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -2330,7 +2366,7 @@ private LocatedBlock appendFileInt(String src, String holder,
throw new SafeModeException("Cannot append to file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
lb = appendFileInternal(pc, src, holder, clientMachine);
lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
} catch (StandbyException se) {
skipSync = true;
throw se;
@ -2454,7 +2490,7 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
newBlock = createNewBlock();
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, false);
offset = pendingFile.computeFileSize();
} finally {
writeUnlock();
@ -2656,7 +2692,7 @@ boolean abandonBlock(ExtendedBlock b, String src, String holder)
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates");
}
dir.persistBlocks(src, file);
dir.persistBlocks(src, file, false);
} finally {
writeUnlock();
}
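Note the literal false threaded into persistBlocks here and in getAdditionalBlock above (and in fsync further down): these paths back @Idempotent RPCs, which can safely be re-executed on retry, so there is no point persisting RPC ids for them. Only the @AtMostOnce paths pass cacheEntry != null down to the edit log.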
@ -2873,7 +2909,7 @@ boolean renameTo(String src, String dst)
}
boolean ret = false;
try {
ret = renameToInt(src, dst);
ret = renameToInt(src, dst, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
@ -2883,7 +2919,7 @@ boolean renameTo(String src, String dst)
return ret;
}
private boolean renameToInt(String src, String dst)
private boolean renameToInt(String src, String dst, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
@ -2907,7 +2943,7 @@ private boolean renameToInt(String src, String dst)
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
checkOperation(OperationCategory.WRITE);
status = renameToInternal(pc, src, dst);
status = renameToInternal(pc, src, dst, logRetryCache);
if (status) {
resultingStat = getAuditFileInfo(dst, false);
}
@ -2923,8 +2959,9 @@ private boolean renameToInt(String src, String dst)
/** @deprecated See {@link #renameTo(String, String)} */
@Deprecated
private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
throws IOException, UnresolvedLinkException {
private boolean renameToInternal(FSPermissionChecker pc, String src,
String dst, boolean logRetryCache) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo().
@ -2942,7 +2979,7 @@ private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
false);
}
if (dir.renameTo(src, dst)) {
if (dir.renameTo(src, dst, logRetryCache)) {
return true;
}
return false;
@ -2977,7 +3014,7 @@ void renameTo(String src, String dst, Options.Rename... options)
}
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
renameToInternal(pc, src, dst, options);
renameToInternal(pc, src, dst, cacheEntry != null, options);
resultingStat = getAuditFileInfo(dst, false);
success = true;
} finally {
@ -2995,7 +3032,7 @@ void renameTo(String src, String dst, Options.Rename... options)
}
private void renameToInternal(FSPermissionChecker pc, String src, String dst,
Options.Rename... options) throws IOException {
boolean logRetryCache, Options.Rename... options) throws IOException {
assert hasWriteLock();
if (isPermissionEnabled) {
// Rename does not operate on link targets
@ -3006,7 +3043,7 @@ private void renameToInternal(FSPermissionChecker pc, String src, String dst,
checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false);
}
dir.renameTo(src, dst, options);
dir.renameTo(src, dst, logRetryCache, options);
}
/**
@ -3024,7 +3061,7 @@ boolean delete(String src, boolean recursive)
}
boolean ret = false;
try {
ret = deleteInt(src, recursive);
ret = deleteInt(src, recursive, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
@ -3034,13 +3071,13 @@ boolean delete(String src, boolean recursive)
return ret;
}
private boolean deleteInt(String src, boolean recursive)
private boolean deleteInt(String src, boolean recursive, boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
}
boolean status = deleteInternal(src, recursive, true);
boolean status = deleteInternal(src, recursive, true, logRetryCache);
if (status) {
logAuditEvent(true, "delete", src);
}
@ -3068,7 +3105,7 @@ private FSPermissionChecker getPermissionChecker()
* @see ClientProtocol#delete(String, boolean) for description of exceptions
*/
private boolean deleteInternal(String src, boolean recursive,
boolean enforcePermission)
boolean enforcePermission, boolean logRetryCache)
throws AccessControlException, SafeModeException, UnresolvedLinkException,
IOException {
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
@ -3092,7 +3129,7 @@ private boolean deleteInternal(String src, boolean recursive,
FsAction.ALL, false);
}
// Unlink the target directory from directory tree
if (!dir.delete(src, collectedBlocks, removedINodes)) {
if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
return false;
}
ret = true;
@ -3421,7 +3458,7 @@ void fsync(String src, String clientName, long lastBlockLength)
if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength);
}
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, false);
} finally {
writeUnlock();
}
@ -3718,7 +3755,7 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
src = closeFileCommitBlocks(pendingFile, storedBlock);
} else {
// If this commit does not want to close the file, persist blocks
src = persistBlocks(pendingFile);
src = persistBlocks(pendingFile, false);
}
} finally {
writeUnlock();
@ -3767,10 +3804,10 @@ String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
* @throws IOException
*/
@VisibleForTesting
String persistBlocks(INodeFileUnderConstruction pendingFile)
throws IOException {
String persistBlocks(INodeFileUnderConstruction pendingFile,
boolean logRetryCache) throws IOException {
String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache);
return src;
}
@ -5578,7 +5615,8 @@ void updatePipeline(String clientName, ExtendedBlock oldBlock,
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
cacheEntry != null);
success = true;
} finally {
writeUnlock();
@ -5590,7 +5628,7 @@ void updatePipeline(String clientName, ExtendedBlock oldBlock,
/** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache)
throws IOException {
assert hasWriteLock();
// check the validity of the block and the lease holder's name
@ -5625,7 +5663,7 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
blockinfo.setExpectedLocations(descriptors);
String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache);
}
// rename was successful. If any part of the renamed subtree had
@ -6493,7 +6531,8 @@ String createSnapshot(String snapshotRoot, String snapshotName)
} finally {
dir.writeUnlock();
}
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
@ -6535,7 +6574,8 @@ void renameSnapshot(String path, String snapshotOldName,
dir.verifySnapshotName(snapshotNewName, path);
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
cacheEntry != null);
success = true;
} finally {
writeUnlock();
@ -6659,7 +6699,8 @@ void deleteSnapshot(String snapshotRoot, String snapshotName)
removedINodes.clear();
this.removeBlocks(collectedBlocks);
collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
success = true;
} finally {
writeUnlock();
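Every @AtMostOnce handler touched in this file follows the same shape, which is the server-side heart of this patch. A simplified sketch of createSnapshot (locking, safemode, and audit logging omitted; waitForCompletion/setState are the same RetryCache statics this diff already calls):
  String createSnapshot(String snapshotRoot, String snapshotName)
      throws IOException {
    CacheEntryWithPayload cacheEntry =
        RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (String) cacheEntry.getPayload();  // retried call: replay the answer
    }
    String snapshotPath = null;
    try {
      // ...perform the mutation, logging the edit with
      // toLogRpcIds == (cacheEntry != null) so a rebuilt cache
      // will contain the same (clientId, callId) entry...
    } finally {
      RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
    }
    return snapshotPath;
  }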

View File

@ -735,7 +735,8 @@ public boolean isInSafeMode() {
}
/** get FSImage */
FSImage getFSImage() {
@VisibleForTesting
public FSImage getFSImage() {
return namesystem.dir.fsImage;
}

View File

@ -249,7 +249,7 @@ private void initialize(final Configuration conf,
checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
checkpointImage.deleteTempEdits();
namesystem = new FSNamesystem(conf, checkpointImage);
namesystem = new FSNamesystem(conf, checkpointImage, true);
// Initialize other scheduling parameters from the configuration
checkpointConf = new CheckpointConf(conf);
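Passing true here makes sense because the Secondary NameNode only replays edits for checkpointing and never serves client RPCs, so retry cache entries would be wasted heap.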

View File

@ -126,7 +126,7 @@ class ImageLoaderCurrent implements ImageLoader {
new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-40, -41, -42, -43, -44, -45, -46 };
-40, -41, -42, -43, -44, -45, -46, -47 };
private int imageVersion = 0;
private final Map<Long, String> subtreeMap = new HashMap<Long, String>();
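The appended -47 matches the layout version bump this feature needs: @AtMostOnce ops now persist their RPC ids in the edit log, and the EDITS_VERSION in the test XML below moves from -46 to -47 to match.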

View File

@ -57,8 +57,11 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@ -942,4 +945,102 @@ public DFSTestUtil build() {
return new DFSTestUtil(nFiles, maxLevels, maxSize, minSize);
}
}
/**
* Run a set of operations to generate the full range of edit log ops
*/
public static void runOperations(MiniDFSCluster cluster,
DistributedFileSystem filesystem, Configuration conf, long blockSize,
int nnIndex) throws IOException {
// create FileContext for rename2
FileContext fc = FileContext.getFileContext(cluster.getURI(0), conf);
// OP_ADD 0
final Path pathFileCreate = new Path("/file_create");
FSDataOutputStream s = filesystem.create(pathFileCreate);
// OP_CLOSE 9
s.close();
// OP_RENAME_OLD 1
final Path pathFileMoved = new Path("/file_moved");
filesystem.rename(pathFileCreate, pathFileMoved);
// OP_DELETE 2
filesystem.delete(pathFileMoved, false);
// OP_MKDIR 3
Path pathDirectoryMkdir = new Path("/directory_mkdir");
filesystem.mkdirs(pathDirectoryMkdir);
// OP_ALLOW_SNAPSHOT 29
filesystem.allowSnapshot(pathDirectoryMkdir);
// OP_DISALLOW_SNAPSHOT 30
filesystem.disallowSnapshot(pathDirectoryMkdir);
// OP_CREATE_SNAPSHOT 26
String ssName = "snapshot1";
filesystem.allowSnapshot(pathDirectoryMkdir);
filesystem.createSnapshot(pathDirectoryMkdir, ssName);
// OP_RENAME_SNAPSHOT 28
String ssNewName = "snapshot2";
filesystem.renameSnapshot(pathDirectoryMkdir, ssName, ssNewName);
// OP_DELETE_SNAPSHOT 27
filesystem.deleteSnapshot(pathDirectoryMkdir, ssNewName);
// OP_SET_REPLICATION 4
s = filesystem.create(pathFileCreate);
s.close();
filesystem.setReplication(pathFileCreate, (short)1);
// OP_SET_PERMISSIONS 7
Short permission = 0777;
filesystem.setPermission(pathFileCreate, new FsPermission(permission));
// OP_SET_OWNER 8
filesystem.setOwner(pathFileCreate, new String("newOwner"), null);
// OP_CLOSE 9 see above
// OP_SET_GENSTAMP 10 see above
// OP_SET_NS_QUOTA 11 obsolete
// OP_CLEAR_NS_QUOTA 12 obsolete
// OP_TIMES 13
long mtime = 1285195527000L; // Wed, 22 Sep 2010 22:45:27 GMT
long atime = mtime;
filesystem.setTimes(pathFileCreate, mtime, atime);
// OP_SET_QUOTA 14
filesystem.setQuota(pathDirectoryMkdir, 1000L,
HdfsConstants.QUOTA_DONT_SET);
// OP_RENAME 15
fc.rename(pathFileCreate, pathFileMoved, Rename.NONE);
// OP_CONCAT_DELETE 16
Path pathConcatTarget = new Path("/file_concat_target");
Path[] pathConcatFiles = new Path[2];
pathConcatFiles[0] = new Path("/file_concat_0");
pathConcatFiles[1] = new Path("/file_concat_1");
long length = blockSize * 3; // multiple of blocksize for concat
short replication = 1;
long seed = 1;
DFSTestUtil.createFile(filesystem, pathConcatTarget, length, replication,
seed);
DFSTestUtil.createFile(filesystem, pathConcatFiles[0], length, replication,
seed);
DFSTestUtil.createFile(filesystem, pathConcatFiles[1], length, replication,
seed);
filesystem.concat(pathConcatTarget, pathConcatFiles);
// OP_SYMLINK 17
Path pathSymlink = new Path("/file_symlink");
fc.createSymlink(pathConcatTarget, pathSymlink, false);
// OP_REASSIGN_LEASE 22
String filePath = "/hard-lease-recovery-test";
byte[] bytes = "foo-bar-baz".getBytes();
DFSClientAdapter.stopLeaseRenewer(filesystem);
FSDataOutputStream leaseRecoveryPath = filesystem.create(new Path(filePath));
leaseRecoveryPath.write(bytes);
leaseRecoveryPath.hflush();
// Set the hard lease timeout to 1 second.
cluster.setLeasePeriod(60 * 1000, 1000, nnIndex);
// wait for lease recovery to complete
LocatedBlocks locatedBlocks;
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
}
}
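A hedged usage sketch of the new helper (this mirrors how the retry-cache tests below drive it; the block size and NN index are arbitrary):
  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  cluster.waitActive();
  DistributedFileSystem fs = cluster.getFileSystem();
  DFSTestUtil.runOperations(cluster, fs, conf, 512L, 0);  // nnIndex 0
  cluster.shutdown();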

View File

@ -2038,6 +2038,10 @@ public void setLeasePeriod(long soft, long hard) {
NameNodeAdapter.setLeasePeriod(getNamesystem(), soft, hard);
}
public void setLeasePeriod(long soft, long hard, int nnIndex) {
NameNodeAdapter.setLeasePeriod(getNamesystem(nnIndex), soft, hard);
}
public void setWaitSafeMode(boolean wait) {
this.waitSafeMode = wait;
}

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
/**
*
@ -97,8 +96,9 @@ static void addFiles(FSEditLog editLog, int numFiles, short replication,
dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L);
editLog.logMkDir(currentDir, dirInode);
}
editLog.logOpenFile(filePath, new INodeFileUnderConstruction(
inodeId.nextValue(), p, replication, 0, blockSize, "", "", null));
editLog.logOpenFile(filePath,
new INodeFileUnderConstruction(inodeId.nextValue(), p, replication,
0, blockSize, "", "", null), false);
editLog.logCloseFile(filePath, inode);
if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -28,7 +28,9 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@ -64,7 +66,7 @@ private FSNamesystem makeNameSystemSpy(Block block,
any(INodeFileUnderConstruction.class),
any(BlockInfo.class));
doReturn("").when(namesystemSpy).persistBlocks(
any(INodeFileUnderConstruction.class));
any(INodeFileUnderConstruction.class), anyBoolean());
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
return namesystemSpy;
@ -127,7 +129,6 @@ public void testCommitBlockSynchronizationWithDelete() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();
@ -148,7 +149,6 @@ public void testCommitBlockSynchronizationWithClose() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
DatanodeID[] newTargets = new DatanodeID[0];
ExtendedBlock lastBlock = new ExtendedBlock();

View File

@ -155,7 +155,7 @@ public void run() {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
namesystem.allocateNewInodeId(), p, replication, blockSize, 0, "",
"", null);
editLog.logOpenFile("/filename" + (startIndex + i), inode);
editLog.logOpenFile("/filename" + (startIndex + i), inode, false);
editLog.logCloseFile("/filename" + (startIndex + i), inode);
editLog.logSync();
}
@ -912,14 +912,14 @@ public void testAutoSync() throws Exception {
log.setMetricsForTests(mockMetrics);
for (int i = 0; i < 400; i++) {
log.logDelete(oneKB, 1L);
log.logDelete(oneKB, 1L, false);
}
// After ~400KB, we're still within the 512KB buffer size
Mockito.verify(mockMetrics, Mockito.times(0)).addSync(Mockito.anyLong());
// After ~400KB more, we should have done an automatic sync
for (int i = 0; i < 400; i++) {
log.logDelete(oneKB, 1L);
log.logDelete(oneKB, 1L, false);
}
Mockito.verify(mockMetrics, Mockito.times(1)).addSync(Mockito.anyLong());

View File

@ -292,7 +292,7 @@ static private File prepareUnfinalizedTestEditLog(File testDir, int numTx,
long thisTxId = spyLog.getLastWrittenTxId() + 1;
offsetToTxId.put(trueOffset, thisTxId);
System.err.println("txid " + thisTxId + " at offset " + trueOffset);
spyLog.logDelete("path" + i, i);
spyLog.logDelete("path" + i, i, false);
spyLog.logSync();
}
} finally {

View File

@ -30,8 +30,6 @@
import java.util.HashSet;
import java.util.Set;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -273,7 +271,7 @@ public Set<Long> getValidTxIds() {
}
public int getMaxOpSize() {
return 30;
return 36;
}
}

View File

@ -19,12 +19,17 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
@ -32,19 +37,21 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
/**
@ -61,19 +68,20 @@
* request, a new callId is generated using {@link #newCall()}.
*/
public class TestNamenodeRetryCache {
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
private static final byte[] CLIENT_ID = ClientId.getClientId();
private static MiniDFSCluster cluster;
private static FSNamesystem namesystem;
private static PermissionStatus perm = new PermissionStatus(
"TestNamenodeRetryCache", null, FsPermission.getDefault());
private static FileSystem filesystem;
private static DistributedFileSystem filesystem;
private static int callId = 100;
private static Configuration conf = new HdfsConfiguration();
private static final int BlockSize = 512;
/** Start a cluster */
@BeforeClass
public static void setup() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
@Before
public void setup() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
@ -109,8 +117,8 @@ public static void resetCall() {
}
private void concatSetup(String file1, String file2) throws Exception {
DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 0L);
}
/**
@ -192,19 +200,19 @@ public void testCreate() throws Exception {
// Two retried calls succeed
newCall();
HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize);
Assert.assertEquals(status, namesystem.startFile(src, perm,
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
true, (short) 1, 512));
true, (short) 1, BlockSize));
Assert.assertEquals(status, namesystem.startFile(src, perm,
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
true, (short) 1, 512));
true, (short) 1, BlockSize));
// A non-retried call fails
newCall();
try {
namesystem.startFile(src, perm, "holder", "clientmachine",
EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize);
Assert.fail("testCreate - expected exception is not thrown");
} catch (IOException e) {
// expected
@ -352,4 +360,41 @@ public void testRetryCacheConfig() {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
Assert.assertNull(FSNamesystem.initRetryCache(conf));
}
/**
* After running a set of operations, restart the NN and check whether the
* retry cache has been rebuilt from the edit log.
*/
@Test
public void testRetryCacheRebuild() throws Exception {
DFSTestUtil.runOperations(cluster, filesystem, conf, BlockSize, 0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
Iterator<CacheEntry> iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
oldEntries.put(entry, entry);
}
// restart NameNode
cluster.restartNameNode();
cluster.waitActive();
namesystem = cluster.getNamesystem();
// check retry cache
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
assertTrue(oldEntries.containsKey(entry));
}
}
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRetryCacheWithHA {
private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class);
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
private static Configuration conf = new HdfsConfiguration();
private static final int BlockSize = 1024;
private static final short DataNodes = 3;
private final static Map<String, Object> results =
new HashMap<String, Object>();
/**
* A dummy invocation handler extending RetryInvocationHandler. We can use
* a boolean flag to control whether the method invocation succeeds or not.
*/
private static class DummyRetryInvocationHandler extends
RetryInvocationHandler {
static AtomicBoolean block = new AtomicBoolean(false);
DummyRetryInvocationHandler(
FailoverProxyProvider<ClientProtocol> proxyProvider,
RetryPolicy retryPolicy) {
super(proxyProvider, retryPolicy);
}
@Override
protected Object invokeMethod(Method method, Object[] args)
throws Throwable {
Object result = super.invokeMethod(method, args);
if (block.get()) {
throw new UnknownHostException("Fake Exception");
} else {
return result;
}
}
}
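The block flag simulates a lost response rather than a failed call: the active NN has already executed the method by the time the handler throws, so when the failover policy retries against the other NN, only a retry cache hit can return the original answer instead of re-executing the operation.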
@Before
public void setup() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(DataNodes).build();
cluster.waitActive();
cluster.transitionToActive(0);
// setup the configuration
HATestUtil.setFailoverConfigurations(cluster, conf);
dfs = (DistributedFileSystem) HATestUtil.configureFailoverFs(cluster, conf);
}
@After
public void cleanup() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* 1. Run a set of operations
* 2. Trigger the NN failover
* 3. Check the retry cache on the original standby NN
*/
@Test
public void testRetryCacheOnStandbyNN() throws Exception {
// 1. run operations
DFSTestUtil.runOperations(cluster, dfs, conf, BlockSize, 0);
// check retry cache in NN1
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
Iterator<CacheEntry> iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
oldEntries.put(entry, entry);
}
// 2. Failover the current standby to active.
cluster.getNameNode(0).getRpcServer().rollEditLog();
cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
// 3. check the retry cache on the new active NN
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
assertTrue(oldEntries.containsKey(entry));
}
}
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
Class<FailoverProxyProvider<ClientProtocol>> failoverProxyProviderClass =
NameNodeProxies.getFailoverProxyProviderClass(conf, nnUri,
ClientProtocol.class);
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
failoverProxyProviderClass, ClientProtocol.class, nnUri);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Integer.MAX_VALUE,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { ClientProtocol.class }, dummyHandler);
DFSClient client = new DFSClient(null, proxy, conf, null);
return client;
}
/**
* When an NN failover happens, if the client did not receive the response
* and sends a retry request to the other NN, the same response should be
* received based on the retry cache.
*
* TODO: currently we only test createSnapshot from the client side. We may
* need to cover all the calls carrying the @AtMostOnce annotation.
*/
@Test
public void testClientRetryWithFailover() throws Exception {
final String dir = "/test";
final Path dirPath = new Path(dir);
final String sName = "s1";
final String dirSnapshot = dir + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR
+ Path.SEPARATOR + sName;
dfs.mkdirs(dirPath);
dfs.allowSnapshot(dirPath);
final DFSClient client = genClientWithDummyHandler();
// set DummyRetryInvocationHandler#block to true
DummyRetryInvocationHandler.block.set(true);
new Thread() {
@Override
public void run() {
try {
final String snapshotPath = client.createSnapshot(dir, "s1");
assertEquals(dirSnapshot, snapshotPath);
LOG.info("Created snapshot " + snapshotPath);
synchronized (TestRetryCacheWithHA.this) {
results.put("createSnapshot", snapshotPath);
TestRetryCacheWithHA.this.notifyAll();
}
} catch (IOException e) {
LOG.info("Got IOException " + e + " while creating snapshot");
} finally {
IOUtils.cleanup(null, client);
}
}
}.start();
// make sure the client's createSnapshot call has actually been handled by
// the active NN
boolean snapshotCreated = dfs.exists(new Path(dirSnapshot));
while (!snapshotCreated) {
Thread.sleep(1000);
snapshotCreated = dfs.exists(new Path(dirSnapshot));
}
// force the failover
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// disable the block in DummyHandler
LOG.info("Setting block to false");
DummyRetryInvocationHandler.block.set(false);
synchronized (this) {
while (!results.containsKey("createSnapshot")) {
this.wait();
}
LOG.info("Got the result of createSnapshot: "
+ results.get("createSnapshot"));
}
}
}

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<EDITS>
<EDITS_VERSION>-46</EDITS_VERSION>
<EDITS_VERSION>-47</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1372798673941</EXPIRY_DATE>
<KEY>247c47b8bf6b89ec</KEY>
<EXPIRY_DATE>1375509063810</EXPIRY_DATE>
<KEY>4d47710649039b98</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1372798673944</EXPIRY_DATE>
<KEY>ef1a35da6b4fc327</KEY>
<EXPIRY_DATE>1375509063812</EXPIRY_DATE>
<KEY>38cbb1d8fd90fcb2</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -37,16 +37,18 @@
<INODEID>16386</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107474972</MTIME>
<ATIME>1372107474972</ATIME>
<MTIME>1374817864805</MTIME>
<ATIME>1374817864805</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>8</RPC_CALLID>
</DATA>
</RECORD>
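From this record on, every op marked @AtMostOnce carries the issuing client's UUID (RPC_CLIENTID) and call id (RPC_CALLID); replaying them is what lets a restarted or failed-over NameNode rebuild its retry cache. Records further down with an empty RPC_CLIENTID and a call id of -2 are presumably ops logged internally rather than in response to a retryable client call, -2 matching the dummy/invalid call id convention.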
<RECORD>
@ -57,13 +59,13 @@
<INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107474983</MTIME>
<ATIME>1372107474972</ATIME>
<MTIME>1374817864816</MTIME>
<ATIME>1374817864805</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -76,7 +78,9 @@
<LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1372107474986</TIMESTAMP>
<TIMESTAMP>1374817864818</TIMESTAMP>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -85,7 +89,9 @@
<TXID>7</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
<TIMESTAMP>1372107474989</TIMESTAMP>
<TIMESTAMP>1374817864822</TIMESTAMP>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>11</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -95,9 +101,9 @@
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
<TIMESTAMP>1372107474991</TIMESTAMP>
<TIMESTAMP>1374817864825</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@ -130,6 +136,8 @@
<TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -139,6 +147,8 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -147,6 +157,8 @@
<TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -157,16 +169,18 @@
<INODEID>16388</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475007</MTIME>
<ATIME>1372107475007</ATIME>
<MTIME>1374817864846</MTIME>
<ATIME>1374817864846</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>19</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -177,13 +191,13 @@
<INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475009</MTIME>
<ATIME>1372107475007</ATIME>
<MTIME>1374817864848</MTIME>
<ATIME>1374817864846</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -239,8 +253,10 @@
<LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1372107475019</TIMESTAMP>
<TIMESTAMP>1374817864860</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>26</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -251,16 +267,18 @@
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475023</MTIME>
<ATIME>1372107475023</ATIME>
<MTIME>1374817864864</MTIME>
<ATIME>1374817864864</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>28</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -287,6 +305,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1001</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -318,6 +338,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1002</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -354,6 +376,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -364,8 +388,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475091</MTIME>
<ATIME>1372107475023</ATIME>
<MTIME>1374817864927</MTIME>
<ATIME>1374817864864</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -385,7 +409,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -399,16 +423,18 @@
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475093</MTIME>
<ATIME>1372107475093</ATIME>
<MTIME>1374817864929</MTIME>
<ATIME>1374817864929</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>41</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -435,6 +461,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1004</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -466,6 +494,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1005</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -502,6 +532,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -512,8 +544,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475110</MTIME>
<ATIME>1372107475093</ATIME>
<MTIME>1374817864947</MTIME>
<ATIME>1374817864929</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -533,7 +565,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -547,16 +579,18 @@
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475112</MTIME>
<ATIME>1372107475112</ATIME>
<MTIME>1374817864950</MTIME>
<ATIME>1374817864950</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>53</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -583,6 +617,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1007</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -614,6 +650,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1008</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -650,6 +688,8 @@
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -660,8 +700,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475131</MTIME>
<ATIME>1372107475112</ATIME>
<MTIME>1374817864966</MTIME>
<ATIME>1374817864950</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -681,7 +721,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -693,11 +733,13 @@
<TXID>56</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
<TIMESTAMP>1372107475133</TIMESTAMP>
<TIMESTAMP>1374817864967</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -708,13 +750,15 @@
<INODEID>16392</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
<MTIME>1372107475137</MTIME>
<ATIME>1372107475137</ATIME>
<MTIME>1374817864971</MTIME>
<ATIME>1374817864971</ATIME>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>65</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -724,14 +768,14 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>jing</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1372107475140</ISSUE_DATE>
<MAX_DATE>1372712275140</MAX_DATE>
<ISSUE_DATE>1374817864974</ISSUE_DATE>
<MAX_DATE>1375422664974</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1372193875140</EXPIRY_TIME>
<EXPIRY_TIME>1374904264974</EXPIRY_TIME>
</DATA>
</RECORD>
<RECORD>
@@ -741,14 +785,14 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>jing</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1372107475140</ISSUE_DATE>
<MAX_DATE>1372712275140</MAX_DATE>
<ISSUE_DATE>1374817864974</ISSUE_DATE>
<MAX_DATE>1375422664974</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1372193875208</EXPIRY_TIME>
<EXPIRY_TIME>1374904265012</EXPIRY_TIME>
</DATA>
</RECORD>
<RECORD>
@@ -758,11 +802,11 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>jing</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1372107475140</ISSUE_DATE>
<MAX_DATE>1372712275140</MAX_DATE>
<ISSUE_DATE>1374817864974</ISSUE_DATE>
<MAX_DATE>1375422664974</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
</DATA>
@@ -773,18 +817,20 @@
<TXID>61</TXID>
<LENGTH>0</LENGTH>
<INODEID>16393</INODEID>
<PATH>/written_file</PATH>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475214</MTIME>
<ATIME>1372107475214</ATIME>
<MTIME>1374817865017</MTIME>
<ATIME>1374817865017</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1676409172_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>5245793a-984b-4264-8d7c-7890775547a0</RPC_CLIENTID>
<RPC_CALLID>69</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@@ -805,178 +851,42 @@
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>64</TXID>
<PATH>/written_file</PATH>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>65</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/written_file</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475221</MTIME>
<ATIME>1372107475214</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>9</NUM_BYTES>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>66</TXID>
<LENGTH>0</LENGTH>
<INODEID>16393</INODEID>
<PATH>/written_file</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475221</MTIME>
<ATIME>1372107475214</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>9</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>67</TXID>
<GENSTAMPV2>1011</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>68</TXID>
<PATH>/written_file</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>9</NUM_BYTES>
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
<TXID>69</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/written_file</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475272</MTIME>
<ATIME>1372107475221</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>26</NUM_BYTES>
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
<TXID>70</TXID>
<LENGTH>0</LENGTH>
<INODEID>16394</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107475275</MTIME>
<ATIME>1372107475275</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1834501254_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
<TXID>71</TXID>
<BLOCK_ID>1073741835</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>72</TXID>
<GENSTAMPV2>1012</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>73</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741835</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1012</GENSTAMP>
</BLOCK>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>74</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741835</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1012</GENSTAMP>
</BLOCK>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>75</TXID>
<GENSTAMPV2>1013</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>76</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-1834501254_1</LEASEHOLDER>
<TXID>67</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-1676409172_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@@ -984,23 +894,23 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
<TXID>77</TXID>
<TXID>68</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1372107477870</MTIME>
<ATIME>1372107475275</ATIME>
<MTIME>1374817867688</MTIME>
<ATIME>1374817865017</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<BLOCK>
<BLOCK_ID>1073741835</BLOCK_ID>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>11</NUM_BYTES>
<GENSTAMP>1013</GENSTAMP>
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -1009,7 +919,7 @@
<RECORD>
<OPCODE>OP_END_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>78</TXID>
<TXID>69</TXID>
</DATA>
</RECORD>
</EDITS>