Compare commits

6 Commits

Author | SHA1 | Date
---|---|---
prasad-acit | 892fa48c5a |
Renukaprasad C | 88484fceb4 |
Xing Lin | 4610e1d902 |
Xing Lin | c06cd9ae8c |
Konstantin V Shvachko | 455e8c0191 |
Konstantin V Shvachko | a30e9f6663 |
LatchLock.java (new file)

@@ -0,0 +1,64 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

/**
 * LatchLock controls two hierarchical Read/Write locks:
 * the topLock and the childLock.
 * Typically an operation starts with the topLock already acquired.
 * To acquire the child lock LatchLock will
 * first acquire the childLock, and then release the topLock.
 */
public abstract class LatchLock<C> {
  // Interface methods to be defined by subclasses
  /** @return true if the topLock is locked for read by any thread. */
  protected abstract boolean isReadTopLocked();
  /** @return true if the topLock is locked for write by any thread. */
  protected abstract boolean isWriteTopLocked();
  protected abstract void readTopUnlock();
  protected abstract void writeTopUnlock();

  protected abstract boolean hasReadChildLock();
  protected abstract void readChildLock();
  protected abstract void readChildUnlock();

  protected abstract boolean hasWriteChildLock();
  protected abstract void writeChildLock();
  protected abstract void writeChildUnlock();

  protected abstract LatchLock<C> clone();

  // Public APIs to use with the class
  public void readLock() {
    readChildLock();
    readTopUnlock();
  }

  public void readUnlock() {
    readChildUnlock();
  }

  public void writeLock() {
    writeChildLock();
    writeTopUnlock();
  }

  public void writeUnlock() {
    writeChildUnlock();
  }
}
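The latching pattern is easier to see from a caller's point of view. Below is a minimal sketch, not part of the patch: `MyLatchLock` is a hypothetical concrete subclass (the test-only `NoOpLock` defined further down plays the same role), and the top lock is an ordinary `ReentrantReadWriteLock`.

```java
// Hypothetical illustration only; MyLatchLock stands in for any concrete LatchLock.
ReentrantReadWriteLock top = new ReentrantReadWriteLock();
LatchLock<ReentrantReadWriteLock> latch = new MyLatchLock(top);

top.readLock().lock();   // the operation starts with the topLock held
latch.readLock();        // acquires the childLock, then releases the topLock
try {
  // ... read the data protected by the childLock ...
} finally {
  latch.readUnlock();    // releases only the childLock
}
```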
PartitionedGSet.java (new file)

@@ -0,0 +1,348 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.NoSuchElementException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;

/**
 * An implementation of {@link GSet}, which splits a collection of elements
 * into partitions each corresponding to a range of keys.
 *
 * This class does not support null elements.
 *
 * This class is backed by a LatchLock for hierarchical synchronization.
 *
 * @param <K> Key type for looking up the elements
 * @param <E> Element type, which must be
 *       (1) a subclass of K, and
 *       (2) implementing {@link LinkedElement} interface.
 */
@InterfaceAudience.Private
public class PartitionedGSet<K, E extends K> implements GSet<K, E> {

  private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027;
  private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f;

  /**
   * An ordered map of contiguous segments of elements.
   * Each key in the map represents the smallest key in the mapped segment,
   * so that all elements in this segment are >= the mapping key,
   * but are smaller than the next key in the map.
   * Elements within a partition do not need to be ordered.
   */
  private final NavigableMap<K, PartitionEntry> partitions;
  private LatchLock<?> latchLock;

  /**
   * The number of elements in the set.
   */
  protected volatile int size;

  /**
   * A single partition of the {@link PartitionedGSet}.
   * Consists of a hash table {@link LightWeightGSet} and a lock, which
   * controls access to this partition independently of the other ones.
   */
  public class PartitionEntry extends LightWeightGSet<K, E> {
    private final LatchLock<?> partLock;

    PartitionEntry(int defaultPartitionCapacity) {
      super(defaultPartitionCapacity);
      this.partLock = latchLock.clone();
    }
  }

  public PartitionedGSet(final int capacity,
      final Comparator<? super K> comparator,
      final LatchLock<?> latchLock) {
    this.partitions = new TreeMap<K, PartitionEntry>(comparator);
    this.latchLock = latchLock;
    // addNewPartition(rootKey).put(rootKey);
    // this.size = 1;
    this.size = 0;
    LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
    LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
  }

  /**
   * Creates a new empty partition.
   * @param key
   * @return
   */
  public PartitionEntry addNewPartition(final K key) {
    Entry<K, PartitionEntry> lastEntry = partitions.lastEntry();
    PartitionEntry lastPart = null;
    if(lastEntry != null)
      lastPart = lastEntry.getValue();

    PartitionEntry newPart =
        new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
    // assert size == 0 || newPart.partLock.isWriteTopLocked() :
    //     "Must hold write Lock: key = " + key;
    PartitionEntry oldPart = partitions.put(key, newPart);
    assert oldPart == null :
        "RangeMap already has a partition associated with " + key;

    LOG.debug("Total GSet size = {}", size);
    LOG.debug("Number of partitions = {}", partitions.size());
    LOG.debug("Previous partition size = {}",
        lastPart == null ? 0 : lastPart.size());

    return newPart;
  }

  @Override
  public int size() {
    return size;
  }

  public PartitionEntry getPartition(final K key) {
    Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
    if(partEntry == null) {
      return null;
    }
    PartitionEntry part = partEntry.getValue();
    if(part == null) {
      throw new IllegalStateException("Null partition for key: " + key);
    }
    assert size == 0 || part.partLock.isReadTopLocked() ||
        part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
    return part;
  }

  @Override
  public boolean contains(final K key) {
    PartitionEntry part = getPartition(key);
    if(part == null) {
      return false;
    }
    return part.contains(key);
  }

  @Override
  public E get(final K key) {
    PartitionEntry part = getPartition(key);
    if(part == null) {
      return null;
    }
    LOG.debug("get key: {}", key);
    // part.partLock.readLock();
    return part.get(key);
  }

  @Override
  public E put(final E element) {
    K key = element;
    PartitionEntry part = getPartition(key);
    if(part == null) {
      throw new HadoopIllegalArgumentException("Illegal key: " + key);
    }
    assert size == 0 || part.partLock.isWriteTopLocked() ||
        part.partLock.hasWriteChildLock() :
        "Must hold write Lock: key = " + key;
    LOG.debug("put key: {}", key);
    PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
    if(newPart != part) {
      newPart.partLock.writeChildLock();
      part = newPart;
    }
    E result = part.put(element);
    if(result == null) {  // new element
      size++;
      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
    } else {
      LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
          key, size);
    }
    return result;
  }

  private PartitionEntry addNewPartitionIfNeeded(
      PartitionEntry curPart, K key) {
    if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW
        || curPart.contains(key)) {
      return curPart;
    }
    return addNewPartition(key);
  }

  @Override
  public E remove(final K key) {
    PartitionEntry part = getPartition(key);
    if(part == null) {
      return null;
    }
    E result = part.remove(key);
    if(result != null) {
      size--;
    }
    return result;
  }

  @Override
  public void clear() {
    LOG.error("Total GSet size = {}", size);
    LOG.error("Number of partitions = {}", partitions.size());
    printStats();
    // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
    // SHV May need to clear all partitions?
    partitions.clear();
    size = 0;
  }

  private void printStats() {
    int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0;
    long totalSize = 0;
    int numEmptyPartitions = 0, numFullPartitions = 0;
    Collection<PartitionEntry> parts = partitions.values();
    Set<Entry<K, PartitionEntry>> entries = partitions.entrySet();
    int i = 0;
    for(Entry<K, PartitionEntry> e : entries) {
      PartitionEntry part = e.getValue();
      int s = part.size;
      if(s == 0) numEmptyPartitions++;
      if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++;
      totalSize += s;
      partSizeMin = (s < partSizeMin ? s : partSizeMin);
      partSizeMax = (partSizeMax < s ? s : partSizeMax);
      Class<?> inodeClass = e.getKey().getClass();
      try {
        long[] key = (long[]) inodeClass.
            getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
        long[] firstKey = new long[key.length];
        if(part.iterator().hasNext()) {
          Object first = part.iterator().next();
          long[] firstKeyRef = (long[]) inodeClass.getMethod(
              "getNamespaceKey", int.class).invoke(first, 2);
          Object parent = inodeClass.
              getMethod("getParent").invoke(first);
          long parentId = (parent == null ? 0L :
              (long) inodeClass.getMethod("getId").invoke(parent));
          for (int j=0; j < key.length; j++) {
            firstKey[j] = firstKeyRef[j];
          }
          firstKey[0] = parentId;
        }
        LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
            i++, key, s, firstKey);  // SHV should be info
      } catch (NoSuchElementException ex) {
        LOG.error("iterator.next() throws NoSuchElementException.");
        throw ex;
      } catch (Exception ex) {
        LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
      }
    }
    partSizeAvg = (int) (totalSize / parts.size());
    LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
        partSizeMin, partSizeAvg, partSizeMax, totalSize);
    LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}",
        numEmptyPartitions, parts.size()-numEmptyPartitions, numFullPartitions);
  }

  @Override
  public Collection<E> values() {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public Iterator<E> iterator() {
    return new EntryIterator();
  }

  /**
   * Iterator over the elements in the set.
   * Iterates first by keys, then inside the partition
   * corresponding to the key.
   *
   * Modifications are tracked by the underlying collections. We allow
   * modifying other partitions, while iterating through the current one.
   */
  private class EntryIterator implements Iterator<E> {
    private Iterator<K> keyIterator;
    private Iterator<E> partitionIterator;

    // Set partitionIterator to point to the first partition, or set it to null
    // when there are no partitions created for this PartitionedGSet.
    public EntryIterator() {
      keyIterator = partitions.keySet().iterator();

      if (!keyIterator.hasNext()) {
        partitionIterator = null;
        return;
      }

      K firstKey = keyIterator.next();
      partitionIterator = partitions.get(firstKey).iterator();
    }

    @Override
    public boolean hasNext() {

      // Special case: an iterator was created for an empty PartitionedGSet.
      // Check whether new partitions have been added since then.
      if (partitionIterator == null) {
        if (partitions.size() == 0) {
          return false;
        } else {
          keyIterator = partitions.keySet().iterator();
          K nextKey = keyIterator.next();
          partitionIterator = partitions.get(nextKey).iterator();
        }
      }

      while(!partitionIterator.hasNext()) {
        if(!keyIterator.hasNext()) {
          return false;
        }
        K curKey = keyIterator.next();
        partitionIterator = getPartition(curKey).iterator();
      }
      return partitionIterator.hasNext();
    }

    @Override
    public E next() {
      if (!hasNext()) {
        throw new NoSuchElementException("No more elements in this set.");
      }
      return partitionIterator.next();
    }
  }

  public void latchWriteLock(K[] keys) {
    // getPartition(parent).partLock.writeChildLock();
    LatchLock<?> pLock = null;
    for(K key : keys) {
      pLock = getPartition(key).partLock;
      pLock.writeChildLock();
    }
    assert pLock != null : "pLock is null";
    pLock.writeTopUnlock();
  }
}
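A rough usage sketch, not taken from the patch itself: partitions are pre-created and elements are put under the top write lock, which satisfies the `isWriteTopLocked()` assertion. It reuses the `NoOpLock`, `TestElement`, `TestElementComparator`, and `topLock` helpers that the TestPartitionedGSet file below defines.

```java
// Sketch using the test helpers defined in TestPartitionedGSet below.
PartitionedGSet<TestElement, TestElement> set =
    new PartitionedGSet<>(16, new TestElementComparator(), new NoOpLock());
set.addNewPartition(new TestElement(0));     // keys >= 0 land here
set.addNewPartition(new TestElement(1000));  // keys >= 1000 land here

topLock.writeLock().lock();
try {
  set.put(new TestElement(42));    // routed to the partition with floor key 0
  set.put(new TestElement(1500));  // routed to the partition with floor key 1000
} finally {
  topLock.writeLock().unlock();
}
```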
TestPartitionedGSet.java (new file)

@@ -0,0 +1,270 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Testing {@link PartitionedGSet} */
public class TestPartitionedGSet {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestPartitionedGSet.class);
  private static final int ELEMENT_NUM = 100;

  /**
   * Generate positive random numbers for testing. We want to use only positive
   * numbers because the smallest partition used in testing is 0.
   *
   * @param length
   *          number of random numbers to be generated.
   *
   * @param randomSeed
   *          seed to be used for random number generator.
   *
   * @return
   *          An array of Integers
   */
  private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
    Random random = new Random(randomSeed);
    ArrayList<Integer> list = new ArrayList<Integer>(length);
    for (int i = 0; i < length; i++) {
      list.add(random.nextInt(Integer.MAX_VALUE));
    }
    return list;
  }

  private static class TestElement implements LinkedElement {
    private final int val;
    private LinkedElement next;

    TestElement(int val) {
      this.val = val;
      this.next = null;
    }

    public int getVal() {
      return val;
    }

    @Override
    public void setNext(LinkedElement next) {
      this.next = next;
    }

    @Override
    public LinkedElement getNext() {
      return next;
    }
  }

  private static class TestElementComparator implements Comparator<TestElement>
  {
    @Override
    public int compare(TestElement e1, TestElement e2) {
      if (e1 == null || e2 == null) {
        throw new NullPointerException("Cannot compare null elements");
      }

      return e1.getVal() - e2.getVal();
    }
  }

  protected ReentrantReadWriteLock topLock =
      new ReentrantReadWriteLock(false);
  /**
   * We are NOT testing any concurrent access to a PartitionedGSet here.
   */
  private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
    private ReentrantReadWriteLock childLock;

    public NoOpLock() {
      childLock = new ReentrantReadWriteLock(false);
    }

    @Override
    protected boolean isReadTopLocked() {
      return topLock.getReadLockCount() > 0 || isWriteTopLocked();
    }

    @Override
    protected boolean isWriteTopLocked() {
      return topLock.isWriteLocked();
    }

    @Override
    protected void readTopUnlock() {
      topLock.readLock().unlock();
    }

    @Override
    protected void writeTopUnlock() {
      topLock.writeLock().unlock();
    }

    @Override
    protected boolean hasReadChildLock() {
      return childLock.getReadLockCount() > 0 || hasWriteChildLock();
    }

    @Override
    protected void readChildLock() {
      childLock.readLock().lock();
    }

    @Override
    protected void readChildUnlock() {
      childLock.readLock().unlock();
    }

    @Override
    protected boolean hasWriteChildLock() {
      return childLock.isWriteLockedByCurrentThread();
    }

    @Override
    protected void writeChildLock() {
      childLock.writeLock().lock();
    }

    @Override
    protected void writeChildUnlock() {
      childLock.writeLock().unlock();
    }

    @Override
    protected LatchLock<ReentrantReadWriteLock> clone() {
      return new NoOpLock();
    }
  }

  /**
   * Test iterator for a PartitionedGSet with no partitions.
   */
  @Test(timeout=60000)
  public void testIteratorForNoPartition() {
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    topLock.readLock().lock();
    int count = 0;
    Iterator<TestElement> iter = set.iterator();
    while( iter.hasNext() ) {
      iter.next();
      count ++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(0, count);
  }

  /**
   * Test iterator for a PartitionedGSet with empty partitions.
   */
  @Test(timeout=60000)
  public void testIteratorForEmptyPartitions() {
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));

    topLock.readLock().lock();
    int count = 0;
    Iterator<TestElement> iter = set.iterator();
    while( iter.hasNext() ) {
      iter.next();
      count ++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(0, count);
  }

  /**
   * Test whether the iterator can return the same number of elements as stored
   * into the PartitionedGSet.
   */
  @Test(timeout=60000)
  public void testIteratorCountElements() {
    ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));

    topLock.writeLock().lock();
    for (Integer i : list) {
      set.put(new TestElement(i));
    }
    topLock.writeLock().unlock();

    topLock.readLock().lock();
    int count = 0;
    Iterator<TestElement> iter = set.iterator();
    while( iter.hasNext() ) {
      iter.next();
      count ++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(ELEMENT_NUM, count);
  }

  /**
   * Test iterator when it is created before partitions/elements are
   * added to the PartitionedGSet.
   */
  @Test(timeout=60000)
  public void testIteratorAddElementsAfterIteratorCreation() {
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    // Create the iterator before partitions are added.
    Iterator<TestElement> iter = set.iterator();

    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));

    // Added one element
    topLock.writeLock().lock();
    set.put(new TestElement(2500));
    topLock.writeLock().unlock();

    topLock.readLock().lock();
    int count = 0;
    while( iter.hasNext() ) {
      iter.next();
      count ++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(1, count);
  }
}
FSDirMkdirOp.java

@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;

@@ -69,18 +70,7 @@ class FSDirMkdirOp {
      // create multiple inodes.
      fsn.checkFsObjectLimit();

      // Ensure that the user can traverse the path by adding implicit
      // u+wx permission to all ancestor directories.
      INodesInPath existing =
          createParentDirectories(fsd, iip, permissions, false);
      if (existing != null) {
        existing = createSingleDirectory(
            fsd, existing, iip.getLastLocalName(), permissions);
      }
      if (existing == null) {
        throw new IOException("Failed to create directory: " + src);
      }
      iip = existing;
      iip = createMissingDirs(fsd, iip, permissions, false);
    }
    return fsd.getAuditFileInfo(iip);
  } finally {

@@ -88,6 +78,36 @@ class FSDirMkdirOp {
    }
  }

  static INodesInPath createMissingDirs(FSDirectory fsd, INodesInPath iip,
      PermissionStatus permissions, boolean inheritPerms) throws IOException {
    PermissionStatus basePerm = inheritPerms ?
        iip.getExistingINodes().getLastINode().getPermissionStatus() :
        permissions;
    // create all missing directories along the path,
    // but don't add them to the INodeMap yet
    permissions = addImplicitUwx(basePerm, permissions);
    INode[] missing = createPathDirectories(fsd, iip, permissions);
    iip = iip.getExistingINodes();
    if (missing.length == 0) {
      return iip;
    }
    // switch the locks
    fsd.getINodeMap().latchWriteLock(iip, missing);
    int counter = 0;
    // Add missing inodes to the INodeMap
    for (INode dir : missing) {
      if (counter++ == missing.length - 1) {
        // Last folder in the path, use the user-given permission.
        // For MKDIR - refers to the permission given by the user.
        // For create - refers to the parent directory permission.
        permissions = basePerm;
      }
      iip = addSingleDirectory(fsd, iip, dir, permissions);
      assert iip != null : "iip should not be null";
    }
    return iip;
  }

  /**
   * For a given absolute path, create all ancestors as directories along the
   * path. All ancestors inherit their parent's permission plus an implicit

@@ -132,6 +152,7 @@ class FSDirMkdirOp {
    if (missing == 0) {  // full path exists, return parents.
      existing = iip.getParentINodesInPath();
    } else if (missing > 1) { // need to create at least one ancestor dir.
      FSNamesystem.LOG.error("missing = " + missing);
      // Ensure that the user can traverse the path by adding implicit
      // u+wx permission to all ancestor directories.
      PermissionStatus basePerm = inheritPerms

@@ -143,6 +164,14 @@ class FSDirMkdirOp {
      for (int i = existing.length(); existing != null && i <= last; i++) {
        byte[] component = iip.getPathComponent(i);
        existing = createSingleDirectory(fsd, existing, component, perm);
        if(existing == null) {
          FSNamesystem.LOG.error("unprotectedMkdir returned null for "
              + iip.getPath() + " for "
              + new String(component, StandardCharsets.US_ASCII) + " i = " + i);
          // Somebody already created the parent. Recalculate existing
          existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
          i = existing.length() - 1;
        }
      }
    }
    return existing;

@@ -228,5 +257,70 @@ class FSDirMkdirOp {
    }
    return iip;
  }

  private static INode createDirectoryINode(FSDirectory fsd,
      INodesInPath parent, byte[] name, PermissionStatus permission)
      throws FileAlreadyExistsException {
    assert fsd.hasReadLock();
    assert parent.getLastINode() != null;
    if (!parent.getLastINode().isDirectory()) {
      throw new FileAlreadyExistsException("Parent path is not a directory: " +
          parent.getPath() + " " + DFSUtil.bytes2String(name));
    }
    final INodeDirectory dir = new INodeDirectory(
        fsd.allocateNewInodeId(), name, permission, now());
    return dir;
  }

  /**
   * Find out the missing INodes for the current mkdir op.
   */
  private static INode[] createPathDirectories(FSDirectory fsd,
      INodesInPath iip, PermissionStatus perm)
      throws IOException {
    assert fsd.hasWriteLock();
    INodesInPath existing = iip.getExistingINodes();
    assert existing != null : "existing should not be null";
    int numMissing = iip.length() - existing.length();
    if (numMissing == 0) {  // full path exists
      return new INode[0];
    }

    // create the missing directories along the path
    INode[] missing = new INode[numMissing];
    final int last = iip.length();
    for (int i = existing.length(); i < last; i++) {
      byte[] component = iip.getPathComponent(i);
      missing[i - existing.length()] =
          createDirectoryINode(fsd, existing, component, perm);
    }
    return missing;
  }

  private static INodesInPath addSingleDirectory(FSDirectory fsd,
      INodesInPath existing, INode dir, PermissionStatus perm)
      throws IOException {
    assert fsd.hasWriteLock();
    INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true);
    if (iip == null) {
      FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
      final INodeDirectory parent = existing.getLastINode().asDirectory();
      dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
      return INodesInPath.append(existing, dir, dir.getLocalNameBytes());
    }
    existing = iip;
    assert dir.equals(existing.getLastINode()) : "dir is not the last INode";

    // Directory creation also counts towards FilesCreated
    // to match count of FilesDeleted metric.
    NameNode.getNameNodeMetrics().incrFilesCreated();

    assert dir.getPermissionStatus().getGroupName() != null :
        "GroupName is null for " + existing.getPath();
    String cur = existing.getPath();
    fsd.getEditLog().logMkDir(cur, dir);
    NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur);
    return existing;
  }
}
FSDirWriteFileOp.java

@@ -228,6 +228,13 @@ class FSDirWriteFileOp {
      // while chooseTarget() was executing.
      LocatedBlock[] onRetryBlock = new LocatedBlock[1];
      INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);

      INode[] missing = new INode[]{iip.getLastINode()};
      INodesInPath existing = iip.getParentINodesInPath();
      FSDirectory fsd = fsn.getFSDirectory();
      // switch the locks
      fsd.getINodeMap().latchWriteLock(existing, missing);

      FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
          previous, onRetryBlock);
      final INodeFile pendingFile = fileState.inode;

@@ -392,8 +399,8 @@ class FSDirWriteFileOp {
    }
    fsn.checkFsObjectLimit();
    INodeFile newNode = null;
    INodesInPath parent =
        FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
    INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd,
        iip.getParentINodesInPath(), permissions, true);
    if (parent != null) {
      iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
          replication, blockSize, holder, clientMachine, shouldReplicate,

@@ -541,41 +548,22 @@ class FSDirWriteFileOp {
      FSDirectory fsd, INodesInPath existing, byte[] localName,
      PermissionStatus permissions, short replication, long preferredBlockSize,
      String clientName, String clientMachine, boolean shouldReplicate,
      String ecPolicyName, String storagePolicy) throws IOException {
      String ecPolicyName, String storagePolicy)
      throws IOException {

    Preconditions.checkNotNull(existing);
    long modTime = now();
    INodesInPath newiip;
    fsd.writeLock();
    try {
      boolean isStriped = false;
      ErasureCodingPolicy ecPolicy = null;
      byte storagepolicyid = 0;
      if (storagePolicy != null && !storagePolicy.isEmpty()) {
        BlockStoragePolicy policy =
            fsd.getBlockManager().getStoragePolicy(storagePolicy);
        if (policy == null) {
          throw new HadoopIllegalArgumentException(
              "Cannot find a block policy with the name " + storagePolicy);
        }
        storagepolicyid = policy.getId();
      }
      if (!shouldReplicate) {
        ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
            fsd.getFSNamesystem(), ecPolicyName, existing);
        if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
          isStriped = true;
        }
      }
      final BlockType blockType = isStriped ?
          BlockType.STRIPED : BlockType.CONTIGUOUS;
      final Short replicationFactor = (!isStriped ? replication : null);
      final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
      INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
          modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
          storagepolicyid, blockType);
      newNode.setLocalName(localName);
      newNode.toUnderConstruction(clientName, clientMachine);
      INodeFile newNode = createINodeFile(fsd, existing, localName,
          permissions, replication, preferredBlockSize, clientName,
          clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime);

      INode[] missing = new INode[] {newNode};
      // switch the locks
      fsd.getINodeMap().latchWriteLock(existing, missing);

      newiip = fsd.addINode(existing, newNode, permissions.getPermission());
    } finally {
      fsd.writeUnlock();

@@ -593,6 +581,42 @@ class FSDirWriteFileOp {
    return newiip;
  }

  private static INodeFile createINodeFile(FSDirectory fsd,
      INodesInPath existing, byte[] localName, PermissionStatus permissions,
      short replication, long preferredBlockSize, String clientName,
      String clientMachine, boolean shouldReplicate, String ecPolicyName,
      String storagePolicy, long modTime) throws IOException {
    boolean isStriped = false;
    ErasureCodingPolicy ecPolicy = null;
    byte storagepolicyid = 0;
    if (storagePolicy != null && !storagePolicy.isEmpty()) {
      BlockStoragePolicy policy =
          fsd.getBlockManager().getStoragePolicy(storagePolicy);
      if (policy == null) {
        throw new HadoopIllegalArgumentException(
            "Cannot find a block policy with the name " + storagePolicy);
      }
      storagepolicyid = policy.getId();
    }
    if (!shouldReplicate) {
      ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
          fsd.getFSNamesystem(), ecPolicyName, existing);
      if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
        isStriped = true;
      }
    }
    final BlockType blockType = isStriped ?
        BlockType.STRIPED : BlockType.CONTIGUOUS;
    final Short replicationFactor = (!isStriped ? replication : null);
    final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
        modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
        storagepolicyid, blockType);
    newNode.setLocalName(localName);
    newNode.toUnderConstruction(clientName, clientMachine);
    return newNode;
  }

  private static FileState analyzeFileState(
      FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
      ExtendedBlock previous, LocatedBlock[] onRetryBlock)

@@ -687,6 +711,14 @@ class FSDirWriteFileOp {
    }
    checkBlock(fsn, last);
    INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);

    assert (iip.getLastINode() instanceof INodeFile);
    INode[] missing = new INode[] {iip.getLastINode()};
    INodesInPath existing = iip.getParentINodesInPath();
    // switch the locks
    FSDirectory fsd = fsn.getFSDirectory();
    fsd.getINodeMap().latchWriteLock(existing, missing);

    return completeFileInternal(fsn, iip, holder,
        ExtendedBlock.getLocalBlock(last), fileId);
  }
FSDirectory.java

@@ -17,7 +17,12 @@
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.StringUtils;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -160,6 +165,8 @@ public class FSDirectory implements Closeable {
  private final int contentCountLimit; // max content summary counts per run
  private final long contentSleepMicroSec;
  private final INodeMap inodeMap; // Synchronized by dirLock
  // Temp INodeMap used when loading an FS image.
  private HashMap<Long, INodeWithAdditionalFields> tempInodeMap;
  private long yieldCount = 0; // keep track of lock yield count.
  private int quotaInitThreads;

@@ -317,7 +324,12 @@ public class FSDirectory implements Closeable {
  FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
    this.inodeId = new INodeId();
    rootDir = createRoot(ns);
    inodeMap = INodeMap.newInstance(rootDir);
    inodeMap = INodeMap.newInstance(rootDir, ns);
    tempInodeMap = new HashMap<>(1000);

    // add rootDir to tempInodeMap.
    tempInodeMap.put(rootDir.getId(), rootDir);

    this.isPermissionEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);

@@ -1476,6 +1488,23 @@ public class FSDirectory implements Closeable {
    return inodeMap;
  }

  final void addToTempInodeMap(INode inode) {
    if (inode instanceof INodeWithAdditionalFields) {
      LOG.debug("addToTempInodeMap: id={}, tempInodeMap.size={}",
          inode.getId(), tempInodeMap.size());
      tempInodeMap.put(inode.getId(), (INodeWithAdditionalFields) inode);
      if (!inode.isSymlink()) {
        final XAttrFeature xaf = inode.getXAttrFeature();
        addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
        StoragePolicySatisfyManager spsManager =
            namesystem.getBlockManager().getSPSManager();
        if (spsManager != null && spsManager.isEnabled()) {
          addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
        }
      }
    }
  }

  /**
   * This method is always called with writeLock of FSDirectory held.
   */

@@ -1543,6 +1572,36 @@ public class FSDirectory implements Closeable {
    addEncryptionZone(rootDir, xaf);
  }

  /**
   * After the inodes are set properly (set the parent for each inode), we move
   * them from the temporary INode map to the INodeMap.
   */
  void moveInodes() throws IOException {
    long count = 0, totalInodes = tempInodeMap.size();
    LOG.debug("tempInodeMap={}", tempInodeMap);

    for (Map.Entry e : tempInodeMap.entrySet()) {
      INodeWithAdditionalFields n = (INodeWithAdditionalFields) e.getValue();

      LOG.debug("populate {}-th inode: id={}, fullpath={}",
          count, n.getId(), n.getFullPathName());

      inodeMap.put(n);
      count++;
    }

    if (count != totalInodes) {
      String msg = String.format("moveInodes: expected to move %d inodes, " +
          "but moved %d inodes", totalInodes, count);
      throw new IOException(msg);
    }

    //inodeMap.show();
    tempInodeMap.clear();
    assert(tempInodeMap.isEmpty());
    tempInodeMap = null;
  }

  /**
   * This method is always called with writeLock of FSDirectory held.
   */

@@ -1860,6 +1919,17 @@ public class FSDirectory implements Closeable {
    }
  }

  public INode getInode(INode inode) {
    return inodeMap.get(inode);
  }
  public INode getInodeFromTempINodeMap(long id) {
    LOG.debug("getInodeFromTempINodeMap: id={}, tempInodeMap.size={}",
        id, tempInodeMap.size());
    if (id < INodeId.ROOT_INODE_ID)
      return null;

    return tempInodeMap.get(id);
  }
  @VisibleForTesting
  FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
      UserGroupInformation ugi) throws AccessControlException {
FSImage.java

@@ -177,18 +177,23 @@ public class FSImage implements Closeable {

  void format(FSNamesystem fsn, String clusterId, boolean force)
      throws IOException {
    long fileCount = fsn.getFilesTotal();
    // Expect 1 file, which is the root inode
    Preconditions.checkState(fileCount == 1,
        "FSImage.format should be called with an uninitialized namesystem, has " +
        fileCount + " files");
    NamespaceInfo ns = NNStorage.newNamespaceInfo();
    LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
    ns.clusterID = clusterId;

    storage.format(ns);
    editLog.formatNonFileJournals(ns, force);
    saveFSImageInAllDirs(fsn, 0);
    fsn.readLock();
    try {
      long fileCount = fsn.getFilesTotal();
      // Expect 1 file, which is the root inode
      Preconditions.checkState(fileCount == 1,
          "FSImage.format should be called with an uninitialized namesystem, has " +
          fileCount + " files");
      NamespaceInfo ns = NNStorage.newNamespaceInfo();
      LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
      ns.clusterID = clusterId;

      storage.format(ns);
      editLog.formatNonFileJournals(ns, force);
      saveFSImageInAllDirs(fsn, 0);
    } finally {
      fsn.readUnlock();
    }
  }

  /**

@@ -756,6 +761,16 @@ public class FSImage implements Closeable {
        "above for more info.");
    }
    prog.endPhase(Phase.LOADING_FSIMAGE);

    /*
     * loadEdits always sets the parent of an inode before adding the inode to
     * inodeMap. So, it is safe to move inodes from the temporary inode map to
     * inodeMap before loadEdits.
     */
    FSDirectory dir = target.getFSDirectory();
    dir.moveInodes();
    LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap",
        dir.getINodeMap().size());

    if (!rollingRollback) {
      prog.beginPhase(Phase.LOADING_EDITS);

@@ -771,6 +786,8 @@ public class FSImage implements Closeable {
      needToSave = false;
    }
    editLog.setNextTxId(lastAppliedTxId + 1);
    LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap",
        dir.getINodeMap().size());
    return needToSave;
  }

FSImageFormatPBINode.java

@@ -276,9 +276,10 @@ public final class FSImageFormatPBINode {
        if (e == null) {
          break;
        }
        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
        INodeDirectory p =
            dir.getInodeFromTempINodeMap(e.getParent()).asDirectory();
        for (long id : e.getChildrenList()) {
          INode child = dir.getInode(id);
          INode child = dir.getInodeFromTempINodeMap(id);
          if (!addToParent(p, child)) {
            LOG.warn("Failed to add the inode {} to the directory {}",
                child.getId(), p.getId());

@@ -382,6 +383,7 @@ public final class FSImageFormatPBINode {
        if (p == null) {
          break;
        }
        LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId());
        if (p.getId() == INodeId.ROOT_INODE_ID) {
          synchronized(this) {
            loadRootINode(p);

@@ -389,7 +391,7 @@ public final class FSImageFormatPBINode {
        } else {
          INode n = loadINode(p);
          synchronized(this) {
            dir.addToInodeMap(n);
            dir.addToTempInodeMap(n);
          }
          fillUpInodeList(inodeList, n);
        }

@@ -761,7 +763,7 @@ public final class FSImageFormatPBINode {
          DirEntry.newBuilder().setParent(n.getId());
      for (INode inode : children) {
        // Error if the child inode doesn't exist in inodeMap
        if (dir.getInode(inode.getId()) == null) {
        if (dir.getInode(inode) == null) {
          FSImage.LOG.error(
              "FSImageFormatPBINode#serializeINodeDirectorySection: " +
              "Dangling child pointer found. Missing INode in " +

@@ -812,6 +814,7 @@ public final class FSImageFormatPBINode {
      Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
      while (iter.hasNext()) {
        INodeWithAdditionalFields n = iter.next();
        LOG.debug("i = {}, save inode: {}", i, n);
        save(out, n);
        ++i;
        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
FSNamesystem.java

@@ -1753,7 +1753,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,

  public void readUnlock(String opName,
      Supplier<String> lockReportInfoSupplier) {
    this.fsLock.readUnlock(opName, lockReportInfoSupplier);
    this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
  }

  @Override

@@ -1786,7 +1786,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,

  @Override
  public boolean hasWriteLock() {
    return this.fsLock.isWriteLockedByCurrentThread();
    return this.fsLock.isWriteLockedByCurrentThread() ||
        fsLock.hasWriteChildLock();
  }
  @Override
  public boolean hasReadLock() {

@@ -1801,6 +1802,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    return this.fsLock.getWriteHoldCount();
  }

  public FSNamesystemLock getFSLock() {
    return this.fsLock;
  }

  /** Lock the checkpoint lock */
  public void cpLock() {
    this.cpLock.lock();
FSNamesystemLock.java

@@ -18,6 +18,9 @@

package org.apache.hadoop.hdfs.server.namenode;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

@@ -29,6 +32,7 @@ import java.util.function.Supplier;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;

@@ -129,6 +133,32 @@ class FSNamesystemLock {

  private static final String OVERALL_METRIC_NAME = "Overall";

  private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
      new ThreadLocal<Collection<INodeMapLock>>() {
        @Override
        public Collection<INodeMapLock> initialValue() {
          return new ArrayList<INodeMapLock>();
        }
      };

  void addChildLock(INodeMapLock lock) {
    partitionLocks.get().add(lock);
  }

  boolean removeChildLock(INodeMapLock lock) {
    return partitionLocks.get().remove(lock);
  }

  boolean hasWriteChildLock() {
    Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
    // FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size());
    while(iter.hasNext()) {
      if(iter.next().hasWriteChildLock())
        return true;
    }
    return false;
  }

  FSNamesystemLock(Configuration conf,
      MutableRatesWithAggregation detailedHoldTimeMetrics) {
    this(conf, detailedHoldTimeMetrics, new Timer());

@@ -180,11 +210,29 @@ class FSNamesystemLock {

  public void readUnlock(String opName,
      Supplier<String> lockReportInfoSupplier) {
    readUnlock(opName, lockReportInfoSupplier, true);
  }

  public void readUnlock(String opName,
      Supplier<String> lockReportInfoSupplier,
      boolean unlockChildren) {
    final boolean needReport = coarseLock.getReadHoldCount() == 1;
    final long readLockIntervalNanos =
        timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
    final long currentTimeMs = timer.now();
    coarseLock.readLock().unlock();

    if(getReadHoldCount() > 0) {  // Current thread holds the lock
      // Unlock the top FSNamesystemLock
      coarseLock.readLock().unlock();
    }

    if(unlockChildren) {  // Also unlock and remove children locks
      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
      while(iter.hasNext()) {
        iter.next().readChildUnlock();
        iter.remove();
      }
    }

    if (needReport) {
      addMetric(opName, readLockIntervalNanos, false);

@@ -252,7 +300,7 @@ class FSNamesystemLock {
   * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
   */
  public void writeUnlock() {
    writeUnlock(OP_NAME_OTHER, false, null);
    writeUnlock(OP_NAME_OTHER, false, null, true);
  }

  /**

@@ -262,7 +310,7 @@ class FSNamesystemLock {
   * @param opName Operation name.
   */
  public void writeUnlock(String opName) {
    writeUnlock(opName, false, null);
    writeUnlock(opName, false, null, true);
  }

  /**

@@ -274,7 +322,7 @@ class FSNamesystemLock {
   */
  public void writeUnlock(String opName,
      Supplier<String> lockReportInfoSupplier) {
    writeUnlock(opName, false, lockReportInfoSupplier);
    writeUnlock(opName, false, lockReportInfoSupplier, true);
  }

  /**

@@ -286,7 +334,7 @@ class FSNamesystemLock {
   * for long time will be logged in logs and metrics.
   */
  public void writeUnlock(String opName, boolean suppressWriteLockReport) {
    writeUnlock(opName, suppressWriteLockReport, null);
    writeUnlock(opName, suppressWriteLockReport, null, true);
  }

  /**

@@ -297,8 +345,9 @@ class FSNamesystemLock {
   * for long time will be logged in logs and metrics.
   * @param lockReportInfoSupplier The info shown in the lock report
   */
  private void writeUnlock(String opName, boolean suppressWriteLockReport,
      Supplier<String> lockReportInfoSupplier) {
  public void writeUnlock(String opName, boolean suppressWriteLockReport,
      Supplier<String> lockReportInfoSupplier,
      boolean unlockChildren) {
    final boolean needReport = !suppressWriteLockReport && coarseLock
        .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
    final long writeLockIntervalNanos =

@@ -329,7 +378,18 @@ class FSNamesystemLock {
      longestWriteLockHeldInfo = new LockHeldInfo();
    }

    coarseLock.writeLock().unlock();
    if(this.isWriteLockedByCurrentThread()) {  // Current thread holds the lock
      // Unlock the top FSNamesystemLock
      coarseLock.writeLock().unlock();
    }

    if(unlockChildren) {  // Unlock and remove children locks
      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
      while(iter.hasNext()) {
        iter.next().writeChildUnlock();
        iter.remove();
      }
    }

    if (needReport) {
      addMetric(opName, writeLockIntervalNanos, true);

@@ -355,7 +415,25 @@ class FSNamesystemLock {
  public int getWriteHoldCount() {
    return coarseLock.getWriteHoldCount();
  }

  /**
   * Queries if the read lock or the write lock is held by any thread.
   * @return {@code true} if any thread holds the read or write lock and
   *         {@code false} otherwise
   */
  public boolean isReadLocked() {
    return coarseLock.getReadLockCount() > 0 || isWriteLocked();
  }

  /**
   * Queries if the write lock is held by any thread.
   * @return {@code true} if any thread holds the write lock and
   *         {@code false} otherwise
   */
  public boolean isWriteLocked() {
    return coarseLock.isWriteLocked();
  }

  public boolean isWriteLockedByCurrentThread() {
    return coarseLock.isWriteLockedByCurrentThread();
  }
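Putting the pieces above together, the intended locking sequence for a namespace mutation such as mkdir looks roughly like the sketch below. This is an illustration based on the changes in this diff, not code from the patch; the method names shown (FSNamesystem.writeLock/writeUnlock, INodeMap.latchWriteLock) are the ones the patch itself calls.

```java
// 1. The RPC handler takes the global FSNamesystemLock write lock as before.
fsn.writeLock();
// 2. latchWriteLock() write-locks the partitions that will be modified,
//    registers them in the thread-local partitionLocks list, and releases
//    the top lock (writeTopUnlock with unlockChildren == false).
fsd.getINodeMap().latchWriteLock(existing, missing);
// ... mutate only the inodes covered by the latched partitions ...
// 3. The normal writeUnlock() at the end of the operation also releases the
//    latched child locks, because unlockChildren defaults to true.
fsn.writeUnlock();
```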
INode.java

@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@@ -577,6 +578,43 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    return name == null? null: DFSUtil.bytes2String(name);
  }

  private long[] namespaceKey;

  /**
   * Key of an INode.
   * Defines partitioning of INodes in the INodeMap.
   *
   * @param level how many levels to be included in the key
   * @return the namespace key of this INode
   */
  public long[] getNamespaceKey(int level) {
    if(namespaceKey == null) {  // generate the namespace key
      long[] buf = new long[level];
      INode cur = this;
      for(int l = 0; l < level; l++) {
        long curId = (cur == null) ? 0L : cur.getId();
        buf[level - l - 1] = curId;
        cur = (cur == null) ? null : cur.parent;
      }
      buf[0] = indexOf(buf);
      namespaceKey = buf;
    }
    return namespaceKey;
  }

  private final static long LARGE_PRIME = 512927357;
  public static long indexOf(long[] key) {
    if(key[key.length-1] == INodeId.ROOT_INODE_ID) {
      return key[0];
    }
    long idx = LARGE_PRIME * key[0];
    idx = (idx ^ (idx >> 32)) & (INodeMap.NUM_RANGES_STATIC -1);
    return idx;
  }

  /**
   * Key of a snapshot Diff Element
   */
  @Override
  public final byte[] getKey() {
    return getLocalNameBytes();

@@ -636,7 +674,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {

  @Override
  public String toString() {
    return getLocalName();
    return getLocalName() + ": " + Arrays.toString(namespaceKey);
  }

  @VisibleForTesting
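As a concrete illustration of the key scheme added to INode above (assuming NAMESPACE_KEY_DEPTH = 2 and NUM_RANGES_STATIC = 256, as defined in INodeMap below; the inode ids are made up for the example):

```java
// Sketch: how getNamespaceKey(2) builds a key for an inode with id 16388
// whose parent directory has id 16387.
long[] buf = new long[] {16387L, 16388L};  // {parentId, inodeId} after walking up one level
// indexOf() then replaces slot 0 with a hash of the parent id, folding all
// children of the same parent into one of NUM_RANGES_STATIC (256) ranges:
long idx = 512927357L * buf[0];            // LARGE_PRIME * key[0]
idx = (idx ^ (idx >> 32)) & (256 - 1);
buf[0] = idx;                              // final key: {range index, inodeId}
// INodeKeyComparator orders inodes by this key, so siblings (same parent)
// sort together and end up in the same PartitionedGSet partition range.
```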
@ -17,44 +17,202 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.util.GSet;
|
||||
import org.apache.hadoop.util.LatchLock;
|
||||
import org.apache.hadoop.util.LightWeightGSet;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.util.PartitionedGSet;
|
||||
|
||||
/**
|
||||
* Storing all the {@link INode}s and maintaining the mapping between INode ID
|
||||
* and INode.
|
||||
*/
|
||||
public class INodeMap {
|
||||
|
||||
static INodeMap newInstance(INodeDirectory rootDir) {
|
||||
// Compute the map capacity by allocating 1% of total memory
|
||||
int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
|
||||
GSet<INode, INodeWithAdditionalFields> map =
|
||||
new LightWeightGSet<>(capacity);
|
||||
map.put(rootDir);
|
||||
return new INodeMap(map);
|
||||
static final int NAMESPACE_KEY_DEPTH = 2;
|
||||
static final int NUM_RANGES_STATIC = 256; // power of 2
|
||||
|
||||
  public static class INodeKeyComparator implements Comparator<INode> {
    INodeKeyComparator() {
      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
    }

    @Override
    public int compare(INode i1, INode i2) {
      if (i1 == null || i2 == null) {
        throw new NullPointerException("Cannot compare null INodes");
      }
      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
      for(int l = 0; l < NAMESPACE_KEY_DEPTH; l++) {
        if(key1[l] == key2[l]) continue;
        return (key1[l] < key2[l] ? -1 : 1);
      }
      return 0;
    }
  }

  /**
   * INodeKeyComparator with Hashed Parent
   */
  public static class HPINodeKeyComparator implements Comparator<INode> {
    HPINodeKeyComparator() {
      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
    }

    @Override
    public int compare(INode i1, INode i2) {
      if (i1 == null || i2 == null) {
        throw new NullPointerException("Cannot compare null INodes");
      }
      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
      long key1_0 = INode.indexOf(key1);
      long key2_0 = INode.indexOf(key2);
      if(key1_0 != key2_0)
        return (key1_0 < key2_0 ? -1 : 1);
      for(int l = 1; l < NAMESPACE_KEY_DEPTH; l++) {
        if(key1[l] == key2[l]) continue;
        return (key1[l] < key2[l] ? -1 : 1);
      }
      return 0;
    }
  }

  public static class INodeIdComparator implements Comparator<INode> {
    @Override
    public int compare(INode i1, INode i2) {
      if (i1 == null || i2 == null) {
        throw new NullPointerException("Cannot compare null INodes");
      }
      long id1 = i1.getId();
      long id2 = i2.getId();
      return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
    }
  }

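As a rough illustration of the ordering (hypothetical ids, outside the patch): INodeKeyComparator compares the two-level keys lexicographically, so inodes with the same first key component, such as siblings of one parent, stay adjacent and land in the same partition range; HPINodeKeyComparator instead compares the hashed index from INode.indexOf() first, which spreads parents more evenly across partitions.

    import java.util.Arrays;
    import java.util.Comparator;

    // Toy model of the lexicographic key comparison used by INodeKeyComparator.
    // Keys are (parentKey, inodeId) pairs; the real code derives them from
    // INode.getNamespaceKey(NAMESPACE_KEY_DEPTH).
    public class KeyOrderDemo {
      static final Comparator<long[]> BY_KEY = (k1, k2) -> {
        for (int l = 0; l < 2; l++) {
          if (k1[l] != k2[l]) {
            return k1[l] < k2[l] ? -1 : 1;
          }
        }
        return 0;
      };

      public static void main(String[] args) {
        long[][] keys = { {7, 20003}, {3, 20001}, {3, 20002} };
        Arrays.sort(keys, BY_KEY);
        // Keys with the same first component stay adjacent:
        // [3, 20001], [3, 20002], [7, 20003]
        for (long[] k : keys) {
          System.out.println(Arrays.toString(k));
        }
      }
    }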
  public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
    private ReentrantReadWriteLock childLock;

    INodeMapLock() {
      this(null);
    }

    private INodeMapLock(ReentrantReadWriteLock childLock) {
      assert namesystem != null : "namesystem is null";
      this.childLock = childLock;
    }

    @Override
    protected boolean isReadTopLocked() {
      return namesystem.getFSLock().isReadLocked();
    }

    @Override
    protected boolean isWriteTopLocked() {
      return namesystem.getFSLock().isWriteLocked();
    }

    @Override
    protected void readTopUnlock() {
      namesystem.getFSLock().readUnlock("INodeMap", null, false);
    }

    @Override
    protected void writeTopUnlock() {
      namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
    }

    @Override
    protected boolean hasReadChildLock() {
      return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
    }

    @Override
    protected void readChildLock() {
      // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
      this.childLock.readLock().lock();
      namesystem.getFSLock().addChildLock(this);
      // LOG.info("readChildLock: done");
    }

    @Override
    protected void readChildUnlock() {
      // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
      this.childLock.readLock().unlock();
      // LOG.info("readChildUnlock: done");
    }

    @Override
    protected boolean hasWriteChildLock() {
      return this.childLock.isWriteLockedByCurrentThread() || namesystem
          .getFSLock().hasWriteChildLock();
    }

    @Override
    protected void writeChildLock() {
      // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
      this.childLock.writeLock().lock();
      namesystem.getFSLock().addChildLock(this);
      // LOG.info("writeChildLock: done");
    }

    @Override
    protected void writeChildUnlock() {
      // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
      this.childLock.writeLock().unlock();
      // LOG.info("writeChildUnlock: done");
    }

    @Override
    protected LatchLock<ReentrantReadWriteLock> clone() {
      return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
    }
  }

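The ownership checks above (getReadHoldCount, isWriteLockedByCurrentThread) rely on ReentrantReadWriteLock tracking per-thread hold counts, and clone() hands out an unfair lock, which generally favors throughput over strict FIFO ordering. A small standalone illustration of those hold-count checks, independent of the HDFS classes:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Standalone demo of the per-thread ownership checks used by
    // hasReadChildLock() and hasWriteChildLock() above.
    public class HoldCountDemo {
      public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); // unfair, as in clone()

        lock.readLock().lock();
        System.out.println(lock.getReadHoldCount() > 0);         // true: this thread holds a read lock
        lock.readLock().unlock();

        lock.writeLock().lock();
        System.out.println(lock.isWriteLockedByCurrentThread()); // true: this thread holds the write lock
        lock.writeLock().unlock();

        System.out.println(lock.getReadHoldCount() > 0);         // false: nothing is held any more
      }
    }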
  static INodeMap newInstance(INodeDirectory rootDir,
      FSNamesystem ns) {
    return new INodeMap(rootDir, ns);
  }

  /** Synchronized by external lock. */
  private final GSet<INode, INodeWithAdditionalFields> map;

  private FSNamesystem namesystem;

  public Iterator<INodeWithAdditionalFields> getMapIterator() {
    return map.iterator();
  }

  private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
    Preconditions.checkArgument(map != null);
    this.map = map;
  private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
    this.namesystem = ns;
    // Compute the map capacity by allocating 1% of total memory
    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
    this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
        new INodeMapLock());

    // Pre-populate initial empty partitions
    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
    PermissionStatus perm = new PermissionStatus(
        "", "", new FsPermission((short) 0));
    for(int p = 0; p < NUM_RANGES_STATIC; p++) {
      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
      key.setParent(new INodeDirectory((long)p, null, perm, 0));
      pgs.addNewPartition(key);
    }

    map.put(rootDir);
  }
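Each synthetic key built in the loop above carries ROOT_INODE_ID as its own id and a parent whose id is the partition number p, so its two-level key is (p, ROOT_INODE_ID); because the last element is the root id, indexOf() returns p unchanged and partition p owns range p. A toy rendering of the resulting keys (hypothetical constants, not the HDFS API):

    // Toy rendering of the partition keys created in the constructor above.
    public class PartitionKeysDemo {
      static final long ROOT_ID = 16385L;   // stand-in for INodeId.ROOT_INODE_ID
      static final int NUM_RANGES = 256;

      public static void main(String[] args) {
        for (int p = 0; p < NUM_RANGES; p++) {
          long[] key = {p, ROOT_ID};        // matches getNamespaceKey(2) for the synthetic dir
          if (p < 3) {                      // print only a few to keep the output short
            System.out.println("partition " + p + " -> key [" + key[0] + ", " + key[1] + "]");
          }
        }
      }
    }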

  /**
   * Add an {@link INode} into the {@link INode} map. Replace the old value if
   * necessary.
@@ -88,48 +246,58 @@ public class INodeMap {
   * such {@link INode} in the map.
   */
  public INode get(long id) {
    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
        "", "", new FsPermission((short) 0)), 0, 0) {

      @Override
      void recordModification(int latestSnapshotId) {
      }

      @Override
      public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
        // Nothing to do
      }
    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
    /*
     * Convert a long inode id into an INode object. We only need to compare
     * two inodes by inode id. So, it can be any type of INode object.
     */
    INode inode = new INodeDirectory(id, null,
        new PermissionStatus("", "", new FsPermission((short) 0)), 0);

      @Override
      public QuotaCounts computeQuotaUsage(
          BlockStoragePolicySuite bsps, byte blockStoragePolicyId,
          boolean useCache, int lastSnapshotId) {
        return null;
      }

      @Override
      public ContentSummaryComputationContext computeContentSummary(
          int snapshotId, ContentSummaryComputationContext summary) {
        return null;
      }
    /*
     * Iterate all partitions of PGSet and return the INode.
     * Just for fallback.
     */
    PermissionStatus perm =
        new PermissionStatus("", "", new FsPermission((short) 0));
    // TODO: create a static array, to avoid creation of keys each time.
    for (int p = 0; p < NUM_RANGES_STATIC; p++) {
      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
      key.setParent(new INodeDirectory((long)p, null, perm, 0));
      PartitionedGSet.PartitionEntry e = pgs.getPartition(key);

      @Override
      public void cleanSubtree(
          ReclaimContext reclaimContext, int snapshotId, int priorSnapshotId) {
      if (e.contains(inode)) {
        return (INode) e.get(inode);
      }
    }

      @Override
      public byte getStoragePolicyID(){
        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
      }
    return null;
  }

      @Override
      public byte getLocalStoragePolicyID() {
        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
      }
    };

    return map.get(inode);
  public INode get(INode inode) {

    /*
     * Check whether the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent
     * dirs
     */
    int i = NAMESPACE_KEY_DEPTH - 1;
    INode tmpInode = inode;
    while (i > 0 && tmpInode.getParent() != null) {
      tmpInode = tmpInode.getParent();
      i--;
    }

    /*
     * If the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent dirs,
     * use map.get(); else, fall back to get INode based on Inode ID.
     */
    if (i == 0) {
      return map.get(inode);
    } else {
      return get(inode.getId());
    }
  }
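A rough, simplified model of the two lookup paths above (stand-in types, not the HDFS classes): if the inode can supply a full NAMESPACE_KEY_DEPTH key, meaning it has enough ancestors, the partitioned map is queried directly by key; otherwise the code falls back to the id-based lookup that scans all partitions.

    // Simplified model of the two lookup paths in INodeMap.get(INode);
    // Node, lookupByKey and scanAllPartitionsById are illustrative stand-ins.
    interface Node {
      Node getParent();
      long getId();
    }

    final class LookupSketch {
      static final int NAMESPACE_KEY_DEPTH = 2;

      static Node get(Node inode) {
        int missingLevels = NAMESPACE_KEY_DEPTH - 1;
        Node cur = inode;
        while (missingLevels > 0 && cur.getParent() != null) {
          cur = cur.getParent();
          missingLevels--;
        }
        return missingLevels == 0
            ? lookupByKey(inode)                     // full key available: direct partition lookup
            : scanAllPartitionsById(inode.getId());  // fallback: iterate partitions by inode id
      }

      static Node lookupByKey(Node inode) { return inode; }         // placeholder
      static Node scanAllPartitionsById(long id) { return null; }   // placeholder
    }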

  /**
@@ -138,4 +306,27 @@ public class INodeMap {
  public void clear() {
    map.clear();
  }

  public void latchWriteLock(INodesInPath iip, INode[] missing) {
    assert namesystem.hasReadLock() : "must have namesystem lock";
    assert iip.length() > 0 : "INodesInPath has 0 length";
    if(!(map instanceof PartitionedGSet)) {
      return;
    }
    // Locks partitions along the path starting from the first existing parent
    // Locking is in the hierarchical order
    INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
    allINodes[0] = iip.getLastINode();
    System.arraycopy(missing, 0, allINodes, 1, missing.length);
    /*
    // Locks all the partitions along the path in the hierarchical order
    INode[] allINodes = new INode[iip.length() + missing.length];
    INode[] existing = iip.getINodesArray();
    System.arraycopy(existing, 0, allINodes, 0, existing.length);
    System.arraycopy(missing, 0, allINodes, existing.length, missing.length);
    */

    ((PartitionedGSet<INode, INodeWithAdditionalFields>)
        map).latchWriteLock(allINodes);
  }
}
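The call sites of latchWriteLock() are not shown in this diff; a hedged sketch of the calling pattern it appears to expect (stand-in interfaces, not the HDFS API): the caller holds the global namesystem lock for read only, then write-latches the partitions covering the last existing inode and the inodes about to be created before mutating the map.

    // Hedged sketch of the intended calling pattern for latchWriteLock();
    // NamesystemLock and PartitionedMap are stand-ins, not the HDFS classes.
    interface NamesystemLock {
      void readLock();
      void readUnlock();
    }

    interface PartitionedMap {
      void latchWriteLock(Object path, Object[] missing);
    }

    final class LatchUsageSketch {
      static void createMissing(NamesystemLock fsLock, PartitionedMap inodeMap,
                                Object path, Object[] missing) {
        fsLock.readLock();                        // global lock held for read only
        try {
          inodeMap.latchWriteLock(path, missing); // write-latch partitions along the path
          // ... mutate the latched partitions here ...
        } finally {
          fsLock.readUnlock();
        }
      }
    }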
@@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;

@@ -1997,6 +1998,7 @@ public class DFSTestUtil {
    GenericTestUtils.setLogLevel(NameNode.LOG, level);
    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
    GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
    GenericTestUtils.setLogLevel(GSet.LOG, level);
  }

  /**
@@ -25,6 +25,8 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

@@ -152,4 +154,55 @@ public class TestDFSMkdirs {
      cluster.shutdown();
    }
  }

  @Test
  public void testMkDirsWithRestart() throws IOException {
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
    DistributedFileSystem dfs = cluster.getFileSystem();
    try {
      Path dir1 = new Path("/mkdir-1");
      Path file1 = new Path(dir1, "file1");
      Path deleteDir = new Path("/deleteDir");
      Path deleteFile = new Path(dir1, "deleteFile");
      // Create a dir in root dir, should succeed
      assertTrue(dfs.mkdir(dir1, FsPermission.getDefault()));
      dfs.mkdir(deleteDir, FsPermission.getDefault());
      assertTrue(dfs.exists(deleteDir));
      dfs.delete(deleteDir, true);
      assertTrue(!dfs.exists(deleteDir));

      DFSTestUtil.writeFile(dfs, file1, "hello world");
      DFSTestUtil.writeFile(dfs, deleteFile, "hello world");
      int totalFiles = getFileCount(dfs);
      // Before deletion there are 2 files
      assertTrue("Incorrect file count", 2 == totalFiles);
      dfs.delete(deleteFile, false);
      totalFiles = getFileCount(dfs);
      // After deletion, left with 1 file
      assertTrue("Incorrect file count", 1 == totalFiles);

      cluster.restartNameNodes();
      dfs = cluster.getFileSystem();
      assertTrue(dfs.exists(dir1));
      assertTrue(!dfs.exists(deleteDir));
      assertTrue(dfs.exists(file1));
      totalFiles = getFileCount(dfs);
      assertTrue("Incorrect file count", 1 == totalFiles);
    } finally {
      dfs.close();
      cluster.shutdown();
    }
  }

  private int getFileCount(DistributedFileSystem dfs) throws IOException {
    RemoteIterator<LocatedFileStatus> fileItr =
        dfs.listFiles(new Path("/"), true);
    int totalFiles = 0;
    while (fileItr.hasNext()) {
      fileItr.next();
      totalFiles++;
    }
    return totalFiles;
  }
}
@@ -1313,6 +1313,10 @@ public class TestFileCreation {
        fail();
      } catch(FileNotFoundException e) {
        FileSystem.LOG.info("Caught Expected FileNotFoundException: ", e);
      } catch (AssertionError ae) {
        // FSDirWriteFileOp#completeFile throws AssertionError if the given
        // id/node is not an instance of INodeFile.
        FileSystem.LOG.info("Caught Expected AssertionError: ", ae);
      }
    } finally {
      IOUtils.closeStream(dfs);
@@ -1017,23 +1017,38 @@ public class TestINodeFile {

      final Path dir = new Path("/dir");
      hdfs.mkdirs(dir);
      INodeDirectory dirNode = getDir(fsdir, dir);
      INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
      assertSame(dirNode, dirNodeFromNode);
      cluster.getNamesystem().readLock();
      try {
        INodeDirectory dirNode = getDir(fsdir, dir);
        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
        assertSame(dirNode, dirNodeFromNode);
      } finally {
        cluster.getNamesystem().readUnlock();
      }

      // set quota to dir, which leads to node replacement
      hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
      dirNode = getDir(fsdir, dir);
      assertTrue(dirNode.isWithQuota());
      // the inode in inodeMap should also be replaced
      dirNodeFromNode = fsdir.getInode(dirNode.getId());
      assertSame(dirNode, dirNodeFromNode);
      cluster.getNamesystem().readLock();
      try {
        INodeDirectory dirNode = getDir(fsdir, dir);
        assertTrue(dirNode.isWithQuota());
        // the inode in inodeMap should also be replaced
        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
        assertSame(dirNode, dirNodeFromNode);
      } finally {
        cluster.getNamesystem().readUnlock();
      }

      hdfs.setQuota(dir, -1, -1);
      dirNode = getDir(fsdir, dir);
      // the inode in inodeMap should also be replaced
      dirNodeFromNode = fsdir.getInode(dirNode.getId());
      assertSame(dirNode, dirNodeFromNode);
      cluster.getNamesystem().readLock();
      try {
        INodeDirectory dirNode = getDir(fsdir, dir);
        // the inode in inodeMap should also be replaced
        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
        assertSame(dirNode, dirNodeFromNode);
      } finally {
        cluster.getNamesystem().readUnlock();
      }
    } finally {
      if (cluster != null) {
        cluster.shutdown();