Compare commits

...

6 Commits
trunk ... fgl

16 changed files with 1420 additions and 136 deletions

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
/**
* LatchLock controls two hierarchical Read/Write locks:
* the topLock and the childLock.
* Typically an operation starts with the topLock already acquired.
* To acquire child lock LatchLock will
* first acquire the childLock, and then release the topLock.
*/
public abstract class LatchLock<C> {

  // ---- Top lock hooks, supplied by subclasses ----

  /** @return true if the topLock is locked for read by any thread */
  protected abstract boolean isReadTopLocked();

  /** @return true if the topLock is locked for write by any thread */
  protected abstract boolean isWriteTopLocked();

  /** Releases the read side of the topLock. */
  protected abstract void readTopUnlock();

  /** Releases the write side of the topLock. */
  protected abstract void writeTopUnlock();

  // ---- Child lock hooks, supplied by subclasses ----

  /** @return true if the childLock is held for read */
  protected abstract boolean hasReadChildLock();

  /** @return true if the childLock is held for write */
  protected abstract boolean hasWriteChildLock();

  /** Acquires the read side of the childLock. */
  protected abstract void readChildLock();

  /** Acquires the write side of the childLock. */
  protected abstract void writeChildLock();

  /** Releases the read side of the childLock. */
  protected abstract void readChildUnlock();

  /** Releases the write side of the childLock. */
  protected abstract void writeChildUnlock();

  /** @return another LatchLock of the same kind as this one */
  protected abstract LatchLock<C> clone();

  // ---- Public latching API ----

  /**
   * Latches for read: takes the child read lock first and only then
   * lets go of the top read lock.
   */
  public void readLock() {
    readChildLock();
    readTopUnlock();
  }

  /** Releases the child read lock taken by {@link #readLock()}. */
  public void readUnlock() {
    readChildUnlock();
  }

  /**
   * Latches for write: takes the child write lock first and only then
   * lets go of the top write lock.
   */
  public void writeLock() {
    writeChildLock();
    writeTopUnlock();
  }

  /** Releases the child write lock taken by {@link #writeLock()}. */
  public void writeUnlock() {
    writeChildUnlock();
  }
}

View File

