HDFS-11402. HDFS Snapshots should capture point-in-time copies of OPEN files. (Manoj Govindassamy via Yongjun Zhang)

Yongjun Zhang 2017-04-21 20:19:20 -07:00
parent 3721cfe1fb
commit 20e3ae260b
15 changed files with 994 additions and 57 deletions
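For orientation, a minimal sketch (not part of this patch) of how the new behavior is exercised once dfs.namenode.snapshot.capture.openfiles is enabled on the NameNode, loosely mirroring the tests added below. The class name, paths, and byte counts are illustrative assumptions:

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Illustrative sketch only, not part of this patch. Assumes the NameNode runs with
// dfs.namenode.snapshot.capture.openfiles=true and that `dfs` is already initialized;
// the paths and byte counts are made up.
public class OpenFileSnapshotSketch {
  static long snapshotAnOpenFile(DistributedFileSystem dfs) throws Exception {
    Path dir = new Path("/logs");
    dfs.mkdirs(dir);
    dfs.allowSnapshot(dir);                              // make /logs snapshottable

    Path file = new Path(dir, "app.log");
    FSDataOutputStream out = dfs.create(file);
    out.write(new byte[1024]);
    out.close();                                         // NameNode now records 1024 bytes

    FSDataOutputStream appendStream = dfs.append(file);  // reopen the file for write
    Path snap = dfs.createSnapshot(dir, "s1");           // snapshot while the file is open
    long frozenLen = dfs.getFileStatus(new Path(snap, "app.log")).getLen();

    appendStream.write(new byte[4096]);                  // the live file keeps growing...
    appendStream.close();
    return frozenLen;   // ...but the snapshot copy keeps its point-in-time length
  }
}

Without the flag (the default), an open file's snapshot copy is not frozen in this way, which is the behavior this change addresses.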

HdfsClientConfigKeys.java

@ -171,6 +171,10 @@ public interface HdfsClientConfigKeys {
"dfs.data.transfer.client.tcpnodelay";
boolean DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT = true;
String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES =
"dfs.namenode.snapshot.capture.openfiles";
boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
/**
* These are deprecated config keys to client code.
*/

DFSConfigKeys.java

@ -352,6 +352,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec";
public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L;
public static final String DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES =
HdfsClientConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES;
public static final boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT =
HdfsClientConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT;
// Whether to enable datanode's stale state detection and usage for reads
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;

FSDirSnapshotOp.java

@ -97,12 +97,13 @@ class FSDirSnapshotOp {
throw new InvalidPathException("Invalid snapshot name: " + snapshotName);
}
String snapshotPath = null;
String snapshotPath;
verifySnapshotName(fsd, snapshotName, snapshotRoot);
fsd.writeLock();
try {
snapshotPath = snapshotManager.createSnapshot(iip, snapshotRoot,
snapshotName);
snapshotPath = snapshotManager.createSnapshot(
fsd.getFSNamesystem().getLeaseManager(),
iip, snapshotRoot, snapshotName);
} finally {
fsd.writeUnlock();
}

FSEditLogLoader.java

@ -746,8 +746,9 @@ public class FSEditLogLoader {
renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
logVersion);
INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
snapshotRoot, createSnapshotOp.snapshotName);
String path = fsNamesys.getSnapshotManager().createSnapshot(
fsDir.getFSNamesystem().getLeaseManager(),
iip, snapshotRoot, createSnapshotOp.snapshotName);
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
createSnapshotOp.rpcCallId, path);

FSNamesystem.java

@ -844,7 +844,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(this, conf);
this.snapshotManager = new SnapshotManager(dir);
this.snapshotManager = new SnapshotManager(conf, dir);
this.cacheManager = new CacheManager(this, conf, blockManager);
this.ecPolicyManager = new ErasureCodingPolicyManager(conf);
this.topConf = new TopConf(conf);

INodeDirectory.java

@ -256,9 +256,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
getDirectorySnapshottableFeature().setSnapshotQuota(snapshotQuota);
}
public Snapshot addSnapshot(int id, String name) throws SnapshotException,
QuotaExceededException {
return getDirectorySnapshottableFeature().addSnapshot(this, id, name);
public Snapshot addSnapshot(int id, String name,
final LeaseManager leaseManager, final boolean captureOpenFiles)
throws SnapshotException, QuotaExceededException {
return getDirectorySnapshottableFeature().addSnapshot(this, id, name,
leaseManager, captureOpenFiles);
}
public Snapshot removeSnapshot(

INodesInPath.java

@ -49,24 +49,62 @@ public class INodesInPath {
Arrays.equals(HdfsServerConstants.DOT_SNAPSHOT_DIR_BYTES, pathComponent);
}
static INodesInPath fromINode(INode inode) {
private static INode[] getINodes(final INode inode) {
int depth = 0, index;
INode tmp = inode;
while (tmp != null) {
depth++;
tmp = tmp.getParent();
}
final byte[][] path = new byte[depth][];
final INode[] inodes = new INode[depth];
INode[] inodes = new INode[depth];
tmp = inode;
index = depth;
while (tmp != null) {
index--;
path[index] = tmp.getKey();
inodes[index] = tmp;
tmp = tmp.getParent();
}
return new INodesInPath(inodes, path);
return inodes;
}
private static byte[][] getPaths(final INode[] inodes) {
byte[][] paths = new byte[inodes.length][];
for (int i = 0; i < inodes.length; i++) {
paths[i] = inodes[i].getKey();
}
return paths;
}
/**
* Construct {@link INodesInPath} from {@link INode}.
*
* @param inode to construct from
* @return INodesInPath
*/
static INodesInPath fromINode(INode inode) {
INode[] inodes = getINodes(inode);
byte[][] paths = getPaths(inodes);
return new INodesInPath(inodes, paths);
}
/**
 * Construct {@link INodesInPath} from {@link INode} and its root
 * {@link INodeDirectory}. An INodesInPath constructed this way will
 * have each inode's snapshot and latest snapshot id filled in.
 *
 * This routine is specifically for
 * {@link LeaseManager#getINodeWithLeases(INodeDirectory)} to get
 * open files along with their snapshot details, which are used during
 * new snapshot creation to capture their metadata.
 *
 * @param rootDir the root {@link INodeDirectory} under which the inode
 *                needs to be resolved
 * @param inode the {@link INode} to be resolved
 * @return INodesInPath
 */
static INodesInPath fromINode(final INodeDirectory rootDir, INode inode) {
byte[][] paths = getPaths(getINodes(inode));
return resolve(rootDir, paths);
}
static INodesInPath fromComponents(byte[][] components) {
@ -381,6 +419,36 @@ public class INodesInPath {
null;
}
/**
* Verify if this {@link INodesInPath} is a descendant of the
* requested {@link INodeDirectory}.
*
* @param inodeDirectory the ancestor directory
* @return true if this INodesInPath is a descendant of inodeDirectory
*/
public boolean isDescendant(final INodeDirectory inodeDirectory) {
final INodesInPath dirIIP = fromINode(inodeDirectory);
return isDescendant(dirIIP);
}
private boolean isDescendant(final INodesInPath ancestorDirIIP) {
int ancestorDirINodesLength = ancestorDirIIP.length();
int myParentINodesLength = length() - 1;
if (myParentINodesLength < ancestorDirINodesLength) {
return false;
}
int index = 0;
while (index < ancestorDirINodesLength) {
if (inodes[index] != ancestorDirIIP.getINode(index)) {
return false;
}
index++;
}
return true;
}
/**
* @return a new INodesInPath instance that only contains existing INodes.
* Note that this method only handles non-snapshot paths.

LeaseManager.java

@ -28,9 +28,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -41,6 +47,7 @@ import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.util.Time;
/**
* LeaseManager does the lease housekeeping for writing on files.
@ -67,16 +74,14 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class LeaseManager {
public static final Log LOG = LogFactory.getLog(LeaseManager.class);
private final FSNamesystem fsnamesystem;
private long softLimit = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
private long hardLimit = HdfsConstants.LEASE_HARDLIMIT_PERIOD;
static final int INODE_FILTER_WORKER_COUNT_MAX = 4;
static final int INODE_FILTER_WORKER_TASK_MIN = 512;
//
// Used for handling lock-leases
// Mapping: leaseHolder -> Lease
//
private final SortedMap<String, Lease> leases = new TreeMap<>();
// Set of: Lease
private final PriorityQueue<Lease> sortedLeases = new PriorityQueue<>(512,
@ -129,6 +134,96 @@ public class LeaseManager {
Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();}
/**
 * Get {@link INodesInPath} for all {@link INode}s in the system
 * that have a valid lease.
 *
 * @return Set<INodesInPath>
 */
public Set<INodesInPath> getINodeWithLeases() {
return getINodeWithLeases(null);
}
private synchronized INode[] getINodesWithLease() {
int inodeCount = 0;
INode[] inodes = new INode[leasesById.size()];
for (long inodeId : leasesById.keySet()) {
inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId);
inodeCount++;
}
return inodes;
}
/**
 * Get {@link INodesInPath} for all files under the ancestor directory
 * that have a valid lease. If the ancestor directory is null, return
 * all files in the system with a valid lease. Callers must hold the
 * {@link FSNamesystem} read or write lock.
 *
 * @param ancestorDir the ancestor {@link INodeDirectory}
 * @return Set<INodesInPath>
 */
public Set<INodesInPath> getINodeWithLeases(final INodeDirectory
ancestorDir) {
assert fsnamesystem.hasReadLock();
final long startTimeMs = Time.monotonicNow();
Set<INodesInPath> iipSet = new HashSet<>();
final INode[] inodes = getINodesWithLease();
int inodeCount = inodes.length;
if (inodeCount == 0) {
return iipSet;
}
List<Future<List<INodesInPath>>> futureList = Lists.newArrayList();
final int workerCount = Math.min(INODE_FILTER_WORKER_COUNT_MAX,
(((inodeCount - 1) / INODE_FILTER_WORKER_TASK_MIN) + 1));
ExecutorService inodeFilterService =
Executors.newFixedThreadPool(workerCount);
for (int workerIdx = 0; workerIdx < workerCount; workerIdx++) {
final int startIdx = workerIdx;
Callable<List<INodesInPath>> c = new Callable<List<INodesInPath>>() {
@Override
public List<INodesInPath> call() {
List<INodesInPath> iNodesInPaths = Lists.newArrayList();
for (int idx = startIdx; idx < inodeCount; idx += workerCount) {
INode inode = inodes[idx];
if (!inode.isFile()) {
continue;
}
INodesInPath inodesInPath = INodesInPath.fromINode(
fsnamesystem.getFSDirectory().getRoot(), inode.asFile());
if (ancestorDir != null &&
!inodesInPath.isDescendant(ancestorDir)) {
continue;
}
iNodesInPaths.add(inodesInPath);
}
return iNodesInPaths;
}
};
// Submit the inode filter task to the Executor Service
futureList.add(inodeFilterService.submit(c));
}
inodeFilterService.shutdown();
for (Future<List<INodesInPath>> f : futureList) {
try {
iipSet.addAll(f.get());
} catch (Exception e) {
LOG.warn("INode filter task encountered exception: ", e);
}
}
final long endTimeMs = Time.monotonicNow();
if ((endTimeMs - startTimeMs) > 1000) {
LOG.info("Took " + (endTimeMs - startTimeMs) + " ms to collect "
+ iipSet.size() + " open files with leases" +
((ancestorDir != null) ?
" under " + ancestorDir.getFullPathName() : "."));
}
return iipSet;
}
/** @return the lease containing src */
public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
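For readers tracing getINodeWithLeases above: leased inodes are filtered in parallel, with worker w taking indices w, w + workerCount, w + 2*workerCount, and so on. A small standalone sketch (not part of the patch; names are made up) of the worker-count arithmetic driven by INODE_FILTER_WORKER_COUNT_MAX and INODE_FILTER_WORKER_TASK_MIN:

// Standalone sketch of the worker sizing used by getINodeWithLeases above.
// Mirrors INODE_FILTER_WORKER_COUNT_MAX = 4 and INODE_FILTER_WORKER_TASK_MIN = 512.
public class InodeFilterSizingSketch {
  static int workerCountFor(int inodeCount) {
    final int workerCountMax = 4;
    final int workerTaskMin = 512;
    if (inodeCount == 0) {
      return 0;                       // the real code returns early on an empty set
    }
    return Math.min(workerCountMax, ((inodeCount - 1) / workerTaskMin) + 1);
  }

  public static void main(String[] args) {
    // 1..512 open files -> 1 worker, 513..1024 -> 2, 1025..1536 -> 3, 1537+ -> capped at 4
    System.out.println(workerCountFor(100));    // 1
    System.out.println(workerCountFor(1024));   // 2
    System.out.println(workerCountFor(4096));   // 4
  }
}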

DirectorySnapshottableFeature.java

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -39,6 +40,8 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeReference;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.util.Diff.ListType;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.Time;
@ -163,7 +166,8 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
}
/** Add a snapshot. */
public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name,
final LeaseManager leaseManager, final boolean captureOpenFiles)
throws SnapshotException, QuotaExceededException {
//check snapshot quota
final int n = getNumSnapshots();
@ -188,6 +192,15 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
final long now = Time.now();
snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID);
s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
if (captureOpenFiles) {
Set<INodesInPath> openFilesIIP =
leaseManager.getINodeWithLeases(snapshotRoot);
for (INodesInPath openFileIIP : openFilesIIP) {
INodeFile openFile = openFileIIP.getLastINode().asFile();
openFile.recordModification(openFileIIP.getLatestSnapshotId());
}
}
return s;
}

SnapshotManager.java

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@ -29,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@ -43,6 +47,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.metrics2.util.MBeans;
import com.google.common.base.Preconditions;
@ -60,20 +65,23 @@ import com.google.common.base.Preconditions;
* if necessary.
*/
public class SnapshotManager implements SnapshotStatsMXBean {
private boolean allowNestedSnapshots = false;
private final FSDirectory fsdir;
private final boolean captureOpenFiles;
private final AtomicInteger numSnapshots = new AtomicInteger();
private static final int SNAPSHOT_ID_BIT_WIDTH = 24;
private final AtomicInteger numSnapshots = new AtomicInteger();
private boolean allowNestedSnapshots = false;
private int snapshotCounter = 0;
/** All snapshottable directories in the namesystem. */
private final Map<Long, INodeDirectory> snapshottables =
new HashMap<Long, INodeDirectory>();
public SnapshotManager(final FSDirectory fsdir) {
public SnapshotManager(final Configuration conf, final FSDirectory fsdir) {
this.fsdir = fsdir;
this.captureOpenFiles = conf.getBoolean(
DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES,
DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT);
}
/** Used in tests only */
@ -203,8 +211,9 @@ public class SnapshotManager implements SnapshotStatsMXBean {
* snapshot with the given name for the directory, and/or 3)
* snapshot number exceeds quota
*/
public String createSnapshot(final INodesInPath iip, String snapshotRoot,
String snapshotName) throws IOException {
public String createSnapshot(final LeaseManager leaseManager,
final INodesInPath iip, String snapshotRoot, String snapshotName)
throws IOException {
INodeDirectory srcRoot = getSnapshottableRoot(iip);
if (snapshotCounter == getMaxSnapshotID()) {
@ -216,7 +225,8 @@ public class SnapshotManager implements SnapshotStatsMXBean {
"snapshot IDs and ID rollover is not supported.");
}
srcRoot.addSnapshot(snapshotCounter, snapshotName);
srcRoot.addSnapshot(snapshotCounter, snapshotName, leaseManager,
this.captureOpenFiles);
//create success, update id
snapshotCounter++;

hdfs-default.xml

@ -4154,6 +4154,23 @@
</description>
</property>
<property>
<name>dfs.namenode.snapshot.capture.openfiles</name>
<value>false</value>
<description>
If true, snapshots taken will have an immutable shared copy of
the open files that have valid leases. Even after the open files
grow or shrink in size, the snapshot will always retain the
point-in-time version of the open files, just like all other
closed files. Default is false.
Note: The file length captured for an open file in a snapshot is
what is recorded in the NameNode at the time of the snapshot, and
it may be shorter than what the client has written so far. To
capture the latest length, the client can call hflush/hsync with
the flag SyncFlag.UPDATE_LENGTH on the open file handles.
</description>
</property>
<property>
<name>dfs.pipeline.ecn</name>
<value>false</value>
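As the dfs.namenode.snapshot.capture.openfiles description above notes, a snapshot records an open file's length as known to the NameNode, which can trail what the client has written. A hedged client-side sketch (not part of the patch; the stream, snapshot root, and snapshot name are illustrative) of pushing the latest length before a snapshot is taken:

import java.util.EnumSet;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

// Illustrative sketch only: push the length written so far to the NameNode so that
// a snapshot taken immediately afterwards reflects it.
public class UpdateLengthBeforeSnapshotSketch {
  static void syncLengthThenSnapshot(DistributedFileSystem dfs, Path snapRoot,
      FSDataOutputStream out, byte[] pendingData) throws Exception {
    out.write(pendingData);
    ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    dfs.createSnapshot(snapRoot, "snap_with_latest_length");  // made-up snapshot name
  }
}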

TestLeaseManager.java

@ -20,24 +20,33 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*;
@ -69,19 +78,21 @@ public class TestLeaseManager {
/** Check that LeaseManager.checkLease release some leases
*/
@Test
public void testCheckLease() {
public void testCheckLease() throws InterruptedException {
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
long numLease = 100;
final long numLease = 100;
final long expiryTime = 0;
final long waitTime = expiryTime + 1;
//Make sure the leases we are going to add exceed the hard limit
lm.setLeasePeriod(0, 0);
lm.setLeasePeriod(expiryTime, expiryTime);
for (long i = 0; i <= numLease - 1; i++) {
//Add some leases to the LeaseManager
lm.addLease("holder"+i, INodeId.ROOT_INODE_ID + i);
}
assertEquals(numLease, lm.countLease());
Thread.sleep(waitTime);
//Initiate a call to checkLease. This should exit within the test timeout
lm.checkLeases();
@ -156,10 +167,271 @@ public class TestLeaseManager {
}
}
/**
 * Test leased file counts obtained from
 * {@link LeaseManager#getINodeWithLeases()},
 * {@link LeaseManager#getINodeIdWithLeases()} and
 * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
 */
@Test (timeout = 60000)
public void testInodeWithLeases() throws Exception {
FSNamesystem fsNamesystem = makeMockFsNameSystem();
FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
LeaseManager lm = new LeaseManager(fsNamesystem);
Set<Long> iNodeIds = new HashSet<>(Arrays.asList(
INodeId.ROOT_INODE_ID + 1,
INodeId.ROOT_INODE_ID + 2,
INodeId.ROOT_INODE_ID + 3,
INodeId.ROOT_INODE_ID + 4
));
final PermissionStatus perm = PermissionStatus.createImmutable(
"user", "group", FsPermission.createImmutable((short)0755));
INodeDirectory rootInodeDirectory = new INodeDirectory(
HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
perm, 0L);
when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
for (Long iNodeId : iNodeIds) {
INodeFile iNodeFile = stubInodeFile(iNodeId);
iNodeFile.setParent(rootInodeDirectory);
when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
lm.addLease("holder_" + iNodeId, iNodeId);
}
verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(),
iNodeIds.size(), iNodeIds.size());
for (Long iNodeId : iNodeIds) {
lm.removeLease(iNodeId);
}
verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
}
/**
 * Test leased file counts at various scales, obtained from
 * {@link LeaseManager#getINodeWithLeases()},
 * {@link LeaseManager#getINodeIdWithLeases()} and
 * {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
 */
@Test (timeout = 240000)
public void testInodeWithLeasesAtScale() throws Exception {
FSNamesystem fsNamesystem = makeMockFsNameSystem();
FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
LeaseManager lm = new LeaseManager(fsNamesystem);
final PermissionStatus perm = PermissionStatus.createImmutable(
"user", "group", FsPermission.createImmutable((short)0755));
INodeDirectory rootInodeDirectory = new INodeDirectory(
HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
perm, 0L);
when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
// Case 1: No open files
int scale = 0;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
for (int workerCount = 1;
workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2;
workerCount++) {
// Case 2: Open files count is half of worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
// Case 3: Open files count is 1 less than worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
// Case 4: Open files count is equal to worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
// Case 5: Open files count is 1 more than worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
}
// Case 6: Open files count is way more than worker count
scale = 1279;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
}
private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
int scale) {
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
Set<Long> iNodeIds = new HashSet<>();
for (int i = 0; i < scale; i++) {
iNodeIds.add(INodeId.ROOT_INODE_ID + i);
}
for (Long iNodeId : iNodeIds) {
INodeFile iNodeFile = stubInodeFile(iNodeId);
iNodeFile.setParent(ancestorDirectory);
when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
leaseManager.addLease("holder_" + iNodeId, iNodeId);
}
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(),
iNodeIds.size(), iNodeIds.size());
leaseManager.removeAllLeases();
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
}
/**
* Verify leased INode details across lease get and release from
* {@link LeaseManager#getINodeIdWithLeases()} and
* {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
*/
@Test (timeout = 60000)
public void testInodeWithLeasesForAncestorDir() throws Exception {
FSNamesystem fsNamesystem = makeMockFsNameSystem();
FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
LeaseManager lm = new LeaseManager(fsNamesystem);
final PermissionStatus perm = PermissionStatus.createImmutable(
"user", "group", FsPermission.createImmutable((short)0755));
INodeDirectory rootInodeDirectory = new INodeDirectory(
HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
perm, 0L);
when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
AtomicInteger inodeIds = new AtomicInteger(
(int) (HdfsConstants.GRANDFATHER_INODE_ID + 1234));
String[] pathTree = new String[] {
"/root.log",
"/ENG/a/a1.log",
"/ENG/a/b/b1.log",
"/ENG/a/b/c/c1.log",
"/ENG/a/b/c/c2.log",
"/OPS/m/m1.log",
"/OPS/m/n/n1.log",
"/OPS/m/n/n2.log"
};
Map<String, INode> pathINodeMap = createINodeTree(rootInodeDirectory,
pathTree, inodeIds);
assertEquals(0, lm.getINodeIdWithLeases().size());
for (Entry<String, INode> entry : pathINodeMap.entrySet()) {
long iNodeId = entry.getValue().getId();
when(fsDirectory.getInode(iNodeId)).thenReturn(entry.getValue());
if (entry.getKey().contains("log")) {
lm.addLease("holder_" + iNodeId, iNodeId);
}
}
assertEquals(pathTree.length, lm.getINodeIdWithLeases().size());
assertEquals(pathTree.length, lm.getINodeWithLeases().size());
assertEquals(pathTree.length, lm.getINodeWithLeases(
rootInodeDirectory).size());
// reset
lm.removeAllLeases();
Set<String> filesLeased = new HashSet<>(
Arrays.asList("root.log", "a1.log", "c1.log", "n2.log"));
for (String fileName : filesLeased) {
lm.addLease("holder", pathINodeMap.get(fileName).getId());
}
assertEquals(filesLeased.size(), lm.getINodeIdWithLeases().size());
assertEquals(filesLeased.size(), lm.getINodeWithLeases().size());
Set<INodesInPath> iNodeWithLeases = lm.getINodeWithLeases();
for (INodesInPath iNodesInPath : iNodeWithLeases) {
String leasedFileName = DFSUtil.bytes2String(
iNodesInPath.getLastLocalName());
assertTrue(filesLeased.contains(leasedFileName));
}
assertEquals(filesLeased.size(),
lm.getINodeWithLeases(rootInodeDirectory).size());
assertEquals(filesLeased.size() - 2,
lm.getINodeWithLeases(pathINodeMap.get("ENG").asDirectory()).size());
assertEquals(filesLeased.size() - 2,
lm.getINodeWithLeases(pathINodeMap.get("a").asDirectory()).size());
assertEquals(filesLeased.size() - 3,
lm.getINodeWithLeases(pathINodeMap.get("c").asDirectory()).size());
assertEquals(filesLeased.size() - 3,
lm.getINodeWithLeases(pathINodeMap.get("OPS").asDirectory()).size());
assertEquals(filesLeased.size() - 3,
lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size());
lm.removeLease(pathINodeMap.get("n2.log").getId());
assertEquals(filesLeased.size() - 1,
lm.getINodeWithLeases(rootInodeDirectory).size());
assertEquals(filesLeased.size() - 4,
lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size());
lm.removeAllLeases();
filesLeased.clear();
assertEquals(filesLeased.size(),
lm.getINodeWithLeases(rootInodeDirectory).size());
}
private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) {
assertEquals(iNodeIdWithLeaseCount,
leaseManager.getINodeIdWithLeases().size());
assertEquals(iNodeWithLeaseCount,
leaseManager.getINodeWithLeases().size());
assertEquals(iNodeUnderAncestorLeaseCount,
leaseManager.getINodeWithLeases(ancestorDirectory).size());
}
private Map<String, INode> createINodeTree(INodeDirectory parentDir,
String[] pathTree, AtomicInteger inodeId)
throws QuotaExceededException {
HashMap<String, INode> pathINodeMap = new HashMap<>();
for (String path : pathTree) {
byte[][] components = INode.getPathComponents(path);
FsPermission perm = FsPermission.createImmutable((short) 0755);
PermissionStatus permStatus =
PermissionStatus.createImmutable("", "", perm);
INodeDirectory prev = parentDir;
INodeDirectory dir = null;
for (int i = 0; i < components.length - 1; i++) {
byte[] component = components[i];
if (component.length == 0) {
continue;
}
INode existingChild = prev.getChild(
component, Snapshot.CURRENT_STATE_ID);
if (existingChild == null) {
String dirName = DFSUtil.bytes2String(component);
dir = new INodeDirectory(inodeId.incrementAndGet(), component,
permStatus, 0);
prev.addChild(dir, false, Snapshot.CURRENT_STATE_ID);
pathINodeMap.put(dirName, dir);
prev = dir;
} else {
assertTrue(existingChild.isDirectory());
prev = existingChild.asDirectory();
}
}
PermissionStatus p = new PermissionStatus(
"user", "group", new FsPermission((short) 0777));
byte[] fileNameBytes = components[components.length - 1];
String fileName = DFSUtil.bytes2String(fileNameBytes);
INodeFile iNodeFile = new INodeFile(
inodeId.incrementAndGet(), fileNameBytes,
p, 0L, 0L, BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
iNodeFile.setParent(prev);
pathINodeMap.put(fileName, iNodeFile);
}
return pathINodeMap;
}
private static FSNamesystem makeMockFsNameSystem() {
FSDirectory dir = mock(FSDirectory.class);
FSNamesystem fsn = mock(FSNamesystem.class);
when(fsn.isRunning()).thenReturn(true);
when(fsn.hasReadLock()).thenReturn(true);
when(fsn.hasWriteLock()).thenReturn(true);
when(fsn.getFSDirectory()).thenReturn(dir);
when(fsn.getMaxLockHoldToReleaseLeaseMs()).thenReturn(maxLockHoldToReleaseLeaseMs);
@ -170,7 +442,7 @@ public class TestLeaseManager {
PermissionStatus p = new PermissionStatus(
"dummy", "dummy", new FsPermission((short) 0777));
return new INodeFile(
inodeId, "/foo".getBytes(), p, 0L, 0L,
inodeId, new String("foo-" + inodeId).getBytes(), p, 0L, 0L,
BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
}
}

TestOpenFilesWithSnapshot.java

@ -18,21 +18,28 @@
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -41,8 +48,16 @@ public class TestOpenFilesWithSnapshot {
MiniDFSCluster cluster = null;
DistributedFileSystem fs = null;
private static final long SEED = 0;
private static final short REPLICATION = 3;
private static final long BLOCKSIZE = 1024;
private static final long BUFFERLEN = BLOCKSIZE / 2;
private static final long FILELEN = BLOCKSIZE * 2;
@Before
public void setup() throws IOException {
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
conf.set("dfs.blocksize", "1048576");
fs = cluster.getFileSystem();
@ -198,6 +213,289 @@ public class TestOpenFilesWithSnapshot {
restartNameNode();
}
private void createFile(final Path filePath) throws IOException {
DFSTestUtil.createFile(fs, filePath, (int) BUFFERLEN,
FILELEN, BLOCKSIZE, REPLICATION, SEED);
}
private int writeToStream(final FSDataOutputStream outputStream, byte[] buf)
throws IOException {
outputStream.write(buf);
((HdfsDataOutputStream)outputStream).hsync(
EnumSet.of(SyncFlag.UPDATE_LENGTH));
return buf.length;
}
/**
 * Test that open files under snapshot directories are captured in
 * snapshots as truly immutable copies. Verify that open files outside
 * of the snapshot directory are not affected.
 *
 * \- level_0_A
 *      \- level_1_C
 *          +- appA.log (open file, not under snap root)
 *          \- level_2_E (Snapshottable Dir)
 *              \- level_3_G
 *                  +- flume.log (open file, under snap root)
 * \- level_0_B
 *      +- appB.log (open file, not under snap root)
 *      \- level_2_D (Snapshottable Dir)
 *          +- hbase.log (open file, under snap root)
 */
@Test (timeout = 120000)
public void testPointInTimeSnapshotCopiesForOpenFiles() throws Exception {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES,
true);
// Construct the directory tree
final Path level0A = new Path("/level_0_A");
final Path level0B = new Path("/level_0_B");
final Path level1C = new Path(level0A, "level_1_C");
final Path level1D = new Path(level0B, "level_1_D");
final Path level2E = new Path(level1C, "level_2_E");
final Path level3G = new Path(level2E, "level_3_G");
Set<Path> dirPaths = new HashSet<>(Arrays.asList(level0A, level0B,
level1C, level1D, level2E, level3G));
for (Path dirPath : dirPaths) {
fs.mkdirs(dirPath);
}
// String constants
final Path flumeSnapRootDir = level2E;
final Path hbaseSnapRootDir = level1D;
final String flumeFileName = "flume.log";
final String hbaseFileName = "hbase.log";
final String appAFileName = "appA.log";
final String appBFileName = "appB.log";
final String flumeSnap1Name = "flume_snap_s1";
final String flumeSnap2Name = "flume_snap_s2";
final String flumeSnap3Name = "flume_snap_s3";
final String hbaseSnap1Name = "hbase_snap_s1";
final String hbaseSnap2Name = "hbase_snap_s2";
final String hbaseSnap3Name = "hbase_snap_s3";
final String flumeRelPathFromSnapDir = "level_3_G/" + flumeFileName;
// Create files and open a stream
final Path flumeFile = new Path(level3G, flumeFileName);
createFile(flumeFile);
FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
final Path hbaseFile = new Path(level1D, hbaseFileName);
createFile(hbaseFile);
FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile);
final Path appAFile = new Path(level1C, appAFileName);
createFile(appAFile);
FSDataOutputStream appAOutputStream = fs.append(appAFile);
final Path appBFile = new Path(level0B, appBFileName);
createFile(appBFile);
FSDataOutputStream appBOutputStream = fs.append(appBFile);
final long appAFileInitialLength = fs.getFileStatus(appAFile).getLen();
final long appBFileInitialLength = fs.getFileStatus(appBFile).getLen();
// Create Snapshot S1
final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
fs, flumeSnapRootDir, flumeSnap1Name);
final Path flumeS1Path = new Path(flumeS1Dir, flumeRelPathFromSnapDir);
final Path hbaseS1Dir = SnapshotTestHelper.createSnapshot(
fs, hbaseSnapRootDir, hbaseSnap1Name);
final Path hbaseS1Path = new Path(hbaseS1Dir, hbaseFileName);
final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen();
// Verify if Snap S1 file lengths are same as the live ones
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS1,
fs.getFileStatus(hbaseS1Path).getLen());
Assert.assertEquals(appAFileInitialLength,
fs.getFileStatus(appAFile).getLen());
Assert.assertEquals(appBFileInitialLength,
fs.getFileStatus(appBFile).getLen());
long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1;
long appAFileWrittenDataLength = appAFileInitialLength;
int newWriteLength = (int) (BLOCKSIZE * 1.5);
byte[] buf = new byte[newWriteLength];
Random random = new Random();
random.nextBytes(buf);
// Write more data to flume and hbase files only
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
// Create Snapshot S2
final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
fs, flumeSnapRootDir, flumeSnap2Name);
final Path flumeS2Path = new Path(flumeS2Dir, flumeRelPathFromSnapDir);
final Path hbaseS2Dir = SnapshotTestHelper.createSnapshot(
fs, hbaseSnapRootDir, hbaseSnap2Name);
final Path hbaseS2Path = new Path(hbaseS2Dir, hbaseFileName);
// Verify live files lengths are same as all data written till now
final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen();
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2);
// Verify if Snap S2 file lengths are same as the live ones
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS2,
fs.getFileStatus(hbaseS2Path).getLen());
Assert.assertEquals(appAFileInitialLength,
fs.getFileStatus(appAFile).getLen());
Assert.assertEquals(appBFileInitialLength,
fs.getFileStatus(appBFile).getLen());
// Write more data to appA file only
newWriteLength = (int) (BLOCKSIZE * 2.5);
buf = new byte[newWriteLength];
random.nextBytes(buf);
appAFileWrittenDataLength += writeToStream(appAOutputStream, buf);
// Verify other open files are not affected in their snapshots
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
Assert.assertEquals(appAFileWrittenDataLength,
fs.getFileStatus(appAFile).getLen());
// Write more data to flume file only
newWriteLength = (int) (BLOCKSIZE * 2.5);
buf = new byte[newWriteLength];
random.nextBytes(buf);
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
// Create Snapshot S3
final Path flumeS3Dir = SnapshotTestHelper.createSnapshot(
fs, flumeSnapRootDir, flumeSnap3Name);
final Path flumeS3Path = new Path(flumeS3Dir, flumeRelPathFromSnapDir);
final Path hbaseS3Dir = SnapshotTestHelper.createSnapshot(
fs, hbaseSnapRootDir, hbaseSnap3Name);
final Path hbaseS3Path = new Path(hbaseS3Dir, hbaseFileName);
// Verify live files lengths are same as all data written till now
final long flumeFileLengthAfterS3 = fs.getFileStatus(flumeFile).getLen();
final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen();
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS3);
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3);
// Verify if Snap S3 file lengths are same as the live ones
Assert.assertEquals(flumeFileLengthAfterS3,
fs.getFileStatus(flumeS3Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS3,
fs.getFileStatus(hbaseS3Path).getLen());
Assert.assertEquals(appAFileWrittenDataLength,
fs.getFileStatus(appAFile).getLen());
Assert.assertEquals(appBFileInitialLength,
fs.getFileStatus(appBFile).getLen());
// Verify old flume snapshots have point-in-time / frozen file lengths
// even after the live file has moved forward.
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS3,
fs.getFileStatus(flumeS3Path).getLen());
// Verify old hbase snapshots have point-in-time / frozen file lengths
// even after the live files have moved forward.
Assert.assertEquals(hbaseFileLengthAfterS1,
fs.getFileStatus(hbaseS1Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS2,
fs.getFileStatus(hbaseS2Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS3,
fs.getFileStatus(hbaseS3Path).getLen());
flumeOutputStream.close();
hbaseOutputStream.close();
appAOutputStream.close();
appBOutputStream.close();
}
/**
 * Test that snapshots capture open files, and verify the captured
 * copies remain the same across NameNode restarts.
 */
@Test (timeout = 120000)
public void testSnapshotsForOpenFilesWithNNRestart() throws Exception {
// Construct the directory tree
final Path level0A = new Path("/level_0_A");
final Path flumeSnapRootDir = level0A;
final String flumeFileName = "flume.log";
final String flumeSnap1Name = "flume_snap_1";
final String flumeSnap2Name = "flume_snap_2";
// Create files and open a stream
final Path flumeFile = new Path(level0A, flumeFileName);
createFile(flumeFile);
FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
// Create Snapshot S1
final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
fs, flumeSnapRootDir, flumeSnap1Name);
final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName);
final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
// Verify if Snap S1 file length is same as the live one
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
int newWriteLength = (int) (BLOCKSIZE * 1.5);
byte[] buf = new byte[newWriteLength];
Random random = new Random();
random.nextBytes(buf);
// Write more data to flume file
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
// Create Snapshot S2
final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
fs, flumeSnapRootDir, flumeSnap2Name);
final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName);
// Verify live files length is same as all data written till now
final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
// Verify if Snap S2 file length is same as the live one
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
// Write more data to flume file
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
// Verify old flume snapshots have point-in-time / frozen file lengths
// even after the live file has moved forward.
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
// Restart the NameNode
restartNameNode();
cluster.waitActive();
// Verify live file length hasn't changed after NN restart
Assert.assertEquals(flumeFileWrittenDataLength,
fs.getFileStatus(flumeFile).getLen());
// Verify old flume snapshots have point-in-time / frozen file lengths
// after NN restart and live file moved forward.
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
flumeOutputStream.close();
}
private void restartNameNode() throws Exception {
cluster.triggerBlockReports();
NameNode nameNode = cluster.getNameNode();

TestSnapshotDiffReport.java

@ -22,20 +22,29 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -43,12 +52,13 @@ import org.junit.Test;
* Tests snapshot deletion.
*/
public class TestSnapshotDiffReport {
protected static final long seed = 0;
protected static final short REPLICATION = 3;
protected static final short REPLICATION_1 = 2;
protected static final long BLOCKSIZE = 1024;
public static final int SNAPSHOTNUMBER = 10;
private static final long SEED = 0;
private static final short REPLICATION = 3;
private static final short REPLICATION_1 = 2;
private static final long BLOCKSIZE = 1024;
private static final long BUFFERLEN = BLOCKSIZE / 2;
private static final long FILELEN = BLOCKSIZE * 2;
private final Path dir = new Path("/TestSnapshot");
private final Path sub1 = new Path(dir, "sub1");
@ -61,6 +71,8 @@ public class TestSnapshotDiffReport {
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
.format(true).build();
cluster.waitActive();
@ -97,10 +109,10 @@ public class TestSnapshotDiffReport {
Path link13 = new Path(modifyDir, "link13");
Path file14 = new Path(modifyDir, "file14");
Path file15 = new Path(modifyDir, "file15");
DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, seed);
DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, seed);
DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, seed);
DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, seed);
DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, SEED);
DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, SEED);
DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, SEED);
DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, SEED);
// create link13
hdfs.createSymlink(file13, link13, false);
// create snapshot
@ -118,9 +130,9 @@ public class TestSnapshotDiffReport {
// delete link13
hdfs.delete(link13, false);
// create file14
DFSTestUtil.createFile(hdfs, file14, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file14, BLOCKSIZE, REPLICATION, SEED);
// create file15
DFSTestUtil.createFile(hdfs, file15, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file15, BLOCKSIZE, REPLICATION, SEED);
// create snapshot
for (Path snapshotDir : snapshotDirs) {
@ -128,7 +140,7 @@ public class TestSnapshotDiffReport {
}
// create file11 again
DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, SEED);
// delete file12
hdfs.delete(file12, true);
// modify file13
@ -386,8 +398,8 @@ public class TestSnapshotDiffReport {
final Path fileInFoo = new Path(foo, "file");
final Path bar = new Path(dir2, "bar");
final Path fileInBar = new Path(bar, "file");
DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, SEED);
// create snapshot on /dir1
SnapshotTestHelper.createSnapshot(hdfs, dir1, "s0");
@ -421,8 +433,8 @@ public class TestSnapshotDiffReport {
final Path fileInFoo = new Path(foo, "file");
final Path bar = new Path(dir2, "bar");
final Path fileInBar = new Path(bar, "file");
DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, SEED);
SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
hdfs.rename(fileInFoo, fileInBar, Rename.OVERWRITE);
@ -454,7 +466,7 @@ public class TestSnapshotDiffReport {
final Path root = new Path("/");
final Path foo = new Path(root, "foo");
final Path fileInFoo = new Path(foo, "file");
DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, SEED);
SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
final Path bar = new Path(root, "bar");
@ -478,7 +490,7 @@ public class TestSnapshotDiffReport {
public void testDiffReportWithRenameAndAppend() throws Exception {
final Path root = new Path("/");
final Path foo = new Path(root, "foo");
DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, SEED);
SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
final Path bar = new Path(root, "bar");
@ -504,7 +516,7 @@ public class TestSnapshotDiffReport {
final Path root = new Path("/");
final Path foo = new Path(root, "foo");
final Path bar = new Path(foo, "bar");
DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, SEED);
SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
// rename /foo to /foo2
@ -529,4 +541,140 @@ public class TestSnapshotDiffReport {
new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("foo2/bar"),
DFSUtil.string2Bytes("foo2/bar-new")));
}
private void createFile(final Path filePath) throws IOException {
DFSTestUtil.createFile(hdfs, filePath, (int) BUFFERLEN,
FILELEN, BLOCKSIZE, REPLICATION, SEED);
}
private int writeToStream(final FSDataOutputStream outputStream,
byte[] buf) throws IOException {
outputStream.write(buf);
((HdfsDataOutputStream)outputStream).hsync(
EnumSet.of(SyncFlag.UPDATE_LENGTH));
return buf.length;
}
private void restartNameNode() throws Exception {
cluster.triggerBlockReports();
NameNode nameNode = cluster.getNameNode();
NameNodeAdapter.enterSafeMode(nameNode, false);
NameNodeAdapter.saveNamespace(nameNode);
NameNodeAdapter.leaveSafeMode(nameNode);
cluster.restartNameNode(true);
}
/**
 * Test the snapshot diff report for snapshots with open files captured
 * in them. Also verify that the diff report remains the same across
 * NameNode restarts.
 */
@Test (timeout = 120000)
public void testDiffReportWithOpenFiles() throws Exception {
// Construct the directory tree
final Path level0A = new Path("/level_0_A");
final Path flumeSnapRootDir = level0A;
final String flumeFileName = "flume.log";
final String flumeSnap1Name = "flume_snap_1";
final String flumeSnap2Name = "flume_snap_2";
// Create files and open a stream
final Path flumeFile = new Path(level0A, flumeFileName);
createFile(flumeFile);
FSDataOutputStream flumeOutputStream = hdfs.append(flumeFile);
// Create Snapshot S1
final Path flumeS1Dir = SnapshotTestHelper.createSnapshot(
hdfs, flumeSnapRootDir, flumeSnap1Name);
final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName);
final long flumeFileLengthAfterS1 = hdfs.getFileStatus(flumeFile).getLen();
// Verify if Snap S1 file length is same as the live one
Assert.assertEquals(flumeFileLengthAfterS1,
hdfs.getFileStatus(flumeS1Path).getLen());
verifyDiffReport(level0A, flumeSnap1Name, "",
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")));
long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
int newWriteLength = (int) (BLOCKSIZE * 1.5);
byte[] buf = new byte[newWriteLength];
Random random = new Random();
random.nextBytes(buf);
// Write more data to flume file
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
// Create Snapshot S2
final Path flumeS2Dir = SnapshotTestHelper.createSnapshot(
hdfs, flumeSnapRootDir, flumeSnap2Name);
final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName);
// Verify live files length is same as all data written till now
final long flumeFileLengthAfterS2 = hdfs.getFileStatus(flumeFile).getLen();
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
// Verify if Snap S2 file length is same as the live one
Assert.assertEquals(flumeFileLengthAfterS2,
hdfs.getFileStatus(flumeS2Path).getLen());
verifyDiffReport(level0A, flumeSnap1Name, "",
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
new DiffReportEntry(DiffType.MODIFY,
DFSUtil.string2Bytes(flumeFileName)));
verifyDiffReport(level0A, flumeSnap2Name, "",
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")));
verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
new DiffReportEntry(DiffType.MODIFY,
DFSUtil.string2Bytes(flumeFileName)));
// Write more data to flume file
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
// Verify old flume snapshots have point-in-time / frozen file lengths
// even after the live file has moved forward.
Assert.assertEquals(flumeFileLengthAfterS1,
hdfs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS2,
hdfs.getFileStatus(flumeS2Path).getLen());
flumeOutputStream.close();
// Verify if Snap S2 file length is same as the live one
Assert.assertEquals(flumeFileWrittenDataLength,
hdfs.getFileStatus(flumeFile).getLen());
// Verify old flume snapshots have point-in-time / frozen file lengths
// even after the live file has moved forward.
Assert.assertEquals(flumeFileLengthAfterS1,
hdfs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS2,
hdfs.getFileStatus(flumeS2Path).getLen());
verifyDiffReport(level0A, flumeSnap1Name, "",
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
new DiffReportEntry(DiffType.MODIFY,
DFSUtil.string2Bytes(flumeFileName)));
verifyDiffReport(level0A, flumeSnap2Name, "",
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
new DiffReportEntry(DiffType.MODIFY,
DFSUtil.string2Bytes(flumeFileName)));
verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
new DiffReportEntry(DiffType.MODIFY,
DFSUtil.string2Bytes(flumeFileName)));
restartNameNode();
verifyDiffReport(level0A, flumeSnap1Name, flumeSnap2Name,
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
new DiffReportEntry(DiffType.MODIFY,
DFSUtil.string2Bytes(flumeFileName)));
}
}

TestSnapshotManager.java

@ -23,11 +23,13 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
@ -46,25 +48,26 @@ public class TestSnapshotManager {
public void testSnapshotLimits() throws Exception {
// Setup mock objects for SnapshotManager.createSnapshot.
//
LeaseManager leaseManager = mock(LeaseManager.class);
INodeDirectory ids = mock(INodeDirectory.class);
FSDirectory fsdir = mock(FSDirectory.class);
INodesInPath iip = mock(INodesInPath.class);
SnapshotManager sm = spy(new SnapshotManager(fsdir));
SnapshotManager sm = spy(new SnapshotManager(new Configuration(), fsdir));
doReturn(ids).when(sm).getSnapshottableRoot((INodesInPath) anyObject());
doReturn(testMaxSnapshotLimit).when(sm).getMaxSnapshotID();
// Create testMaxSnapshotLimit snapshots. These should all succeed.
//
for (Integer i = 0; i < testMaxSnapshotLimit; ++i) {
sm.createSnapshot(iip, "dummy", i.toString());
sm.createSnapshot(leaseManager, iip, "dummy", i.toString());
}
// Attempt to create one more snapshot. This should fail due to snapshot
// ID rollover.
//
try {
sm.createSnapshot(iip, "dummy", "shouldFailSnapshot");
sm.createSnapshot(leaseManager, iip, "dummy", "shouldFailSnapshot");
Assert.fail("Expected SnapshotException not thrown");
} catch (SnapshotException se) {
Assert.assertTrue(
@ -79,7 +82,7 @@ public class TestSnapshotManager {
// to snapshot ID rollover.
//
try {
sm.createSnapshot(iip, "dummy", "shouldFailSnapshot2");
sm.createSnapshot(leaseManager, iip, "dummy", "shouldFailSnapshot2");
Assert.fail("Expected SnapshotException not thrown");
} catch (SnapshotException se) {
Assert.assertTrue(