HDFS-16128. [FGL] Added support for saving/loading an FS Image for PartitionedGSet. Contributed by Xing Lin. (#3201)
parent c06cd9ae8c
commit 4610e1d902
PartitionedGSet.java

@@ -68,7 +68,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
    * Consists of a hash table {@link LightWeightGSet} and a lock, which
    * controls access to this partition independently on the other ones.
    */
-  private class PartitionEntry extends LightWeightGSet<K, E> {
+  public class PartitionEntry extends LightWeightGSet<K, E> {
     private final LatchLock<?> partLock;
 
     PartitionEntry(int defaultPartitionCapacity) {
@@ -121,7 +121,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     return size;
   }
 
-  protected PartitionEntry getPartition(final K key) {
+  public PartitionEntry getPartition(final K key) {
     Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
     if(partEntry == null) {
       return null;
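For context: getPartition relies on NavigableMap.floorEntry, which maps an arbitrary key to the partition that owns its range, i.e. the partition whose start key is the greatest key less than or equal to the lookup key. A minimal, self-contained sketch of that lookup pattern (toy keys and names, plain JDK, not Hadoop code):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class FloorLookupDemo {
  public static void main(String[] args) {
    // Partitions indexed by the smallest key each one covers.
    NavigableMap<Long, String> partitions = new TreeMap<>();
    partitions.put(0L, "part-0");
    partitions.put(100L, "part-100");
    partitions.put(200L, "part-200");

    // floorEntry(150) returns the entry with the greatest key <= 150,
    // i.e. the partition whose range [100, 200) contains the key.
    Map.Entry<Long, String> owner = partitions.floorEntry(150L);
    System.out.println(owner.getValue());  // prints part-100

    // A key below the first partition start has no owning partition,
    // which is the null case handled above.
    System.out.println(partitions.floorEntry(-1L));  // prints null
  }
}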
@@ -174,6 +174,10 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     E result = part.put(element);
     if(result == null) { // new element
       size++;
+      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
+    } else {
+      LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
+          key, size);
     }
     return result;
   }
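The branching above relies on the same put contract as java.util.Map: put returns null when the key is new and returns the previous value when an existing element is replaced, which is what lets the code decide whether the logical size grows. A one-file illustration of that contract:

import java.util.HashMap;
import java.util.Map;

public class PutContractDemo {
  public static void main(String[] args) {
    Map<String, Integer> map = new HashMap<>();
    int size = 0;

    // New key: put returns null, so the logical size grows.
    Integer prev = map.put("a", 1);
    if (prev == null) {
      size++;
      System.out.println("added key a, size is now " + size);
    }

    // Existing key: put returns the replaced value; size is unchanged.
    prev = map.put("a", 2);
    if (prev != null) {
      System.out.println("replaced key a (old value " + prev
          + "), size is now " + size);
    }
  }
}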
@@ -230,19 +234,25 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
       try {
         long[] key = (long[]) inodeClass.
             getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
-        long[] firstKey = new long[0];
+        long[] firstKey = new long[key.length];
         if(part.iterator().hasNext()) {
           Object first = part.iterator().next();
-          firstKey = (long[]) inodeClass.getMethod(
+          long[] firstKeyRef = (long[]) inodeClass.getMethod(
               "getNamespaceKey", int.class).invoke(first, 2);
+          Object parent = inodeClass.
+              getMethod("getParent").invoke(first);
+          long parentId = (parent == null ? 0L :
+              (long) inodeClass.getMethod("getId").invoke(parent));
+          for (int j=0; j < key.length; j++) {
+            firstKey[j] = firstKeyRef[j];
+          }
+          firstKey[0] = parentId;
         }
         LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
             i++, key, s, firstKey); // SHV should be info
       } catch (NoSuchElementException ex) {
         LOG.error("iterator.next() throws NoSuchElementException.");
         throw ex;
       } catch (Exception ex) {
         LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
       }
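The dump above calls getNamespaceKey and getParent reflectively because PartitionedGSet lives in hadoop-common and cannot reference the HDFS INode class at compile time. A self-contained sketch of the getMethod/invoke pattern it uses (toy Node class with hypothetical methods, not the real INode API):

import java.lang.reflect.Method;

public class ReflectionDemo {
  // Toy stand-in for a class the caller cannot reference at compile time.
  public static class Node {
    public long[] getNamespaceKey(int depth) { return new long[] {7L, 42L}; }
    public Node getParent() { return null; }
  }

  public static void main(String[] args) throws Exception {
    Object node = new Node();
    Class<?> clazz = node.getClass();

    // Look the method up by name and parameter types, then invoke it.
    Method m = clazz.getMethod("getNamespaceKey", int.class);
    long[] key = (long[]) m.invoke(node, 2);
    System.out.println(key[0] + ", " + key[1]);  // prints 7, 42

    // A missing method surfaces as NoSuchMethodException, which the code
    // above folds into its catch (Exception ex) branch.
    Object parent = clazz.getMethod("getParent").invoke(node);
    System.out.println(parent);  // prints null
  }
}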
@@ -250,8 +260,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     partSizeAvg = (int) (totalSize / parts.size());
     LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
         partSizeMin, partSizeAvg, partSizeMax, totalSize);
-    LOG.error("Number of partitions: empty = {}, full = {}",
-        numEmptyPartitions, numFullPartitions);
+    LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}",
+        numEmptyPartitions, parts.size()-numEmptyPartitions, numFullPartitions);
   }
 
   @Override
@@ -277,6 +287,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     private Iterator<K> keyIterator;
     private Iterator<E> partitionIterator;
 
+    // Set partitionIterator to point to the first partition, or set it to
+    // null when there are no partitions created for this PartitionedGSet.
     public EntryIterator() {
       keyIterator = partitions.keySet().iterator();
FSDirMkdirOp.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.nio.charset.StandardCharsets;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -146,7 +147,8 @@ class FSDirMkdirOp {
         existing = createSingleDirectory(fsd, existing, component, perm);
         if(existing == null) {
           FSNamesystem.LOG.error("unprotectedMkdir returned null for "
-              + iip.getPath() + " for " + new String(component) + " i = " + i);
+              + iip.getPath() + " for "
+              + new String(component, StandardCharsets.US_ASCII) + " i = " + i);
           // Somebody already created the parent. Recalculate existing
           existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
           i = existing.length() - 1;
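This hunk replaces the String constructor that uses the platform default charset with one that names the charset explicitly, so the log message renders path-component bytes the same way on every JVM. A small illustration of the difference (plain JDK, no Hadoop types):

import java.nio.charset.StandardCharsets;

public class CharsetDemo {
  public static void main(String[] args) {
    byte[] component = {'d', 'i', 'r', '1'};

    // Implicit: decodes with the JVM's default charset, which varies by
    // platform and locale settings (file.encoding).
    String implicit = new String(component);

    // Explicit: decodes identically everywhere.
    String explicit = new String(component, StandardCharsets.US_ASCII);

    // Both print dir1 here, but only the explicit form is guaranteed
    // to be deterministic across environments.
    System.out.println(implicit + " / " + explicit);
  }
}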
FSDirectory.java

@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.HashMap;
+import java.util.Iterator;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -160,6 +165,8 @@ public class FSDirectory implements Closeable {
   private final int contentCountLimit; // max content summary counts per run
   private final long contentSleepMicroSec;
   private final INodeMap inodeMap; // Synchronized by dirLock
+  // Temp InodeMap used when loading an FS image.
+  private HashMap<Long, INodeWithAdditionalFields> tempInodeMap;
   private long yieldCount = 0; // keep track of lock yield count.
   private int quotaInitThreads;
 
@@ -318,6 +325,11 @@ public class FSDirectory implements Closeable {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir, ns);
+    tempInodeMap = new HashMap<>(1000);
+
+    // add rootDir to tempInodeMap.
+    tempInodeMap.put(rootDir.getId(), rootDir);
+
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
@@ -1476,6 +1488,23 @@ public class FSDirectory implements Closeable {
     return inodeMap;
   }
 
+  final void addToTempInodeMap(INode inode) {
+    if (inode instanceof INodeWithAdditionalFields) {
+      LOG.debug("addToTempInodeMap: id={}, tempInodeMap.size={}",
+          inode.getId(), tempInodeMap.size());
+      tempInodeMap.put(inode.getId(), (INodeWithAdditionalFields) inode);
+      if (!inode.isSymlink()) {
+        final XAttrFeature xaf = inode.getXAttrFeature();
+        addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
+        StoragePolicySatisfyManager spsManager =
+            namesystem.getBlockManager().getSPSManager();
+        if (spsManager != null && spsManager.isEnabled()) {
+          addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+        }
+      }
+    }
+  }
+
   /**
    * This method is always called with writeLock of FSDirectory held.
    */
@@ -1543,6 +1572,36 @@ public class FSDirectory implements Closeable {
     addEncryptionZone(rootDir, xaf);
   }
 
+  /**
+   * After the inodes are set properly (set the parent for each inode), we
+   * move them from tempInodeMap to inodeMap.
+   */
+  void moveInodes() throws IOException {
+    long count = 0, totalInodes = tempInodeMap.size();
+    LOG.debug("tempInodeMap={}", tempInodeMap);
+
+    for (Map.Entry e: tempInodeMap.entrySet()) {
+      INodeWithAdditionalFields n = (INodeWithAdditionalFields)e.getValue();
+
+      LOG.debug("populate {}-th inode: id={}, fullpath={}",
+          count, n.getId(), n.getFullPathName());
+
+      inodeMap.put(n);
+      count++;
+    }
+
+    if (count != totalInodes) {
+      String msg = String.format("moveInodes: expected to move %d inodes, " +
+          "but moved %d inodes", totalInodes, count);
+      throw new IOException(msg);
+    }
+
+    tempInodeMap.clear();
+    assert(tempInodeMap.isEmpty());
+    tempInodeMap = null;
+  }
+
   /**
    * This method is always called with writeLock of FSDirectory held.
    */
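The pattern here is a two-phase load: while the image is being read, inodes are staged in a plain HashMap keyed by id, because parent pointers may not be wired up yet and the namespace-key-based PartitionedGSet cannot place a node without its parent; once every parent is set, moveInodes bulk-transfers them into the real map and verifies the count. A self-contained toy version of that idea (hypothetical Node type, not the HDFS classes):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class TwoPhaseLoadDemo {
  record Node(long id, long parentId) {}

  public static void main(String[] args) throws IOException {
    // Phase 1: stage nodes by id only; parent links may still be missing.
    Map<Long, Node> temp = new HashMap<>();
    temp.put(2L, new Node(2L, 1L));
    temp.put(1L, new Node(1L, 0L));

    // Phase 2: every parent is known, so nodes can be placed into the
    // final structure, here ordered by (parentId, id) like a namespace key.
    TreeMap<long[], Node> finalMap = new TreeMap<>((a, b) -> {
      for (int i = 0; i < a.length; i++) {
        if (a[i] != b[i]) return a[i] < b[i] ? -1 : 1;
      }
      return 0;
    });

    long count = 0;
    for (Node n : temp.values()) {
      finalMap.put(new long[] {n.parentId(), n.id()}, n);
      count++;
    }
    if (count != temp.size()) {
      throw new IOException(String.format(
          "expected to move %d nodes, but moved %d", temp.size(), count));
    }
    temp.clear();  // the staging map is no longer needed
    System.out.println(finalMap.size());  // prints 2
  }
}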
@@ -1860,6 +1919,17 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  public INode getInode(INode inode) {
+    return inodeMap.get(inode);
+  }
+  public INode getInodeFromTempINodeMap(long id) {
+    LOG.debug("getInodeFromTempINodeMap: id={}, tempInodeMap.size={}",
+        id, tempInodeMap.size());
+    if (id < INodeId.ROOT_INODE_ID)
+      return null;
+
+    return tempInodeMap.get(id);
+  }
   @VisibleForTesting
   FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
       UserGroupInformation ugi) throws AccessControlException {
FSImage.java

@@ -761,6 +761,16 @@ public class FSImage implements Closeable {
           "above for more info.");
     }
     prog.endPhase(Phase.LOADING_FSIMAGE);
+
+    /*
+     * loadEdits always sets the parent of an inode before adding the inode
+     * to inodeMap. So, it is safe to move inodes from tempInodeMap to
+     * inodeMap before loadEdits.
+     */
+    FSDirectory dir = target.getFSDirectory();
+    dir.moveInodes();
+    LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap",
+        dir.getINodeMap().size());
 
     if (!rollingRollback) {
       prog.beginPhase(Phase.LOADING_EDITS);
@@ -776,6 +786,8 @@ public class FSImage implements Closeable {
         needToSave = false;
       }
       editLog.setNextTxId(lastAppliedTxId + 1);
+      LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap",
+          dir.getINodeMap().size());
       return needToSave;
     }
 
FSImageFormatPBINode.java

@@ -276,9 +276,10 @@ public final class FSImageFormatPBINode {
         if (e == null) {
           break;
         }
-        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        INodeDirectory p =
+            dir.getInodeFromTempINodeMap(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
-          INode child = dir.getInode(id);
+          INode child = dir.getInodeFromTempINodeMap(id);
           if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
@@ -382,6 +383,7 @@ public final class FSImageFormatPBINode {
         if (p == null) {
           break;
         }
+        LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId());
         if (p.getId() == INodeId.ROOT_INODE_ID) {
           synchronized(this) {
             loadRootINode(p);
@@ -389,7 +391,7 @@ public final class FSImageFormatPBINode {
         } else {
           INode n = loadINode(p);
           synchronized(this) {
-            dir.addToInodeMap(n);
+            dir.addToTempInodeMap(n);
           }
           fillUpInodeList(inodeList, n);
         }
@@ -761,7 +763,7 @@ public final class FSImageFormatPBINode {
         DirEntry.newBuilder().setParent(n.getId());
     for (INode inode : children) {
       // Error if the child inode doesn't exist in inodeMap
-      if (dir.getInode(inode.getId()) == null) {
+      if (dir.getInode(inode) == null) {
         FSImage.LOG.error(
             "FSImageFormatPBINode#serializeINodeDirectorySection: " +
             "Dangling child pointer found. Missing INode in " +
@@ -812,6 +814,7 @@ public final class FSImageFormatPBINode {
     Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
     while (iter.hasNext()) {
       INodeWithAdditionalFields n = iter.next();
+      LOG.debug("i = {}, save inode: {}", i, n);
       save(out, n);
       ++i;
       if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
INodeMap.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,12 +36,12 @@ import org.apache.hadoop.util.PartitionedGSet;
  * and INode.
  */
 public class INodeMap {
-  static final int NAMESPACE_KEY_DEBTH = 2;
+  static final int NAMESPACE_KEY_DEPTH = 2;
   static final int NUM_RANGES_STATIC = 256; // power of 2
 
   public static class INodeKeyComparator implements Comparator<INode> {
     INodeKeyComparator() {
-      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
     }
 
     @Override
@@ -48,9 +49,9 @@ public class INodeMap {
       if (i1 == null || i2 == null) {
         throw new NullPointerException("Cannot compare null INodes");
       }
-      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      for(int l = 0; l < NAMESPACE_KEY_DEBTH; l++) {
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      for(int l = 0; l < NAMESPACE_KEY_DEPTH; l++) {
         if(key1[l] == key2[l]) continue;
         return (key1[l] < key2[l] ? -1 : 1);
       }
@@ -64,7 +65,7 @@ public class INodeMap {
    */
   public static class HPINodeKeyComparator implements Comparator<INode> {
     HPINodeKeyComparator() {
-      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
     }
 
     @Override
@@ -72,13 +73,13 @@ public class INodeMap {
       if (i1 == null || i2 == null) {
         throw new NullPointerException("Cannot compare null INodes");
       }
-      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
       long key1_0 = INode.indexOf(key1);
       long key2_0 = INode.indexOf(key2);
       if(key1_0 != key2_0)
         return (key1_0 < key2_0 ? -1 : 1);
-      for(int l = 1; l < NAMESPACE_KEY_DEBTH; l++) {
+      for(int l = 1; l < NAMESPACE_KEY_DEPTH; l++) {
         if(key1[l] == key2[l]) continue;
         return (key1[l] < key2[l] ? -1 : 1);
       }
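Both comparators order inodes lexicographically by a fixed-depth namespace key, an array of longs such as {parentId, inodeId}. A self-contained sketch of that lexicographic comparison over long[] keys (toy data, not the INode API):

import java.util.Comparator;
import java.util.TreeSet;

public class NamespaceKeyDemo {
  static final int DEPTH = 2;

  // Compare fixed-depth long[] keys element by element, most
  // significant component (e.g. the parent id) first.
  static final Comparator<long[]> KEY_ORDER = (k1, k2) -> {
    for (int l = 0; l < DEPTH; l++) {
      if (k1[l] == k2[l]) continue;
      return k1[l] < k2[l] ? -1 : 1;
    }
    return 0;
  };

  public static void main(String[] args) {
    TreeSet<long[]> set = new TreeSet<>(KEY_ORDER);
    set.add(new long[] {2L, 10L});  // {parentId, inodeId}
    set.add(new long[] {1L, 99L});
    set.add(new long[] {2L, 3L});

    // Keys sort by parent first, then by id: (1,99), (2,3), (2,10).
    for (long[] k : set) {
      System.out.println(k[0] + "/" + k[1]);
    }
  }
}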
@@ -202,8 +203,8 @@ public class INodeMap {
     PermissionStatus perm = new PermissionStatus(
         "", "", new FsPermission((short) 0));
     for(int p = 0; p < NUM_RANGES_STATIC; p++) {
-      INodeDirectory key = new INodeDirectory(
-          INodeId.ROOT_INODE_ID, "range key".getBytes(), perm, 0);
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
       key.setParent(new INodeDirectory((long)p, null, perm, 0));
       pgs.addNewPartition(key);
     }
@@ -244,48 +245,58 @@ public class INodeMap {
    * such {@link INode} in the map.
    */
   public INode get(long id) {
-    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
-        "", "", new FsPermission((short) 0)), 0, 0) {
-
-      @Override
-      void recordModification(int latestSnapshotId) {
-      }
-
-      @Override
-      public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
-        // Nothing to do
-      }
-
-      @Override
-      public QuotaCounts computeQuotaUsage(
-          BlockStoragePolicySuite bsps, byte blockStoragePolicyId,
-          boolean useCache, int lastSnapshotId) {
-        return null;
-      }
-
-      @Override
-      public ContentSummaryComputationContext computeContentSummary(
-          int snapshotId, ContentSummaryComputationContext summary) {
-        return null;
-      }
-
-      @Override
-      public void cleanSubtree(
-          ReclaimContext reclaimContext, int snapshotId, int priorSnapshotId) {
-      }
-
-      @Override
-      public byte getStoragePolicyID(){
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
-
-      @Override
-      public byte getLocalStoragePolicyID() {
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
-    };
-
-    return map.get(inode);
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+    /*
+     * Convert a long inode id into an INode object. We only need to compare
+     * two inodes by inode id. So, it can be any type of INode object.
+     */
+    INode inode = new INodeDirectory(id, null,
+        new PermissionStatus("", "", new FsPermission((short) 0)), 0);
+
+    /*
+     * Iterate all partitions of PGSet and return the INode.
+     * Just for fallback.
+     */
+    PermissionStatus perm =
+        new PermissionStatus("", "", new FsPermission((short) 0));
+    // TODO: create a static array, to avoid creation of keys each time.
+    for (int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      PartitionedGSet.PartitionEntry e = pgs.getPartition(key);
+
+      if (e.contains(inode)) {
+        return (INode) e.get(inode);
+      }
+    }
+
+    return null;
+  }
+
+  public INode get(INode inode) {
+    /*
+     * Check whether the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of
+     * parent dirs.
+     */
+    int i = NAMESPACE_KEY_DEPTH - 1;
+    INode tmpInode = inode;
+    while (i > 0 && tmpInode.getParent() != null) {
+      tmpInode = tmpInode.getParent();
+      i--;
+    }
+
+    /*
+     * If the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent dirs,
+     * use map.get(); else, fall back to get INode based on Inode ID.
+     */
+    if (i == 0) {
+      return map.get(inode);
+    } else {
+      return get(inode.getId());
+    }
   }
 
   /**
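The rewritten get(INode) encodes the lookup policy: if the inode has enough ancestors to form a complete (NAMESPACE_KEY_DEPTH - 1)-level namespace key, the partitioned map can locate it directly; otherwise the code falls back to the id-based scan over all partitions. A toy model of that depth check (hypothetical Node class, not the HDFS API):

public class DepthCheckDemo {
  static final int NAMESPACE_KEY_DEPTH = 2;

  static class Node {
    final long id;
    final Node parent;
    Node(long id, Node parent) { this.id = id; this.parent = parent; }
  }

  // Returns true when the node has at least DEPTH-1 ancestors, i.e. a
  // complete namespace key can be computed and a direct lookup is possible.
  static boolean hasFullKey(Node node) {
    int i = NAMESPACE_KEY_DEPTH - 1;
    Node tmp = node;
    while (i > 0 && tmp.parent != null) {
      tmp = tmp.parent;
      i--;
    }
    return i == 0;
  }

  public static void main(String[] args) {
    Node root = new Node(1, null);
    Node child = new Node(2, root);
    System.out.println(hasFullKey(root));   // false: fall back to id scan
    System.out.println(hasFullKey(child));  // true: direct map lookup
  }
}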