@ -0,0 +1,348 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.NoSuchElementException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
/**
* An implementation of {@link GSet}, which splits a collection of elements
* into partitions each corresponding to a range of keys.
*
* This class does not support null element.
*
* This class is backed up by LatchLock for hierarchical synchronization.
*
* @param <K> Key type for looking up the elements
* @param <E> Element type, which must be
* (1) a subclass of K, and
* (2) implementing {@link LinkedElement} interface.
*/
@InterfaceAudience.Private
public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027;
private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f;
/**
* An ordered map of contiguous segments of elements.
* Each key in the map represent the smallest key in the mapped segment,
* so that all elements in this segment are >= the mapping key,
* but are smaller then the next key in the map.
* Elements within a partition do not need to be ordered.
*/
private final NavigableMap<K, PartitionEntry> partitions;
private LatchLock<?> latchLock;
/**
* The number of elements in the set.
*/
protected volatile int size;
/**
* A single partition of the {@link PartitionedGSet}.
* Consists of a hash table {@link LightWeightGSet} and a lock, which
* controls access to this partition independently on the other ones.
*/
public class PartitionEntry extends LightWeightGSet<K, E> {
private final LatchLock<?> partLock;
PartitionEntry(int defaultPartitionCapacity) {
super(defaultPartitionCapacity);
this.partLock = latchLock.clone();
}
}
public PartitionedGSet(final int capacity,
final Comparator<? super K> comparator,
final LatchLock<?> latchLock) {
this.partitions = new TreeMap<K, PartitionEntry>(comparator);
this.latchLock = latchLock;
// addNewPartition(rootKey).put(rootKey);
// this.size = 1;
this.size = 0;
LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
}
/**
* Creates new empty partition.
* @param key
* @return
*/
public PartitionEntry addNewPartition(final K key) {
Entry<K, PartitionEntry> lastEntry = partitions.lastEntry();
PartitionEntry lastPart = null;
if(lastEntry != null)
lastPart = lastEntry.getValue();
PartitionEntry newPart =
new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
// assert size == 0 || newPart.partLock.isWriteTopLocked() :
// "Must hold write Lock: key = " + key;
PartitionEntry oldPart = partitions.put(key, newPart);
assert oldPart == null :
"RangeMap already has a partition associated with " + key;
LOG.debug("Total GSet size = {}", size);
LOG.debug("Number of partitions = {}", partitions.size());
LOG.debug("Previous partition size = {}",
lastPart == null ? 0 : lastPart.size());
return newPart;
}
@Override
public int size() {
return size;
}
public PartitionEntry getPartition(final K key) {
Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
if(partEntry == null) {
return null;
}
PartitionEntry part = partEntry.getValue();
if(part == null) {
throw new IllegalStateException("Null partition for key: " + key);
}
assert size == 0 || part.partLock.isReadTopLocked() ||
part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
return part;
}
@Override
public boolean contains(final K key) {
PartitionEntry part = getPartition(key);
if(part == null) {
return false;
}
return part.contains(key);
}
@Override
public E get(final K key) {
PartitionEntry part = getPartition(key);
if(part == null) {
return null;
}
LOG.debug("get key: {}", key);
// part.partLock.readLock();
return part.get(key);
}
@Override
public E put(final E element) {
K key = element;
PartitionEntry part = getPartition(key);
if(part == null) {
throw new HadoopIllegalArgumentException("Illegal key: " + key);
}
assert size == 0 || part.partLock.isWriteTopLocked() ||
part.partLock.hasWriteChildLock() :
"Must hold write Lock: key = " + key;
LOG.debug("put key: {}", key);
PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
if(newPart != part) {
newPart.partLock.writeChildLock();
part = newPart;
}
E result = part.put(element);
if(result == null) { // new element
size++;
LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
} else {
LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
key, size);
}
return result;
}
private PartitionEntry addNewPartitionIfNeeded(
PartitionEntry curPart, K key) {
if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW
|| curPart.contains(key)) {
return curPart;
}
return addNewPartition(key);
}
@Override
public E remove(final K key) {
PartitionEntry part = getPartition(key);
if(part == null) {
return null;
}
E result = part.remove(key);
if(result != null) {
size--;
}
return result;
}
@Override
public void clear() {
LOG.error("Total GSet size = {}", size);
LOG.error("Number of partitions = {}", partitions.size());
printStats();
// assert latchLock.hasWriteTopLock() : "Must hold write topLock";
// SHV May need to clear all partitions?
partitions.clear();
size = 0;
}
private void printStats() {
int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0;
long totalSize = 0;
int numEmptyPartitions = 0, numFullPartitions = 0;
Collection<PartitionEntry> parts = partitions.values();
Set<Entry<K, PartitionEntry>> entries = partitions.entrySet();
int i = 0;
for(Entry<K, PartitionEntry> e : entries) {
PartitionEntry part = e.getValue();
int s = part.size;
if(s == 0) numEmptyPartitions++;
if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++;
totalSize += s;
partSizeMin = (s < partSizeMin ? s : partSizeMin);
partSizeMax = (partSizeMax < s ? s : partSizeMax);
Class<?> inodeClass = e.getKey().getClass();
try {
long[] key = (long[]) inodeClass.
getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
long[] firstKey = new long[key.length];
if(part.iterator().hasNext()) {
Object first = part.iterator().next();
long[] firstKeyRef = (long[]) inodeClass.getMethod(
"getNamespaceKey", int.class).invoke(first, 2);
Object parent = inodeClass.
getMethod("getParent").invoke(first);
long parentId = (parent == null ? 0L :
(long) inodeClass.getMethod("getId").invoke(parent));
for (int j=0; j < key.length; j++) {
firstKey[j] = firstKeyRef[j];
}
firstKey[0] = parentId;
}
LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
i++, key, s, firstKey); // SHV should be info
} catch (NoSuchElementException ex) {
LOG.error("iterator.next() throws NoSuchElementException.");
throw ex;
} catch (Exception ex) {
LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
}
}
partSizeAvg = (int) (totalSize / parts.size());
LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
partSizeMin, partSizeAvg, partSizeMax, totalSize);
LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}",
numEmptyPartitions, parts.size()-numEmptyPartitions, numFullPartitions);
}
@Override
public Collection<E> values() {
// TODO Auto-generated method stub
return null;
}
@Override
public Iterator<E> iterator() {
return new EntryIterator();
}
/**
* Iterator over the elements in the set.
* Iterates first by keys, then inside the partition
* corresponding to the key.
*
* Modifications are tracked by the underlying collections. We allow
* modifying other partitions, while iterating through the current one.
*/
private class EntryIterator implements Iterator<E> {
private Iterator<K> keyIterator;
private Iterator<E> partitionIterator;
// Set partitionIterator to point to the first partition, or set it to null
// when there is no partitions created for this PartitionedGSet.
public EntryIterator() {
keyIterator = partitions.keySet().iterator();
if (!keyIterator.hasNext()) {
partitionIterator = null;
return;
}
K firstKey = keyIterator.next();
partitionIterator = partitions.get(firstKey).iterator();
}
@Override
public boolean hasNext() {
// Special case: an iterator was created for an empty PartitionedGSet.
// Check whether new partitions have been added since then.
if (partitionIterator == null) {
if (partitions.size() == 0) {
return false;
} else {
keyIterator = partitions.keySet().iterator();
K nextKey = keyIterator.next();
partitionIterator = partitions.get(nextKey).iterator();
}
}
while(!partitionIterator.hasNext()) {
if(!keyIterator.hasNext()) {
return false;
}
K curKey = keyIterator.next();
partitionIterator = getPartition(curKey).iterator();
}
return partitionIterator.hasNext();
}
@Override
public E next() {
if (!hasNext()) {
throw new NoSuchElementException("No more elements in this set.");
}
return partitionIterator.next();
}
}
public void latchWriteLock(K[] keys) {
// getPartition(parent).partLock.writeChildLock();
LatchLock<?> pLock = null;
for(K key : keys) {
pLock = getPartition(key).partLock;
pLock.writeChildLock();
}
assert pLock != null : "pLock is null";
pLock.writeTopUnlock();
}
}

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Testing {@link PartitionedGSet} */
public class TestPartitionedGSet {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestPartitionedGSet.class);
  private static final int ELEMENT_NUM = 100;

  /**
   * Generate non-negative random numbers for testing. We want to use only
   * non-negative numbers because the smallest partition used in testing is 0.
   *
   * @param length
   *    number of random numbers to be generated.
   *
   * @param randomSeed
   *    seed to be used for random number generator.
   *
   * @return
   *    A list of Integers
   */
  private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
    Random random = new Random(randomSeed);
    ArrayList<Integer> list = new ArrayList<Integer>(length);
    for (int i = 0; i < length; i++) {
      list.add(random.nextInt(Integer.MAX_VALUE));
    }
    return list;
  }

  /** Minimal element type: an int value plus the link required by the set. */
  private static class TestElement implements LinkedElement {
    private final int val;
    private LinkedElement next;

    TestElement(int val) {
      this.val = val;
      this.next = null;
    }

    public int getVal() {
      return val;
    }

    @Override
    public void setNext(LinkedElement next) {
      this.next = next;
    }

    @Override
    public LinkedElement getNext() {
      return next;
    }
  }

  /** Orders {@link TestElement}s by their int value. */
  private static class TestElementComparator implements Comparator<TestElement>
  {
    @Override
    public int compare(TestElement e1, TestElement e2) {
      if (e1 == null || e2 == null) {
        throw new NullPointerException("Cannot compare null elements");
      }
      // Integer.compare instead of subtraction: subtraction overflows for
      // operands that are far apart and then reports the wrong order.
      return Integer.compare(e1.getVal(), e2.getVal());
    }
  }

  protected ReentrantReadWriteLock topLock =
      new ReentrantReadWriteLock(false);

  /**
   * We are NOT testing any concurrent access to a PartitionedGSet here.
   */
  private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
    private ReentrantReadWriteLock childLock;

    public NoOpLock() {
      childLock = new ReentrantReadWriteLock(false);
    }

    @Override
    protected boolean isReadTopLocked() {
      return topLock.getReadLockCount() > 0 || isWriteTopLocked();
    }

    @Override
    protected boolean isWriteTopLocked() {
      return topLock.isWriteLocked();
    }

    @Override
    protected void readTopUnlock() {
      topLock.readLock().unlock();
    }

    @Override
    protected void writeTopUnlock() {
      topLock.writeLock().unlock();
    }

    @Override
    protected boolean hasReadChildLock() {
      return childLock.getReadLockCount() > 0 || hasWriteChildLock();
    }

    @Override
    protected void readChildLock() {
      childLock.readLock().lock();
    }

    @Override
    protected void readChildUnlock() {
      childLock.readLock().unlock();
    }

    @Override
    protected boolean hasWriteChildLock() {
      return childLock.isWriteLockedByCurrentThread();
    }

    @Override
    protected void writeChildLock() {
      childLock.writeLock().lock();
    }

    @Override
    protected void writeChildUnlock() {
      childLock.writeLock().unlock();
    }

    @Override
    protected LatchLock<ReentrantReadWriteLock> clone() {
      return new NoOpLock();
    }
  }

  /** Creates an empty set without any partitions. */
  private PartitionedGSet<TestElement, TestElement> newSet() {
    return new PartitionedGSet<TestElement, TestElement>(
        16, new TestElementComparator(), new NoOpLock());
  }

  /** Adds the three partitions (keys 0, 1000 and 2000) used by the tests. */
  private static void addTestPartitions(
      PartitionedGSet<TestElement, TestElement> set) {
    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));
  }

  /** Drains the iterator and returns the number of elements it produced. */
  private static int countElements(Iterator<TestElement> iter) {
    int count = 0;
    while (iter.hasNext()) {
      iter.next();
      count++;
    }
    return count;
  }

  /**
   * Test iterator for a PartitionedGSet with no partitions.
   */
  @Test(timeout=60000)
  public void testIteratorForNoPartition() {
    PartitionedGSet<TestElement, TestElement> set = newSet();

    int count;
    topLock.readLock().lock();
    try {
      count = countElements(set.iterator());
    } finally {
      topLock.readLock().unlock();
    }
    Assert.assertEquals(0, count);
  }

  /**
   * Test iterator for a PartitionedGSet with empty partitions.
   */
  @Test(timeout=60000)
  public void testIteratorForEmptyPartitions() {
    PartitionedGSet<TestElement, TestElement> set = newSet();
    addTestPartitions(set);

    int count;
    topLock.readLock().lock();
    try {
      count = countElements(set.iterator());
    } finally {
      topLock.readLock().unlock();
    }
    Assert.assertEquals(0, count);
  }

  /**
   * Test whether the iterator can return the same number of elements as stored
   * into the PartitionedGSet.
   */
  @Test(timeout=60000)
  public void testIteratorCountElements() {
    ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
    PartitionedGSet<TestElement, TestElement> set = newSet();
    addTestPartitions(set);

    topLock.writeLock().lock();
    try {
      for (Integer i : list) {
        set.put(new TestElement(i));
      }
    } finally {
      topLock.writeLock().unlock();
    }

    int count;
    topLock.readLock().lock();
    try {
      count = countElements(set.iterator());
    } finally {
      topLock.readLock().unlock();
    }
    Assert.assertEquals(ELEMENT_NUM, count);
  }

  /**
   * Test iterator when it is created before partitions/elements are
   * added to the PartitionedGSet.
   */
  @Test(timeout=60000)
  public void testIteratorAddElementsAfterIteratorCreation() {
    PartitionedGSet<TestElement, TestElement> set = newSet();

    // Create the iterator before partitions are added.
    Iterator<TestElement> iter = set.iterator();

    addTestPartitions(set);

    // Added one element
    topLock.writeLock().lock();
    try {
      set.put(new TestElement(2500));
    } finally {
      topLock.writeLock().unlock();
    }

    int count;
    topLock.readLock().lock();
    try {
      count = countElements(iter);
    } finally {
      topLock.readLock().unlock();
    }
    Assert.assertEquals(1, count);
  }
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -69,18 +70,7 @@ class FSDirMkdirOp {
// create multiple inodes.
fsn.checkFsObjectLimit();
// Ensure that the user can traversal the path by adding implicit
// u+wx permission to all ancestor directories.
INodesInPath existing =
createParentDirectories(fsd, iip, permissions, false);
if (existing != null) {
existing = createSingleDirectory(
fsd, existing, iip.getLastLocalName(), permissions);
}
if (existing == null) {
throw new IOException("Failed to create directory: " + src);
}
iip = existing;
iip = createMissingDirs(fsd, iip, permissions, false);
}
return fsd.getAuditFileInfo(iip);
} finally {
@ -88,6 +78,36 @@ class FSDirMkdirOp {
}
}
/**
 * Creates every directory that is missing along the given path and adds
 * them one by one under the existing part of the path.
 *
 * @param fsd the directory the path belongs to
 * @param iip the path to complete
 * @param permissions permission requested by the caller
 * @param inheritPerms if true, the permission of the last existing inode is
 *                     used as the base permission instead of permissions
 * @return the path after all missing directories were added, or just the
 *         existing part of the path when nothing was missing
 */
static INodesInPath createMissingDirs(FSDirectory fsd, INodesInPath iip,
    PermissionStatus permissions, boolean inheritPerms) throws IOException {
  final PermissionStatus basePerm;
  if (inheritPerms) {
    basePerm = iip.getExistingINodes().getLastINode().getPermissionStatus();
  } else {
    basePerm = permissions;
  }

  // Build inodes for all missing directories along the path,
  // without adding them to the INodeMap yet.
  final PermissionStatus ancestorPerm = addImplicitUwx(basePerm, permissions);
  final INode[] missing = createPathDirectories(fsd, iip, ancestorPerm);
  INodesInPath current = iip.getExistingINodes();

  if (missing.length > 0) {
    // switch the locks
    fsd.getINodeMap().latchWriteLock(current, missing);

    // Now add the prepared inodes to the INodeMap, parent first.
    final int lastIdx = missing.length - 1;
    for (int idx = 0; idx <= lastIdx; idx++) {
      // The last directory on the path gets the base permission:
      // for MKDIR that is the permission given by the user,
      // for create it is the parent directory permission.
      final PermissionStatus perm =
          (idx == lastIdx) ? basePerm : ancestorPerm;
      current = addSingleDirectory(fsd, current, missing[idx], perm);
      assert current != null : "iip should not be null";
    }
  }
  return current;
}
/**
* For a given absolute path, create all ancestors as directories along the
* path. All ancestors inherit their parent's permission plus an implicit
@ -132,6 +152,7 @@ class FSDirMkdirOp {
if (missing == 0) { // full path exists, return parents.
existing = iip.getParentINodesInPath();
} else if (missing > 1) { // need to create at least one ancestor dir.
FSNamesystem.LOG.error("missing = " + missing);
// Ensure that the user can traversal the path by adding implicit
// u+wx permission to all ancestor directories.
PermissionStatus basePerm = inheritPerms
@ -143,6 +164,14 @@ class FSDirMkdirOp {
for (int i = existing.length(); existing != null && i <= last; i++) {
byte[] component = iip.getPathComponent(i);
existing = createSingleDirectory(fsd, existing, component, perm);
if(existing == null) {
FSNamesystem.LOG.error("unprotectedMkdir returned null for "
+ iip.getPath() + " for "
+ new String(component, StandardCharsets.US_ASCII) + " i = " + i);
// Somebody already created the parent. Recalculate existing
existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
i = existing.length() - 1;
}
}
}
return existing;
@ -228,5 +257,70 @@ class FSDirMkdirOp {
}
return iip;
}
/**
 * Builds a new directory inode with a freshly allocated inode id. The inode
 * is only constructed here; this method does not attach it to the parent.
 *
 * @param fsd the directory used to allocate the inode id
 * @param parent path whose last inode must be a directory
 * @param name local name of the new directory
 * @param permission permission of the new directory
 * @return the new directory inode
 * @throws FileAlreadyExistsException if the last inode of parent is not a
 *         directory
 */
private static INode createDirectoryINode(FSDirectory fsd,
    INodesInPath parent, byte[] name, PermissionStatus permission)
    throws FileAlreadyExistsException {
  assert fsd.hasReadLock();
  assert parent.getLastINode() != null;
  if (parent.getLastINode().isDirectory()) {
    return new INodeDirectory(
        fsd.allocateNewInodeId(), name, permission, now());
  }
  throw new FileAlreadyExistsException("Parent path is not a directory: " +
      parent.getPath() + " " + DFSUtil.bytes2String(name));
}
/**
 * Builds directory inodes for every path component of iip that does not
 * exist yet, in path order.
 *
 * @param fsd the directory the path belongs to
 * @param iip the full path
 * @param perm permission given to every created inode
 * @return the created inodes; an empty array when the full path exists
 */
private static INode[] createPathDirectories(FSDirectory fsd,
    INodesInPath iip, PermissionStatus perm)
    throws IOException {
  assert fsd.hasWriteLock();
  final INodesInPath existing = iip.getExistingINodes();
  assert existing != null : "existing should not be null";

  // Index of the first component that has no inode yet.
  final int firstMissing = existing.length();
  final int numMissing = iip.length() - firstMissing;

  // When the full path exists this stays an empty array.
  final INode[] missing = new INode[numMissing];
  for (int offset = 0; offset < numMissing; offset++) {
    final byte[] component = iip.getPathComponent(firstMissing + offset);
    missing[offset] = createDirectoryINode(fsd, existing, component, perm);
  }
  return missing;
}
/**
 * Adds a prepared directory inode as the last inode of the existing path.
 * On success the creation is counted in the metrics and written to the
 * edit log. When the add returns null, the child with the same name that
 * the parent currently has is appended to the path instead.
 *
 * @param fsd the directory to add to
 * @param existing path of the parent
 * @param dir the directory inode to add
 * @param perm permission status whose permission is passed to the add
 * @return existing extended by the added (or already present) directory
 */
private static INodesInPath addSingleDirectory(FSDirectory fsd,
    INodesInPath existing, INode dir, PermissionStatus perm)
    throws IOException {
  assert fsd.hasWriteLock();
  final INodesInPath added =
      fsd.addLastINode(existing, dir, perm.getPermission(), true);

  if (added != null) {
    assert dir.equals(added.getLastINode()) : "dir is not the last INode";

    // Directory creation also count towards FilesCreated
    // to match count of FilesDeleted metric.
    NameNode.getNameNodeMetrics().incrFilesCreated();

    assert dir.getPermissionStatus().getGroupName() != null :
        "GroupName is null for " + added.getPath();
    final String createdPath = added.getPath();
    fsd.getEditLog().logMkDir(createdPath, dir);
    NameNode.stateChangeLog.debug("mkdirs: created directory {}", createdPath);
    return added;
  }

  // The add was rejected: continue with the child of that name that the
  // parent holds right now.
  FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
  final INodeDirectory parentDir = existing.getLastINode().asDirectory();
  final INode present =
      parentDir.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
  return INodesInPath.append(existing, present, present.getLocalNameBytes());
}
}

View File

@ -228,6 +228,13 @@ class FSDirWriteFileOp {
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
INode[] missing = new INode[]{iip.getLastINode()};
INodesInPath existing = iip.getParentINodesInPath();
FSDirectory fsd = fsn.getFSDirectory();
// switch the locks
fsd.getINodeMap().latchWriteLock(existing, missing);
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
@ -392,8 +399,8 @@ class FSDirWriteFileOp {
}
fsn.checkFsObjectLimit();
INodeFile newNode = null;
INodesInPath parent =
FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd,
iip.getParentINodesInPath(), permissions, true);
if (parent != null) {
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate,
@ -541,41 +548,22 @@ class FSDirWriteFileOp {
FSDirectory fsd, INodesInPath existing, byte[] localName,
PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine, boolean shouldReplicate,
String ecPolicyName, String storagePolicy) throws IOException {
String ecPolicyName, String storagePolicy)
throws IOException {
Preconditions.checkNotNull(existing);
long modTime = now();
INodesInPath newiip;
fsd.writeLock();
try {
boolean isStriped = false;
ErasureCodingPolicy ecPolicy = null;
byte storagepolicyid = 0;
if (storagePolicy != null && !storagePolicy.isEmpty()) {
BlockStoragePolicy policy =
fsd.getBlockManager().getStoragePolicy(storagePolicy);
if (policy == null) {
throw new HadoopIllegalArgumentException(
"Cannot find a block policy with the name " + storagePolicy);
}
storagepolicyid = policy.getId();
}
if (!shouldReplicate) {
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
fsd.getFSNamesystem(), ecPolicyName, existing);
if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
isStriped = true;
}
}
final BlockType blockType = isStriped ?
BlockType.STRIPED : BlockType.CONTIGUOUS;
final Short replicationFactor = (!isStriped ? replication : null);
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
storagepolicyid, blockType);
newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine);
INodeFile newNode = createINodeFile(fsd, existing, localName,
permissions, replication, preferredBlockSize, clientName,
clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime);
INode[] missing = new INode[] {newNode};
// switch the locks
fsd.getINodeMap().latchWriteLock(existing, missing);
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
} finally {
fsd.writeUnlock();
@ -593,6 +581,42 @@ class FSDirWriteFileOp {
return newiip;
}
/**
 * Builds a new under-construction file inode with a freshly allocated inode
 * id. The inode is only constructed here; it is not added to any parent.
 *
 * @param storagePolicy name of the storage policy; null or empty selects
 *                      policy id 0
 * @param shouldReplicate if true, no erasure coding policy is looked up and
 *                        the file is created as a contiguous file
 * @param modTime used for both the modification and the access time
 * @return the new file inode, already switched to under construction
 * @throws HadoopIllegalArgumentException if storagePolicy names an unknown
 *         policy
 */
private static INodeFile createINodeFile(FSDirectory fsd,
    INodesInPath existing, byte[] localName, PermissionStatus permissions,
    short replication, long preferredBlockSize, String clientName,
    String clientMachine, boolean shouldReplicate, String ecPolicyName,
    String storagePolicy, long modTime) throws IOException {
  // Resolve the storage policy name to its id first.
  byte storagepolicyid = 0;
  if (storagePolicy != null && !storagePolicy.isEmpty()) {
    final BlockStoragePolicy policy =
        fsd.getBlockManager().getStoragePolicy(storagePolicy);
    if (policy == null) {
      throw new HadoopIllegalArgumentException(
          "Cannot find a block policy with the name " + storagePolicy);
    }
    storagepolicyid = policy.getId();
  }

  // The file is striped only when an EC policy applies and it is not the
  // replication policy.
  final ErasureCodingPolicy ecPolicy = shouldReplicate
      ? null
      : FSDirErasureCodingOp.getErasureCodingPolicy(
          fsd.getFSNamesystem(), ecPolicyName, existing);
  final boolean isStriped =
      ecPolicy != null && !ecPolicy.isReplicationPolicy();

  // Striped files carry an EC policy id, contiguous files a replication.
  final BlockType blockType;
  final Short replicationFactor;
  final Byte ecPolicyID;
  if (isStriped) {
    blockType = BlockType.STRIPED;
    replicationFactor = null;
    ecPolicyID = ecPolicy.getId();
  } else {
    blockType = BlockType.CONTIGUOUS;
    replicationFactor = replication;
    ecPolicyID = null;
  }

  final INodeFile file = newINodeFile(fsd.allocateNewInodeId(), permissions,
      modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
      storagepolicyid, blockType);
  file.setLocalName(localName);
  file.toUnderConstruction(clientName, clientMachine);
  return file;
}
private static FileState analyzeFileState(
FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@ -687,6 +711,14 @@ class FSDirWriteFileOp {
}
checkBlock(fsn, last);
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
assert (iip.getLastINode() instanceof INodeFile);
INode[] missing = new INode[] {iip.getLastINode()};
INodesInPath existing = iip.getParentINodesInPath();
// switch the locks
FSDirectory fsd = fsn.getFSDirectory();
fsd.getINodeMap().latchWriteLock(existing, missing);
return completeFileInternal(fsn, iip, holder,
ExtendedBlock.getLocalBlock(last), fileId);
}

View File

@ -17,7 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -160,6 +165,8 @@ public class FSDirectory implements Closeable {
private final int contentCountLimit; // max content summary counts per run
private final long contentSleepMicroSec;
private final INodeMap inodeMap; // Synchronized by dirLock
// Temp InodeMap used when loading an FS image.
private HashMap<Long, INodeWithAdditionalFields> tempInodeMap;
private long yieldCount = 0; // keep track of lock yield count.
private int quotaInitThreads;
@ -317,7 +324,12 @@ public class FSDirectory implements Closeable {
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
this.inodeId = new INodeId();
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
inodeMap = INodeMap.newInstance(rootDir, ns);
tempInodeMap = new HashMap<>(1000);
// add rootDir to inodeMapTemp.
tempInodeMap.put(rootDir.getId(), rootDir);
this.isPermissionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
@ -1476,6 +1488,23 @@ public class FSDirectory implements Closeable {
return inodeMap;
}
/**
 * Registers the inode in the temporary inode map under its id. Inodes that
 * are not {@link INodeWithAdditionalFields} are ignored. For non-symlinks
 * the encryption zone is registered as well, and, when the storage policy
 * satisfier manager is present and enabled, the storage policy satisfier.
 */
final void addToTempInodeMap(INode inode) {
  if (!(inode instanceof INodeWithAdditionalFields)) {
    return;
  }
  final INodeWithAdditionalFields node = (INodeWithAdditionalFields) inode;
  LOG.debug("addToTempInodeMap: id={}, inodeMapTemp.size={}",
      inode.getId(), tempInodeMap.size());
  tempInodeMap.put(inode.getId(), node);

  if (inode.isSymlink()) {
    return;
  }
  final XAttrFeature xaf = inode.getXAttrFeature();
  addEncryptionZone(node, xaf);
  final StoragePolicySatisfyManager spsManager =
      namesystem.getBlockManager().getSPSManager();
  if (spsManager != null && spsManager.isEnabled()) {
    addStoragePolicySatisfier(node, xaf);
  }
}
/**
* This method is always called with writeLock of FSDirectory held.
*/
@ -1543,6 +1572,36 @@ public class FSDirectory implements Closeable {
addEncryptionZone(rootDir, xaf);
}
/**
 * After the inodes are set properly (set the parent for each inode), we move
 * them from the temporary inode map to the INodeMap. The temporary map is
 * cleared and dropped afterwards, so it must not be used once this method
 * has returned.
 *
 * @throws IOException if the number of moved inodes differs from the size
 *         the temporary map had when the move started
 */
void moveInodes() throws IOException {
  final long totalInodes = tempInodeMap.size();
  long count = 0;
  LOG.debug("inodeMapTemp={}", tempInodeMap);

  for (INodeWithAdditionalFields n : tempInodeMap.values()) {
    // Guarded: building the full path for every inode is wasted work
    // unless the message is actually going to be logged.
    if (LOG.isDebugEnabled()) {
      LOG.debug("populate {}-th inode: id={}, fullpath={}",
          count, n.getId(), n.getFullPathName());
    }
    inodeMap.put(n);
    count++;
  }

  if (count != totalInodes) {
    // %d, not %l: "%l" is not a valid conversion and would make
    // String.format throw instead of producing this message.
    String msg = String.format("moveInodes: expected to move %d inodes, " +
        "but moved %d inodes", totalInodes, count);
    throw new IOException(msg);
  }

  tempInodeMap.clear();
  assert tempInodeMap.isEmpty();
  tempInodeMap = null;
}
/**
* This method is always called with writeLock of FSDirectory held.
*/
@ -1860,6 +1919,17 @@ public class FSDirectory implements Closeable {
}
}
/**
 * Looks up an inode in the inode map, using the given inode as the key.
 *
 * @param inode the inode used as lookup key
 * @return the result of {@code inodeMap.get(inode)}
 */
public INode getInode(INode inode) {
  return this.inodeMap.get(inode);
}
/**
 * Looks up an inode by id in the temporary inode map.
 *
 * @param id the inode id
 * @return {@code null} when {@code id} is below
 *         {@link INodeId#ROOT_INODE_ID}; otherwise whatever the temporary
 *         map holds for {@code id} (possibly {@code null})
 */
public INode getInodeFromTempINodeMap(long id) {
  LOG.debug("getInodeFromTempINodeMap: id={}, TempINodeMap.size={}",
      id, tempInodeMap.size());
  return id < INodeId.ROOT_INODE_ID ? null : tempInodeMap.get(id);
}
@VisibleForTesting
FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
UserGroupInformation ugi) throws AccessControlException {

View File

@ -177,18 +177,23 @@ public class FSImage implements Closeable {
void format(FSNamesystem fsn, String clusterId, boolean force)
throws IOException {
long fileCount = fsn.getFilesTotal();
// Expect 1 file, which is the root inode
Preconditions.checkState(fileCount == 1,
"FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files");
NamespaceInfo ns = NNStorage.newNamespaceInfo();
LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
ns.clusterID = clusterId;
storage.format(ns);
editLog.formatNonFileJournals(ns, force);
saveFSImageInAllDirs(fsn, 0);
fsn.readLock();
try {
long fileCount = fsn.getFilesTotal();
// Expect 1 file, which is the root inode
Preconditions.checkState(fileCount == 1,
"FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files");
NamespaceInfo ns = NNStorage.newNamespaceInfo();
LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
ns.clusterID = clusterId;
storage.format(ns);
editLog.formatNonFileJournals(ns, force);
saveFSImageInAllDirs(fsn, 0);
} finally {
fsn.readUnlock();
}
}
/**
@ -756,6 +761,16 @@ public class FSImage implements Closeable {
"above for more info.");
}
prog.endPhase(Phase.LOADING_FSIMAGE);
/*
* loadEdits always sets the parent of an inode before adding the inode to
* inodeMap. So, it is safe to move inodes from inodeMapTemp to inodeMap
* before loadEdits.
*/
FSDirectory dir = target.getFSDirectory();
dir.moveInodes();
LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap",
dir.getINodeMap().size());
if (!rollingRollback) {
prog.beginPhase(Phase.LOADING_EDITS);
@ -771,6 +786,8 @@ public class FSImage implements Closeable {
needToSave = false;
}
editLog.setNextTxId(lastAppliedTxId + 1);
LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap",
dir.getINodeMap().size());
return needToSave;
}

View File

@ -276,9 +276,10 @@ public final class FSImageFormatPBINode {
if (e == null) {
break;
}
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
INodeDirectory p =
dir.getInodeFromTempINodeMap(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) {
INode child = dir.getInode(id);
INode child = dir.getInodeFromTempINodeMap(id);
if (!addToParent(p, child)) {
LOG.warn("Failed to add the inode {} to the directory {}",
child.getId(), p.getId());
@ -382,6 +383,7 @@ public final class FSImageFormatPBINode {
if (p == null) {
break;
}
LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId());
if (p.getId() == INodeId.ROOT_INODE_ID) {
synchronized(this) {
loadRootINode(p);
@ -389,7 +391,7 @@ public final class FSImageFormatPBINode {
} else {
INode n = loadINode(p);
synchronized(this) {
dir.addToInodeMap(n);
dir.addToTempInodeMap(n);
}
fillUpInodeList(inodeList, n);
}
@ -761,7 +763,7 @@ public final class FSImageFormatPBINode {
DirEntry.newBuilder().setParent(n.getId());
for (INode inode : children) {
// Error if the child inode doesn't exist in inodeMap
if (dir.getInode(inode.getId()) == null) {
if (dir.getInode(inode) == null) {
FSImage.LOG.error(
"FSImageFormatPBINode#serializeINodeDirectorySection: " +
"Dangling child pointer found. Missing INode in " +
@ -812,6 +814,7 @@ public final class FSImageFormatPBINode {
Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
while (iter.hasNext()) {
INodeWithAdditionalFields n = iter.next();
LOG.debug("i = {}, save inode: {}", i, n);
save(out, n);
++i;
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {

View File

@ -1753,7 +1753,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
public void readUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
this.fsLock.readUnlock(opName, lockReportInfoSupplier);
this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
}
@Override
@ -1786,7 +1786,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override
public boolean hasWriteLock() {
return this.fsLock.isWriteLockedByCurrentThread();
return this.fsLock.isWriteLockedByCurrentThread() ||
fsLock.hasWriteChildLock();
}
@Override
public boolean hasReadLock() {
@ -1801,6 +1802,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return this.fsLock.getWriteHoldCount();
}
/** @return the {@link FSNamesystemLock} held in {@code fsLock} */
public FSNamesystemLock getFSLock() {
  return fsLock;
}
/** Lock the checkpoint lock */
public void cpLock() {
this.cpLock.lock();

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -29,6 +32,7 @@ import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
@ -129,6 +133,32 @@ class FSNamesystemLock {
private static final String OVERALL_METRIC_NAME = "Overall";
/**
 * Per-thread collection of child (partition) locks; each thread starts
 * with its own empty list.
 */
private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
    ThreadLocal.withInitial(() -> new ArrayList<INodeMapLock>());
/**
 * Records a child lock in the calling thread's collection.
 *
 * @param lock the child lock to record
 */
void addChildLock(INodeMapLock lock) {
  final Collection<INodeMapLock> held = partitionLocks.get();
  held.add(lock);
}
/**
 * Removes a child lock from the calling thread's collection.
 *
 * @param lock the child lock to remove
 * @return the result of {@link Collection#remove(Object)} for that lock
 */
boolean removeChildLock(INodeMapLock lock) {
  final Collection<INodeMapLock> held = partitionLocks.get();
  return held.remove(lock);
}
/**
 * Checks the child locks recorded for the calling thread, in iteration
 * order, and stops at the first one whose {@code hasWriteChildLock()}
 * returns true.
 *
 * NOTE(review): INodeMapLock#hasWriteChildLock() calls back into this
 * method when its own child lock is not write-held by the current thread,
 * so the two methods look mutually recursive without a terminating case
 * whenever the first recorded lock is not write-held -- confirm.
 *
 * @return true if any recorded child lock reports a write child lock
 */
boolean hasWriteChildLock() {
  for (INodeMapLock held : partitionLocks.get()) {
    if (held.hasWriteChildLock()) {
      return true;
    }
  }
  return false;
}
FSNamesystemLock(Configuration conf,
MutableRatesWithAggregation detailedHoldTimeMetrics) {
this(conf, detailedHoldTimeMetrics, new Timer());
@ -180,11 +210,29 @@ class FSNamesystemLock {
public void readUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
readUnlock(opName, lockReportInfoSupplier, true);
}
public void readUnlock(String opName,
Supplier<String> lockReportInfoSupplier,
boolean unlockChildren) {
final boolean needReport = coarseLock.getReadHoldCount() == 1;
final long readLockIntervalNanos =
timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
final long currentTimeMs = timer.now();
coarseLock.readLock().unlock();
if(getReadHoldCount() > 0) { // Current thread holds the lock
// Unlock the top FSNamesystemLock
coarseLock.readLock().unlock();
}
if(unlockChildren) { // Also unlock and remove children locks
Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
while(iter.hasNext()) {
iter.next().readChildUnlock();
iter.remove();
}
}
if (needReport) {
addMetric(opName, readLockIntervalNanos, false);
@ -252,7 +300,7 @@ class FSNamesystemLock {
* FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
*/
public void writeUnlock() {
writeUnlock(OP_NAME_OTHER, false, null);
writeUnlock(OP_NAME_OTHER, false, null, true);
}
/**
@ -262,7 +310,7 @@ class FSNamesystemLock {
* @param opName Operation name.
*/
public void writeUnlock(String opName) {
writeUnlock(opName, false, null);
writeUnlock(opName, false, null, true);
}
/**
@ -274,7 +322,7 @@ class FSNamesystemLock {
*/
public void writeUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
writeUnlock(opName, false, lockReportInfoSupplier);
writeUnlock(opName, false, lockReportInfoSupplier, true);
}
/**
@ -286,7 +334,7 @@ class FSNamesystemLock {
* for long time will be logged in logs and metrics.
*/
public void writeUnlock(String opName, boolean suppressWriteLockReport) {
writeUnlock(opName, suppressWriteLockReport, null);
writeUnlock(opName, suppressWriteLockReport, null, true);
}
/**
@ -297,8 +345,9 @@ class FSNamesystemLock {
* for long time will be logged in logs and metrics.
* @param lockReportInfoSupplier The info shown in the lock report
*/
private void writeUnlock(String opName, boolean suppressWriteLockReport,
Supplier<String> lockReportInfoSupplier) {
public void writeUnlock(String opName, boolean suppressWriteLockReport,
Supplier<String> lockReportInfoSupplier,
boolean unlockChildren) {
final boolean needReport = !suppressWriteLockReport && coarseLock
.getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
final long writeLockIntervalNanos =
@ -329,7 +378,18 @@ class FSNamesystemLock {
longestWriteLockHeldInfo = new LockHeldInfo();
}
coarseLock.writeLock().unlock();
if(this.isWriteLockedByCurrentThread()) { // Current thread holds the lock
// Unlock the top FSNamesystemLock
coarseLock.writeLock().unlock();
}
if(unlockChildren) { // Unlock and remove children locks
Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
while(iter.hasNext()) {
iter.next().writeChildUnlock();
iter.remove();
}
}
if (needReport) {
addMetric(opName, writeLockIntervalNanos, true);
@ -355,7 +415,25 @@ class FSNamesystemLock {
public int getWriteHoldCount() {
return coarseLock.getWriteHoldCount();
}
/**
 * Queries if the lock is held, in read or write mode, by any thread.
 * @return {@code true} if the read lock count is positive or the write
 * lock is held and {@code false} otherwise
 */
public boolean isReadLocked() {
  if (coarseLock.getReadLockCount() > 0) {
    return true;
  }
  return isWriteLocked();
}
/**
* Queries if the write lock is held by any thread.
* @return {@code true} if any thread holds the write lock and
* {@code false} otherwise
*/
public boolean isWriteLocked() {
return coarseLock.isWriteLocked();
}
public boolean isWriteLockedByCurrentThread() {
return coarseLock.isWriteLockedByCurrentThread();
}

View File

@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -577,6 +578,43 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
return name == null? null: DFSUtil.bytes2String(name);
}
private long[] namespaceKey;
/**
 * Key of an INode.
 * Defines partitioning of INodes in the INodeMap.
 * <p>
 * The last element of the key is this inode's id; each preceding element
 * is the id of the next ancestor, or 0 where no ancestor exists. Element 0
 * is then replaced by {@link #indexOf(long[])} applied to the array.
 * The key is computed once and cached; it is recomputed when the cached
 * key was built for a different {@code level}.
 *
 * @param level how many levels to be included in the key
 * @return the cached key array of length {@code level}; this is the
 *         internal instance, not a copy
 */
public long[] getNamespaceKey(int level) {
  long[] key = namespaceKey;
  // A key cached for another depth would have the wrong length for the
  // caller, so regenerate it instead of returning it.
  if (key == null || key.length != level) {
    key = new long[level];
    INode cur = this;
    for (int pos = level - 1; pos >= 0; pos--) {
      key[pos] = (cur == null) ? 0L : cur.getId();
      cur = (cur == null) ? null : cur.parent;
    }
    key[0] = indexOf(key);
    namespaceKey = key;
  }
  return key;
}
/** Multiplier applied to the first key element in {@link #indexOf}. */
private static final long LARGE_PRIME = 512927357;

/**
 * Computes the index stored in element 0 of a namespace key.
 * When the last element equals {@link INodeId#ROOT_INODE_ID} the first
 * element is returned unchanged. Otherwise the first element is multiplied
 * by {@code LARGE_PRIME}, XOR-ed with itself shifted right by 32 bits, and
 * masked with {@code INodeMap.NUM_RANGES_STATIC - 1}.
 *
 * @param key the namespace key; must have at least one element
 * @return the computed index
 */
public static long indexOf(long[] key) {
  final int last = key.length - 1;
  if (key[last] == INodeId.ROOT_INODE_ID) {
    return key[0];
  }
  final long product = LARGE_PRIME * key[0];
  final long folded = product ^ (product >> 32);
  return folded & (INodeMap.NUM_RANGES_STATIC - 1);
}
/**
* Key of a snapshot Diff Element
*/
@Override
public final byte[] getKey() {
return getLocalNameBytes();
@ -636,7 +674,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
@Override
public String toString() {
return getLocalName();
return getLocalName() + ": " + Arrays.toString(namespaceKey);
}
@VisibleForTesting

View File

@ -17,44 +17,202 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LatchLock;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.util.PartitionedGSet;
/**
* Storing all the {@link INode}s and maintaining the mapping between INode ID
* and INode.
*/
public class INodeMap {
static INodeMap newInstance(INodeDirectory rootDir) {
// Compute the map capacity by allocating 1% of total memory
int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
GSet<INode, INodeWithAdditionalFields> map =
new LightWeightGSet<>(capacity);
map.put(rootDir);
return new INodeMap(map);
static final int NAMESPACE_KEY_DEPTH = 2;
static final int NUM_RANGES_STATIC = 256; // power of 2
/**
 * Orders inodes by their namespace keys of depth
 * {@code NAMESPACE_KEY_DEPTH}, comparing the keys element by element from
 * index 0; the first differing element decides the order.
 */
public static class INodeKeyComparator implements Comparator<INode> {
  INodeKeyComparator() {
    FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
  }

  /**
   * @return -1, 0 or 1 as the key of {@code i1} is less than, equal to or
   *         greater than the key of {@code i2}
   * @throws NullPointerException if either argument is null
   */
  @Override
  public int compare(INode i1, INode i2) {
    if (i1 == null || i2 == null) {
      throw new NullPointerException("Cannot compare null INodes");
    }
    final long[] left = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
    final long[] right = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
    int pos = 0;
    while (pos < NAMESPACE_KEY_DEPTH) {
      if (left[pos] < right[pos]) {
        return -1;
      }
      if (left[pos] > right[pos]) {
        return 1;
      }
      pos++;
    }
    return 0;
  }
}
/**
 * INodeKeyComparator with Hashed Parent.
 * Compares {@link INode#indexOf(long[])} of the two namespace keys first
 * and, when those are equal, the remaining key elements from index 1.
 */
public static class HPINodeKeyComparator implements Comparator<INode> {
  HPINodeKeyComparator() {
    FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
  }

  /**
   * @return -1, 0 or 1 according to the ordering described on the class
   * @throws NullPointerException if either argument is null
   */
  @Override
  public int compare(INode i1, INode i2) {
    if (i1 == null || i2 == null) {
      throw new NullPointerException("Cannot compare null INodes");
    }
    final long[] left = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
    final long[] right = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
    final long leftIndex = INode.indexOf(left);
    final long rightIndex = INode.indexOf(right);
    if (leftIndex < rightIndex) {
      return -1;
    }
    if (leftIndex > rightIndex) {
      return 1;
    }
    for (int pos = 1; pos < NAMESPACE_KEY_DEPTH; pos++) {
      if (left[pos] != right[pos]) {
        return left[pos] < right[pos] ? -1 : 1;
      }
    }
    return 0;
  }
}
/** Orders inodes by their numeric inode id. */
public static class INodeIdComparator implements Comparator<INode> {
  /**
   * @return a negative value, zero or a positive value as the id of
   *         {@code i1} is less than, equal to or greater than that of
   *         {@code i2}
   * @throws NullPointerException if either argument is null
   */
  @Override
  public int compare(INode i1, INode i2) {
    if (i1 == null || i2 == null) {
      throw new NullPointerException("Cannot compare null INodes");
    }
    return Long.compare(i1.getId(), i2.getId());
  }
}
/**
 * {@link LatchLock} whose top lock is the lock returned by
 * {@code namesystem.getFSLock()} and whose child lock is a
 * {@link ReentrantReadWriteLock}.
 * The no-arg constructor leaves {@code childLock} null; instances with a
 * usable child lock are produced by {@link #clone()}.
 * NOTE(review): presumably the no-arg instance only serves as a template
 * from which per-partition locks are cloned -- confirm against
 * PartitionedGSet.
 */
public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
  /** The child lock; null for an instance built by the no-arg constructor. */
  private ReentrantReadWriteLock childLock;

  INodeMapLock() {
    this(null);
  }

  private INodeMapLock(ReentrantReadWriteLock childLock) {
    assert namesystem != null : "namesystem is null";
    this.childLock = childLock;
  }

  @Override
  protected boolean isReadTopLocked() {
    return namesystem.getFSLock().isReadLocked();
  }

  @Override
  protected boolean isWriteTopLocked() {
    return namesystem.getFSLock().isWriteLocked();
  }

  /** Releases the top read lock; the last argument asks to keep child locks. */
  @Override
  protected void readTopUnlock() {
    namesystem.getFSLock().readUnlock("INodeMap", null, false);
  }

  /** Releases the top write lock; the last argument asks to keep child locks. */
  @Override
  protected void writeTopUnlock() {
    namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
  }

  /**
   * @return true if the current thread holds the child read lock, or if
   *         {@link #hasWriteChildLock()} is true
   */
  @Override
  protected boolean hasReadChildLock() {
    return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
  }

  /**
   * Acquires the child read lock, then records this lock with the
   * FSNamesystemLock for the current thread.
   */
  @Override
  protected void readChildLock() {
    this.childLock.readLock().lock();
    namesystem.getFSLock().addChildLock(this);
  }

  /**
   * Releases the child read lock. Does not remove this lock from the
   * collection kept by the FSNamesystemLock.
   */
  @Override
  protected void readChildUnlock() {
    this.childLock.readLock().unlock();
  }

  /**
   * @return true if the current thread write-holds this child lock, or if
   *         the FSNamesystemLock reports a write child lock for the thread
   *
   * NOTE(review): FSNamesystemLock#hasWriteChildLock() in turn calls this
   * method on every recorded child lock, so when the first recorded lock is
   * not write-held by the current thread the two calls appear to recurse
   * without a terminating case -- confirm.
   */
  @Override
  protected boolean hasWriteChildLock() {
    return this.childLock.isWriteLockedByCurrentThread() || namesystem
        .getFSLock().hasWriteChildLock();
  }

  /**
   * Acquires the child write lock, then records this lock with the
   * FSNamesystemLock for the current thread.
   */
  @Override
  protected void writeChildLock() {
    this.childLock.writeLock().lock();
    namesystem.getFSLock().addChildLock(this);
  }

  /**
   * Releases the child write lock. Does not remove this lock from the
   * collection kept by the FSNamesystemLock.
   */
  @Override
  protected void writeChildUnlock() {
    this.childLock.writeLock().unlock();
  }

  /**
   * @return a new INodeMapLock with its own, freshly created child lock;
   *         no state of this instance is copied
   */
  @Override
  protected LatchLock<ReentrantReadWriteLock> clone() {
    return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
  }
}
/**
 * Static factory for an {@link INodeMap}.
 *
 * @param rootDir the root directory passed to the constructor
 * @param ns the namesystem passed to the constructor
 * @return a new INodeMap
 */
static INodeMap newInstance(INodeDirectory rootDir, FSNamesystem ns) {
  final INodeMap created = new INodeMap(rootDir, ns);
  return created;
}
/** Synchronized by external lock. */
private final GSet<INode, INodeWithAdditionalFields> map;
private FSNamesystem namesystem;
/** @return the iterator obtained from the underlying map */
public Iterator<INodeWithAdditionalFields> getMapIterator() {
  return this.map.iterator();
}
private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
Preconditions.checkArgument(map != null);
this.map = map;
private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
this.namesystem = ns;
// Compute the map capacity by allocating 1% of total memory
int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
new INodeMapLock());
// Pre-populate initial empty partitions
PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
(PartitionedGSet<INode, INodeWithAdditionalFields>) map;
PermissionStatus perm = new PermissionStatus(
"", "", new FsPermission((short) 0));
for(int p = 0; p < NUM_RANGES_STATIC; p++) {
INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
"range key".getBytes(StandardCharsets.UTF_8), perm, 0);
key.setParent(new INodeDirectory((long)p, null, perm, 0));
pgs.addNewPartition(key);
}
map.put(rootDir);
}
/**
* Add an {@link INode} into the {@link INode} map. Replace the old value if
* necessary.
@ -88,48 +246,58 @@ public class INodeMap {
* such {@link INode} in the map.
*/
public INode get(long id) {
INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
"", "", new FsPermission((short) 0)), 0, 0) {
@Override
void recordModification(int latestSnapshotId) {
}
@Override
public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
// Nothing to do
}
PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
(PartitionedGSet<INode, INodeWithAdditionalFields>) map;
/*
* Convert a long inode id into an INode object. We only need to compare
* two inodes by inode id. So, it can be any type of INode object.
*/
INode inode = new INodeDirectory(id, null,
new PermissionStatus("", "", new FsPermission((short) 0)), 0);
@Override
public QuotaCounts computeQuotaUsage(
BlockStoragePolicySuite bsps, byte blockStoragePolicyId,
boolean useCache, int lastSnapshotId) {
return null;
}
@Override
public ContentSummaryComputationContext computeContentSummary(
int snapshotId, ContentSummaryComputationContext summary) {
return null;
}
/*
* Iterate all partitions of PGSet and return the INode.
* Just for fallback.
*/
PermissionStatus perm =
new PermissionStatus("", "", new FsPermission((short) 0));
// TODO: create a static array, to avoid creation of keys each time.
for (int p = 0; p < NUM_RANGES_STATIC; p++) {
INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
"range key".getBytes(StandardCharsets.UTF_8), perm, 0);
key.setParent(new INodeDirectory((long)p, null, perm, 0));
PartitionedGSet.PartitionEntry e = pgs.getPartition(key);
@Override
public void cleanSubtree(
ReclaimContext reclaimContext, int snapshotId, int priorSnapshotId) {
if (e.contains(inode)) {
return (INode) e.get(inode);
}
}
@Override
public byte getStoragePolicyID(){
return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
}
return null;
}
@Override
public byte getLocalStoragePolicyID() {
return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
}
};
return map.get(inode);
/**
 * Looks up an inode using the given inode as the key.
 * When the inode has at least (NAMESPACE_KEY_DEPTH - 1) levels of parent
 * directories the lookup goes through {@code map.get(inode)}; otherwise it
 * falls back to the lookup by inode id.
 *
 * @param inode the inode used as lookup key
 * @return the result of the chosen lookup
 */
public INode get(INode inode) {
  INode ancestor = inode;
  int remaining = NAMESPACE_KEY_DEPTH - 1;
  // Walk up the parent chain until enough levels were seen or it ends.
  for (; remaining > 0; remaining--) {
    final INode parent = ancestor.getParent();
    if (parent == null) {
      break;
    }
    ancestor = parent;
  }
  return remaining == 0 ? map.get(inode) : get(inode.getId());
}
/**
@ -138,4 +306,27 @@ public class INodeMap {
public void clear() {
map.clear();
}
/**
 * Latches the write locks of the partitions holding the last inode of
 * {@code iip} followed by the {@code missing} inodes, by delegating to
 * {@code PartitionedGSet#latchWriteLock}. Does nothing when the underlying
 * map is not a {@link PartitionedGSet}.
 *
 * @param iip the resolved path; asserted to be non-empty
 * @param missing inodes appended after the last inode of {@code iip}
 */
public void latchWriteLock(INodesInPath iip, INode[] missing) {
  assert namesystem.hasReadLock() : "must have namesystem lock";
  assert iip.length() > 0 : "INodesInPath has 0 length";
  if (!(map instanceof PartitionedGSet)) {
    return;
  }
  // Locks partitions along the path starting from the first existing parent
  // Locking is in the hierarchical order.
  // An alternative would be to lock the partitions of all inodes of iip,
  // not only of the last one, followed by the missing inodes.
  INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
  allINodes[0] = iip.getLastINode();
  System.arraycopy(missing, 0, allINodes, 1, missing.length);
  ((PartitionedGSet<INode, INodeWithAdditionalFields>)
      map).latchWriteLock(allINodes);
}
}

View File

@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@ -1997,6 +1998,7 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.LOG, level);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
GenericTestUtils.setLogLevel(GSet.LOG, level);
}
/**

View File

@ -25,6 +25,8 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -152,4 +154,55 @@ public class TestDFSMkdirs {
cluster.shutdown();
}
}
/**
 * Creates and deletes directories and files, restarts the NameNodes and
 * checks that the surviving directory and file are still visible while the
 * deleted ones are not.
 */
@Test
public void testMkDirsWithRestart() throws IOException {
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  DistributedFileSystem dfs = null;
  try {
    dfs = cluster.getFileSystem();
    Path dir1 = new Path("/mkdir-1");
    Path file1 = new Path(dir1, "file1");
    Path deleteDir = new Path("/deleteDir");
    Path deleteFile = new Path(dir1, "deleteFile");
    // Create a dir in root dir, should succeed
    assertTrue(dfs.mkdir(dir1, FsPermission.getDefault()));
    dfs.mkdir(deleteDir, FsPermission.getDefault());
    assertTrue(dfs.exists(deleteDir));
    dfs.delete(deleteDir, true);
    assertTrue(!dfs.exists(deleteDir));
    DFSTestUtil.writeFile(dfs, file1, "hello world");
    DFSTestUtil.writeFile(dfs, deleteFile, "hello world");
    int totalFiles = getFileCount(dfs);
    //Before deletion there are 2 files
    assertTrue("Incorrect file count", 2 == totalFiles);
    dfs.delete(deleteFile, false);
    totalFiles = getFileCount(dfs);
    //After deletion, left with 1 file
    assertTrue("Incorrect file count", 1 == totalFiles);
    cluster.restartNameNodes();
    dfs = cluster.getFileSystem();
    assertTrue(dfs.exists(dir1));
    assertTrue(!dfs.exists(deleteDir));
    assertTrue(dfs.exists(file1));
    totalFiles = getFileCount(dfs);
    assertTrue("Incorrect file count", 1 == totalFiles);
  } finally {
    // Shut the cluster down even if closing the file system fails.
    try {
      if (dfs != null) {
        dfs.close();
      }
    } finally {
      cluster.shutdown();
    }
  }
}
/**
 * Counts the entries returned by a recursive {@code listFiles} on "/".
 *
 * @param dfs the file system to list
 * @return the number of entries iterated
 * @throws IOException if listing or iterating fails
 */
private int getFileCount(DistributedFileSystem dfs) throws IOException {
  int seen = 0;
  for (RemoteIterator<LocatedFileStatus> it =
      dfs.listFiles(new Path("/"), true); it.hasNext(); it.next()) {
    seen++;
  }
  return seen;
}
}

View File

@ -1313,6 +1313,10 @@ public class TestFileCreation {
fail();
} catch(FileNotFoundException e) {
FileSystem.LOG.info("Caught Expected FileNotFoundException: ", e);
} catch (AssertionError ae) {
//FSDirWriteFileOp#completeFile throws AssertError if the given
// id/node is not an instance of INodeFile.
FileSystem.LOG.info("Caught Expected AssertionError: ", ae);
}
} finally {
IOUtils.closeStream(dfs);

View File

@ -1017,23 +1017,38 @@ public class TestINodeFile {
final Path dir = new Path("/dir");
hdfs.mkdirs(dir);
INodeDirectory dirNode = getDir(fsdir, dir);
INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
cluster.getNamesystem().readLock();
try {
INodeDirectory dirNode = getDir(fsdir, dir);
INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
} finally {
cluster.getNamesystem().readUnlock();
}
// set quota to dir, which leads to node replacement
hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
dirNode = getDir(fsdir, dir);
assertTrue(dirNode.isWithQuota());
// the inode in inodeMap should also be replaced
dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
cluster.getNamesystem().readLock();
try {
INodeDirectory dirNode = getDir(fsdir, dir);
assertTrue(dirNode.isWithQuota());
// the inode in inodeMap should also be replaced
INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
} finally {
cluster.getNamesystem().readUnlock();
}
hdfs.setQuota(dir, -1, -1);
dirNode = getDir(fsdir, dir);
// the inode in inodeMap should also be replaced
dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
cluster.getNamesystem().readLock();
try {
INodeDirectory dirNode = getDir(fsdir, dir);
// the inode in inodeMap should also be replaced
INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
} finally {
cluster.getNamesystem().readUnlock();
}
} finally {
if (cluster != null) {
cluster.shutdown();