Compare commits
6 Commits

Author | SHA1 | Date |
---|---|---|
prasad-acit | 892fa48c5a | |
Renukaprasad C | 88484fceb4 | |
Xing Lin | 4610e1d902 | |
Xing Lin | c06cd9ae8c | |
Konstantin V Shvachko | 455e8c0191 | |
Konstantin V Shvachko | a30e9f6663 | |
LatchLock.java (new file)

@@ -0,0 +1,64 @@
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

/**
 * LatchLock controls two hierarchical Read/Write locks:
 * the topLock and the childLock.
 * Typically an operation starts with the topLock already acquired.
 * To switch to the child lock, LatchLock first acquires the childLock
 * and then releases the topLock.
 */
public abstract class LatchLock<C> {
  // Interface methods to be defined by subclasses
  /** @return true if the topLock is locked for read by any thread */
  protected abstract boolean isReadTopLocked();
  /** @return true if the topLock is locked for write by any thread */
  protected abstract boolean isWriteTopLocked();
  protected abstract void readTopUnlock();
  protected abstract void writeTopUnlock();

  protected abstract boolean hasReadChildLock();
  protected abstract void readChildLock();
  protected abstract void readChildUnlock();

  protected abstract boolean hasWriteChildLock();
  protected abstract void writeChildLock();
  protected abstract void writeChildUnlock();

  protected abstract LatchLock<C> clone();

  // Public APIs to use with the class
  public void readLock() {
    readChildLock();
    readTopUnlock();
  }

  public void readUnlock() {
    readChildUnlock();
  }

  public void writeLock() {
    writeChildLock();
    writeTopUnlock();
  }

  public void writeUnlock() {
    writeChildUnlock();
  }
}
```
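For orientation, a concrete LatchLock can simply wrap two ReentrantReadWriteLocks, much like the NoOpLock helper in TestPartitionedGSet further down. The sketch below is illustrative only and not part of this change; the name SimpleLatchLock is invented for the example, and it assumes the LatchLock class above is on the classpath.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LatchLock;

/**
 * Minimal sketch of a concrete LatchLock backed by two in-memory
 * ReentrantReadWriteLocks.  readLock()/writeLock() inherited from LatchLock
 * then perform the hand-off: take the child lock, release the top lock.
 */
class SimpleLatchLock extends LatchLock<ReentrantReadWriteLock> {
  private final ReentrantReadWriteLock top;   // shared, namespace-wide lock
  private final ReentrantReadWriteLock child; // per-partition lock

  SimpleLatchLock(ReentrantReadWriteLock top, ReentrantReadWriteLock child) {
    this.top = top;
    this.child = child;
  }

  @Override protected boolean isReadTopLocked() { return top.getReadLockCount() > 0 || isWriteTopLocked(); }
  @Override protected boolean isWriteTopLocked() { return top.isWriteLocked(); }
  @Override protected void readTopUnlock() { top.readLock().unlock(); }
  @Override protected void writeTopUnlock() { top.writeLock().unlock(); }

  @Override protected boolean hasReadChildLock() { return child.getReadLockCount() > 0 || hasWriteChildLock(); }
  @Override protected void readChildLock() { child.readLock().lock(); }
  @Override protected void readChildUnlock() { child.readLock().unlock(); }

  @Override protected boolean hasWriteChildLock() { return child.isWriteLockedByCurrentThread(); }
  @Override protected void writeChildLock() { child.writeLock().lock(); }
  @Override protected void writeChildUnlock() { child.writeLock().unlock(); }

  @Override protected LatchLock<ReentrantReadWriteLock> clone() {
    // A clone guards a new partition with its own child lock; the top lock stays shared.
    return new SimpleLatchLock(top, new ReentrantReadWriteLock());
  }
}
```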
PartitionedGSet.java (new file)

@@ -0,0 +1,348 @@
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.NoSuchElementException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;

/**
 * An implementation of {@link GSet}, which splits a collection of elements
 * into partitions, each corresponding to a range of keys.
 *
 * This class does not support null elements.
 *
 * This class is backed by LatchLock for hierarchical synchronization.
 *
 * @param <K> Key type for looking up the elements
 * @param <E> Element type, which must be
 *       (1) a subclass of K, and
 *       (2) implementing {@link LinkedElement} interface.
 */
@InterfaceAudience.Private
public class PartitionedGSet<K, E extends K> implements GSet<K, E> {

  private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027;
  private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f;

  /**
   * An ordered map of contiguous segments of elements.
   * Each key in the map represents the smallest key in the mapped segment,
   * so that all elements in this segment are >= the mapping key,
   * but are smaller than the next key in the map.
   * Elements within a partition do not need to be ordered.
   */
  private final NavigableMap<K, PartitionEntry> partitions;
  private LatchLock<?> latchLock;

  /**
   * The number of elements in the set.
   */
  protected volatile int size;

  /**
   * A single partition of the {@link PartitionedGSet}.
   * Consists of a hash table {@link LightWeightGSet} and a lock, which
   * controls access to this partition independently of the other ones.
   */
  public class PartitionEntry extends LightWeightGSet<K, E> {
    private final LatchLock<?> partLock;

    PartitionEntry(int defaultPartitionCapacity) {
      super(defaultPartitionCapacity);
      this.partLock = latchLock.clone();
    }
  }

  public PartitionedGSet(final int capacity,
      final Comparator<? super K> comparator,
      final LatchLock<?> latchLock) {
    this.partitions = new TreeMap<K, PartitionEntry>(comparator);
    this.latchLock = latchLock;
    // addNewPartition(rootKey).put(rootKey);
    // this.size = 1;
    this.size = 0;
    LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
    LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
  }

  /**
   * Creates a new empty partition.
   * @param key the smallest key of the new partition
   * @return the newly created partition
   */
  public PartitionEntry addNewPartition(final K key) {
    Entry<K, PartitionEntry> lastEntry = partitions.lastEntry();
    PartitionEntry lastPart = null;
    if(lastEntry != null)
      lastPart = lastEntry.getValue();

    PartitionEntry newPart =
        new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
    // assert size == 0 || newPart.partLock.isWriteTopLocked() :
    //     "Must hold write Lock: key = " + key;
    PartitionEntry oldPart = partitions.put(key, newPart);
    assert oldPart == null :
        "RangeMap already has a partition associated with " + key;

    LOG.debug("Total GSet size = {}", size);
    LOG.debug("Number of partitions = {}", partitions.size());
    LOG.debug("Previous partition size = {}",
        lastPart == null ? 0 : lastPart.size());

    return newPart;
  }

  @Override
  public int size() {
    return size;
  }

  public PartitionEntry getPartition(final K key) {
    Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
    if(partEntry == null) {
      return null;
    }
    PartitionEntry part = partEntry.getValue();
    if(part == null) {
      throw new IllegalStateException("Null partition for key: " + key);
    }
    assert size == 0 || part.partLock.isReadTopLocked() ||
        part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
    return part;
  }

  @Override
  public boolean contains(final K key) {
    PartitionEntry part = getPartition(key);
    if(part == null) {
      return false;
    }
    return part.contains(key);
  }

  @Override
  public E get(final K key) {
    PartitionEntry part = getPartition(key);
    if(part == null) {
      return null;
    }
    LOG.debug("get key: {}", key);
    // part.partLock.readLock();
    return part.get(key);
  }

  @Override
  public E put(final E element) {
    K key = element;
    PartitionEntry part = getPartition(key);
    if(part == null) {
      throw new HadoopIllegalArgumentException("Illegal key: " + key);
    }
    assert size == 0 || part.partLock.isWriteTopLocked() ||
        part.partLock.hasWriteChildLock() :
        "Must hold write Lock: key = " + key;
    LOG.debug("put key: {}", key);
    PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
    if(newPart != part) {
      newPart.partLock.writeChildLock();
      part = newPart;
    }
    E result = part.put(element);
    if(result == null) {  // new element
      size++;
      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
    } else {
      LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
          key, size);
    }
    return result;
  }

  private PartitionEntry addNewPartitionIfNeeded(
      PartitionEntry curPart, K key) {
    if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW
        || curPart.contains(key)) {
      return curPart;
    }
    return addNewPartition(key);
  }

  @Override
  public E remove(final K key) {
    PartitionEntry part = getPartition(key);
    if(part == null) {
      return null;
    }
    E result = part.remove(key);
    if(result != null) {
      size--;
    }
    return result;
  }

  @Override
  public void clear() {
    LOG.error("Total GSet size = {}", size);
    LOG.error("Number of partitions = {}", partitions.size());
    printStats();
    // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
    // SHV May need to clear all partitions?
    partitions.clear();
    size = 0;
  }

  private void printStats() {
    int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0;
    long totalSize = 0;
    int numEmptyPartitions = 0, numFullPartitions = 0;
    Collection<PartitionEntry> parts = partitions.values();
    Set<Entry<K, PartitionEntry>> entries = partitions.entrySet();
    int i = 0;
    for(Entry<K, PartitionEntry> e : entries) {
      PartitionEntry part = e.getValue();
      int s = part.size;
      if(s == 0) numEmptyPartitions++;
      if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++;
      totalSize += s;
      partSizeMin = (s < partSizeMin ? s : partSizeMin);
      partSizeMax = (partSizeMax < s ? s : partSizeMax);
      Class<?> inodeClass = e.getKey().getClass();
      try {
        long[] key = (long[]) inodeClass.
            getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
        long[] firstKey = new long[key.length];
        if(part.iterator().hasNext()) {
          Object first = part.iterator().next();
          long[] firstKeyRef = (long[]) inodeClass.getMethod(
              "getNamespaceKey", int.class).invoke(first, 2);
          Object parent = inodeClass.
              getMethod("getParent").invoke(first);
          long parentId = (parent == null ? 0L :
              (long) inodeClass.getMethod("getId").invoke(parent));
          for (int j = 0; j < key.length; j++) {
            firstKey[j] = firstKeyRef[j];
          }
          firstKey[0] = parentId;
        }
        LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
            i++, key, s, firstKey);  // SHV should be info
      } catch (NoSuchElementException ex) {
        LOG.error("iterator.next() throws NoSuchElementException.");
        throw ex;
      } catch (Exception ex) {
        LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
      }
    }
    partSizeAvg = (int) (totalSize / parts.size());
    LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
        partSizeMin, partSizeAvg, partSizeMax, totalSize);
    LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}",
        numEmptyPartitions, parts.size()-numEmptyPartitions, numFullPartitions);
  }

  @Override
  public Collection<E> values() {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public Iterator<E> iterator() {
    return new EntryIterator();
  }

  /**
   * Iterator over the elements in the set.
   * Iterates first by keys, then inside the partition
   * corresponding to the key.
   *
   * Modifications are tracked by the underlying collections. We allow
   * modifying other partitions while iterating through the current one.
   */
  private class EntryIterator implements Iterator<E> {
    private Iterator<K> keyIterator;
    private Iterator<E> partitionIterator;

    // Set partitionIterator to point to the first partition, or set it to null
    // when there are no partitions created for this PartitionedGSet.
    public EntryIterator() {
      keyIterator = partitions.keySet().iterator();

      if (!keyIterator.hasNext()) {
        partitionIterator = null;
        return;
      }

      K firstKey = keyIterator.next();
      partitionIterator = partitions.get(firstKey).iterator();
    }

    @Override
    public boolean hasNext() {

      // Special case: an iterator was created for an empty PartitionedGSet.
      // Check whether new partitions have been added since then.
      if (partitionIterator == null) {
        if (partitions.size() == 0) {
          return false;
        } else {
          keyIterator = partitions.keySet().iterator();
          K nextKey = keyIterator.next();
          partitionIterator = partitions.get(nextKey).iterator();
        }
      }

      while(!partitionIterator.hasNext()) {
        if(!keyIterator.hasNext()) {
          return false;
        }
        K curKey = keyIterator.next();
        partitionIterator = getPartition(curKey).iterator();
      }
      return partitionIterator.hasNext();
    }

    @Override
    public E next() {
      if (!hasNext()) {
        throw new NoSuchElementException("No more elements in this set.");
      }
      return partitionIterator.next();
    }
  }

  public void latchWriteLock(K[] keys) {
    // getPartition(parent).partLock.writeChildLock();
    LatchLock<?> pLock = null;
    for(K key : keys) {
      pLock = getPartition(key).partLock;
      pLock.writeChildLock();
    }
    assert pLock != null : "pLock is null";
    pLock.writeTopUnlock();
  }
}
```
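The partitions map is the heart of the routing logic: getPartition() calls TreeMap.floorEntry(key) to pick the partition whose smallest key is the largest split point not exceeding the lookup key. The following self-contained sketch shows that routing with plain JDK types; it is illustrative only and not part of the patch.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class PartitionRoutingSketch {
  public static void main(String[] args) {
    // Each entry's key is the smallest key covered by that partition.
    NavigableMap<Integer, String> partitions = new TreeMap<>();
    partitions.put(0, "partition-A");      // keys 0..999
    partitions.put(1000, "partition-B");   // keys 1000..1999
    partitions.put(2000, "partition-C");   // keys >= 2000

    // floorEntry() returns the entry with the greatest split key <= the lookup key.
    System.out.println(partitions.floorEntry(42).getValue());    // partition-A
    System.out.println(partitions.floorEntry(1500).getValue());  // partition-B
    System.out.println(partitions.floorEntry(2000).getValue());  // partition-C
  }
}
```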
TestPartitionedGSet.java (new file)

@@ -0,0 +1,270 @@
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Testing {@link PartitionedGSet} */
public class TestPartitionedGSet {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestPartitionedGSet.class);
  private static final int ELEMENT_NUM = 100;

  /**
   * Generate positive random numbers for testing. We want to use only positive
   * numbers because the smallest partition used in testing is 0.
   *
   * @param length
   *          number of random numbers to be generated.
   *
   * @param randomSeed
   *          seed to be used for random number generator.
   *
   * @return
   *          An array of Integers
   */
  private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
    Random random = new Random(randomSeed);
    ArrayList<Integer> list = new ArrayList<Integer>(length);
    for (int i = 0; i < length; i++) {
      list.add(random.nextInt(Integer.MAX_VALUE));
    }
    return list;
  }

  private static class TestElement implements LinkedElement {
    private final int val;
    private LinkedElement next;

    TestElement(int val) {
      this.val = val;
      this.next = null;
    }

    public int getVal() {
      return val;
    }

    @Override
    public void setNext(LinkedElement next) {
      this.next = next;
    }

    @Override
    public LinkedElement getNext() {
      return next;
    }
  }

  private static class TestElementComparator
      implements Comparator<TestElement> {
    @Override
    public int compare(TestElement e1, TestElement e2) {
      if (e1 == null || e2 == null) {
        throw new NullPointerException("Cannot compare null elements");
      }

      return e1.getVal() - e2.getVal();
    }
  }

  protected ReentrantReadWriteLock topLock =
      new ReentrantReadWriteLock(false);

  /**
   * We are NOT testing any concurrent access to a PartitionedGSet here.
   */
  private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
    private ReentrantReadWriteLock childLock;

    public NoOpLock() {
      childLock = new ReentrantReadWriteLock(false);
    }

    @Override
    protected boolean isReadTopLocked() {
      return topLock.getReadLockCount() > 0 || isWriteTopLocked();
    }

    @Override
    protected boolean isWriteTopLocked() {
      return topLock.isWriteLocked();
    }

    @Override
    protected void readTopUnlock() {
      topLock.readLock().unlock();
    }

    @Override
    protected void writeTopUnlock() {
      topLock.writeLock().unlock();
    }

    @Override
    protected boolean hasReadChildLock() {
      return childLock.getReadLockCount() > 0 || hasWriteChildLock();
    }

    @Override
    protected void readChildLock() {
      childLock.readLock().lock();
    }

    @Override
    protected void readChildUnlock() {
      childLock.readLock().unlock();
    }

    @Override
    protected boolean hasWriteChildLock() {
      return childLock.isWriteLockedByCurrentThread();
    }

    @Override
    protected void writeChildLock() {
      childLock.writeLock().lock();
    }

    @Override
    protected void writeChildUnlock() {
      childLock.writeLock().unlock();
    }

    @Override
    protected LatchLock<ReentrantReadWriteLock> clone() {
      return new NoOpLock();
    }
  }

  /**
   * Test iterator for a PartitionedGSet with no partitions.
   */
  @Test(timeout=60000)
  public void testIteratorForNoPartition() {
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    topLock.readLock().lock();
    int count = 0;
    Iterator<TestElement> iter = set.iterator();
    while (iter.hasNext()) {
      iter.next();
      count++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(0, count);
  }

  /**
   * Test iterator for a PartitionedGSet with empty partitions.
   */
  @Test(timeout=60000)
  public void testIteratorForEmptyPartitions() {
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));

    topLock.readLock().lock();
    int count = 0;
    Iterator<TestElement> iter = set.iterator();
    while (iter.hasNext()) {
      iter.next();
      count++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(0, count);
  }

  /**
   * Test whether the iterator can return the same number of elements as stored
   * into the PartitionedGSet.
   */
  @Test(timeout=60000)
  public void testIteratorCountElements() {
    ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));

    topLock.writeLock().lock();
    for (Integer i : list) {
      set.put(new TestElement(i));
    }
    topLock.writeLock().unlock();

    topLock.readLock().lock();
    int count = 0;
    Iterator<TestElement> iter = set.iterator();
    while (iter.hasNext()) {
      iter.next();
      count++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(ELEMENT_NUM, count);
  }

  /**
   * Test iterator when it is created before partitions/elements are
   * added to the PartitionedGSet.
   */
  @Test(timeout=60000)
  public void testIteratorAddElementsAfterIteratorCreation() {
    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());

    // Create the iterator before partitions are added.
    Iterator<TestElement> iter = set.iterator();

    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));
    set.addNewPartition(new TestElement(2000));

    // Added one element
    topLock.writeLock().lock();
    set.put(new TestElement(2500));
    topLock.writeLock().unlock();

    topLock.readLock().lock();
    int count = 0;
    while (iter.hasNext()) {
      iter.next();
      count++;
    }
    topLock.readLock().unlock();
    Assert.assertEquals(1, count);
  }
}
```
FSDirMkdirOp.java

```diff
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.nio.charset.StandardCharsets;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -69,18 +70,7 @@ class FSDirMkdirOp {
       // create multiple inodes.
       fsn.checkFsObjectLimit();
 
-      // Ensure that the user can traversal the path by adding implicit
-      // u+wx permission to all ancestor directories.
-      INodesInPath existing =
-          createParentDirectories(fsd, iip, permissions, false);
-      if (existing != null) {
-        existing = createSingleDirectory(
-            fsd, existing, iip.getLastLocalName(), permissions);
-      }
-      if (existing == null) {
-        throw new IOException("Failed to create directory: " + src);
-      }
-      iip = existing;
+      iip = createMissingDirs(fsd, iip, permissions, false);
     }
     return fsd.getAuditFileInfo(iip);
   } finally {
@@ -88,6 +78,36 @@ class FSDirMkdirOp {
     }
   }
 
+  static INodesInPath createMissingDirs(FSDirectory fsd, INodesInPath iip,
+      PermissionStatus permissions, boolean inheritPerms) throws IOException {
+    PermissionStatus basePerm = inheritPerms ?
+        iip.getExistingINodes().getLastINode().getPermissionStatus() :
+        permissions;
+    // create all missing directories along the path,
+    // but don't add them to the INodeMap yet
+    permissions = addImplicitUwx(basePerm, permissions);
+    INode[] missing = createPathDirectories(fsd, iip, permissions);
+    iip = iip.getExistingINodes();
+    if (missing.length == 0) {
+      return iip;
+    }
+    // switch the locks
+    fsd.getINodeMap().latchWriteLock(iip, missing);
+    int counter = 0;
+    // Add missing inodes to the INodeMap
+    for (INode dir : missing) {
+      if (counter++ == missing.length - 1) {
+        // Last folder in the path, use the user given permission
+        // For MKDIR - refers to the permission given by the user
+        // For create - refers to the parent directory permission.
+        permissions = basePerm;
+      }
+      iip = addSingleDirectory(fsd, iip, dir, permissions);
+      assert iip != null : "iip should not be null";
+    }
+    return iip;
+  }
+
   /**
    * For a given absolute path, create all ancestors as directories along the
    * path. All ancestors inherit their parent's permission plus an implicit
@@ -132,6 +152,7 @@ class FSDirMkdirOp {
     if (missing == 0) {  // full path exists, return parents.
       existing = iip.getParentINodesInPath();
     } else if (missing > 1) { // need to create at least one ancestor dir.
+      FSNamesystem.LOG.error("missing = " + missing);
       // Ensure that the user can traversal the path by adding implicit
       // u+wx permission to all ancestor directories.
       PermissionStatus basePerm = inheritPerms
@@ -143,6 +164,14 @@ class FSDirMkdirOp {
       for (int i = existing.length(); existing != null && i <= last; i++) {
         byte[] component = iip.getPathComponent(i);
         existing = createSingleDirectory(fsd, existing, component, perm);
+        if(existing == null) {
+          FSNamesystem.LOG.error("unprotectedMkdir returned null for "
+              + iip.getPath() + " for "
+              + new String(component, StandardCharsets.US_ASCII) + " i = " + i);
+          // Somebody already created the parent. Recalculate existing
+          existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
+          i = existing.length() - 1;
+        }
       }
     }
     return existing;
@@ -228,5 +257,70 @@ class FSDirMkdirOp {
     }
     return iip;
   }
+
+  private static INode createDirectoryINode(FSDirectory fsd,
+      INodesInPath parent, byte[] name, PermissionStatus permission)
+      throws FileAlreadyExistsException {
+    assert fsd.hasReadLock();
+    assert parent.getLastINode() != null;
+    if (!parent.getLastINode().isDirectory()) {
+      throw new FileAlreadyExistsException("Parent path is not a directory: " +
+          parent.getPath() + " " + DFSUtil.bytes2String(name));
+    }
+    final INodeDirectory dir = new INodeDirectory(
+        fsd.allocateNewInodeId(), name, permission, now());
+    return dir;
+  }
+
+  /**
+   * Find-out missing iNodes for the current mkdir OP.
+   */
+  private static INode[] createPathDirectories(FSDirectory fsd,
+      INodesInPath iip, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath existing = iip.getExistingINodes();
+    assert existing != null : "existing should not be null";
+    int numMissing = iip.length() - existing.length();
+    if (numMissing == 0) {  // full path exists
+      return new INode[0];
+    }
+
+    // create the missing directories along the path
+    INode[] missing = new INode[numMissing];
+    final int last = iip.length();
+    for (int i = existing.length(); i < last; i++) {
+      byte[] component = iip.getPathComponent(i);
+      missing[i - existing.length()] =
+          createDirectoryINode(fsd, existing, component, perm);
+    }
+    return missing;
+  }
+
+  private static INodesInPath addSingleDirectory(FSDirectory fsd,
+      INodesInPath existing, INode dir, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true);
+    if (iip == null) {
+      FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
+      final INodeDirectory parent = existing.getLastINode().asDirectory();
+      dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
+      return INodesInPath.append(existing, dir, dir.getLocalNameBytes());
+    }
+    existing = iip;
+    assert dir.equals(existing.getLastINode()) : "dir is not the last INode";
+
+    // Directory creation also count towards FilesCreated
+    // to match count of FilesDeleted metric.
+    NameNode.getNameNodeMetrics().incrFilesCreated();
+
+    assert dir.getPermissionStatus().getGroupName() != null :
+        "GroupName is null for " + existing.getPath();
+    String cur = existing.getPath();
+    fsd.getEditLog().logMkDir(cur, dir);
+    NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur);
+    return existing;
+  }
 }
```
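createMissingDirs() above follows a latch-then-insert sequence: compute the missing INodes while still under the coarse namespace lock, take the write locks of every partition the operation touches via latchWriteLock(), let go of the coarse lock, and only then add the inodes. The toy model below shows that sequence with plain JDK locks; it is a sketch under invented names, not HDFS code.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LatchThenInsertSketch {
  static final ReentrantReadWriteLock topLock = new ReentrantReadWriteLock();
  static final TreeMap<Integer, ReentrantReadWriteLock> partitionLocks = new TreeMap<>();
  static final Map<Integer, String> namespace = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    partitionLocks.put(0, new ReentrantReadWriteLock());     // keys 0..999
    partitionLocks.put(1000, new ReentrantReadWriteLock());  // keys >= 1000

    topLock.writeLock().lock();            // the operation starts under the top lock
    int[] keysToCreate = {10, 20, 1500};
    // Latch: lock every partition this operation will modify, then drop the top lock.
    for (int key : keysToCreate) {
      partitionLocks.floorEntry(key).getValue().writeLock().lock();
    }
    topLock.writeLock().unlock();

    try {
      for (int key : keysToCreate) {
        namespace.put(key, "inode-" + key);  // guarded only by the partition locks
      }
    } finally {
      for (int key : keysToCreate) {
        partitionLocks.floorEntry(key).getValue().writeLock().unlock();
      }
    }
    System.out.println(namespace);
  }
}
```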
FSDirWriteFileOp.java

```diff
@@ -228,6 +228,13 @@ class FSDirWriteFileOp {
     // while chooseTarget() was executing.
     LocatedBlock[] onRetryBlock = new LocatedBlock[1];
     INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
+
+    INode[] missing = new INode[]{iip.getLastINode()};
+    INodesInPath existing = iip.getParentINodesInPath();
+    FSDirectory fsd = fsn.getFSDirectory();
+    // switch the locks
+    fsd.getINodeMap().latchWriteLock(existing, missing);
+
     FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
         previous, onRetryBlock);
     final INodeFile pendingFile = fileState.inode;
@@ -392,8 +399,8 @@ class FSDirWriteFileOp {
     }
     fsn.checkFsObjectLimit();
     INodeFile newNode = null;
-    INodesInPath parent =
-        FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
+    INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd,
+        iip.getParentINodesInPath(), permissions, true);
     if (parent != null) {
       iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
           replication, blockSize, holder, clientMachine, shouldReplicate,
@@ -541,41 +548,22 @@
       FSDirectory fsd, INodesInPath existing, byte[] localName,
       PermissionStatus permissions, short replication, long preferredBlockSize,
       String clientName, String clientMachine, boolean shouldReplicate,
-      String ecPolicyName, String storagePolicy) throws IOException {
+      String ecPolicyName, String storagePolicy)
+      throws IOException {
+
     Preconditions.checkNotNull(existing);
     long modTime = now();
     INodesInPath newiip;
     fsd.writeLock();
     try {
-      boolean isStriped = false;
-      ErasureCodingPolicy ecPolicy = null;
-      byte storagepolicyid = 0;
-      if (storagePolicy != null && !storagePolicy.isEmpty()) {
-        BlockStoragePolicy policy =
-            fsd.getBlockManager().getStoragePolicy(storagePolicy);
-        if (policy == null) {
-          throw new HadoopIllegalArgumentException(
-              "Cannot find a block policy with the name " + storagePolicy);
-        }
-        storagepolicyid = policy.getId();
-      }
-      if (!shouldReplicate) {
-        ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
-            fsd.getFSNamesystem(), ecPolicyName, existing);
-        if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
-          isStriped = true;
-        }
-      }
-      final BlockType blockType = isStriped ?
-          BlockType.STRIPED : BlockType.CONTIGUOUS;
-      final Short replicationFactor = (!isStriped ? replication : null);
-      final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
-      INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
-          modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
-          storagepolicyid, blockType);
-      newNode.setLocalName(localName);
-      newNode.toUnderConstruction(clientName, clientMachine);
+      INodeFile newNode = createINodeFile(fsd, existing, localName,
+          permissions, replication, preferredBlockSize, clientName,
+          clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime);
+
+      INode[] missing = new INode[] {newNode};
+      // switch the locks
+      fsd.getINodeMap().latchWriteLock(existing, missing);
+
       newiip = fsd.addINode(existing, newNode, permissions.getPermission());
     } finally {
       fsd.writeUnlock();
@@ -593,6 +581,42 @@
     return newiip;
   }
 
+  private static INodeFile createINodeFile(FSDirectory fsd,
+      INodesInPath existing, byte[] localName, PermissionStatus permissions,
+      short replication, long preferredBlockSize, String clientName,
+      String clientMachine, boolean shouldReplicate, String ecPolicyName,
+      String storagePolicy, long modTime) throws IOException {
+    boolean isStriped = false;
+    ErasureCodingPolicy ecPolicy = null;
+    byte storagepolicyid = 0;
+    if (storagePolicy != null && !storagePolicy.isEmpty()) {
+      BlockStoragePolicy policy =
+          fsd.getBlockManager().getStoragePolicy(storagePolicy);
+      if (policy == null) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot find a block policy with the name " + storagePolicy);
+      }
+      storagepolicyid = policy.getId();
+    }
+    if (!shouldReplicate) {
+      ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
+          fsd.getFSNamesystem(), ecPolicyName, existing);
+      if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
+        isStriped = true;
+      }
+    }
+    final BlockType blockType = isStriped ?
+        BlockType.STRIPED : BlockType.CONTIGUOUS;
+    final Short replicationFactor = (!isStriped ? replication : null);
+    final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
+    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+        modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
+        storagepolicyid, blockType);
+    newNode.setLocalName(localName);
+    newNode.toUnderConstruction(clientName, clientMachine);
+    return newNode;
+  }
+
   private static FileState analyzeFileState(
       FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -687,6 +711,14 @@
     }
     checkBlock(fsn, last);
     INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
+
+    assert (iip.getLastINode() instanceof INodeFile);
+    INode[] missing = new INode[] {iip.getLastINode()};
+    INodesInPath existing = iip.getParentINodesInPath();
+    // switch the locks
+    FSDirectory fsd = fsn.getFSDirectory();
+    fsd.getINodeMap().latchWriteLock(existing, missing);
+
     return completeFileInternal(fsn, iip, holder,
         ExtendedBlock.getLocalBlock(last), fileId);
   }
```
FSDirectory.java

```diff
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -160,6 +165,8 @@ public class FSDirectory implements Closeable {
   private final int contentCountLimit; // max content summary counts per run
   private final long contentSleepMicroSec;
   private final INodeMap inodeMap; // Synchronized by dirLock
+  // Temp InodeMap used when loading an FS image.
+  private HashMap<Long, INodeWithAdditionalFields> tempInodeMap;
   private long yieldCount = 0; // keep track of lock yield count.
   private int quotaInitThreads;
 
@@ -317,7 +324,12 @@ public class FSDirectory implements Closeable {
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
-    inodeMap = INodeMap.newInstance(rootDir);
+    inodeMap = INodeMap.newInstance(rootDir, ns);
+    tempInodeMap = new HashMap<>(1000);
+
+    // add rootDir to inodeMapTemp.
+    tempInodeMap.put(rootDir.getId(), rootDir);
+
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
@@ -1476,6 +1488,23 @@ public class FSDirectory implements Closeable {
     return inodeMap;
   }
 
+  final void addToTempInodeMap(INode inode) {
+    if (inode instanceof INodeWithAdditionalFields) {
+      LOG.debug("addToTempInodeMap: id={}, inodeMapTemp.size={}",
+          inode.getId(), tempInodeMap.size());
+      tempInodeMap.put(inode.getId(), (INodeWithAdditionalFields) inode);
+      if (!inode.isSymlink()) {
+        final XAttrFeature xaf = inode.getXAttrFeature();
+        addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
+        StoragePolicySatisfyManager spsManager =
+            namesystem.getBlockManager().getSPSManager();
+        if (spsManager != null && spsManager.isEnabled()) {
+          addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+        }
+      }
+    }
+  }
+
   /**
    * This method is always called with writeLock of FSDirectory held.
    */
@@ -1543,6 +1572,36 @@ public class FSDirectory implements Closeable {
     addEncryptionZone(rootDir, xaf);
   }
 
+  /**
+   * After the inodes are set properly (set the parent for each inode), we move
+   * them from INodeMapTemp to INodeMap.
+   */
+  void moveInodes() throws IOException {
+    long count = 0, totalInodes = tempInodeMap.size();
+    LOG.debug("inodeMapTemp={}", tempInodeMap);
+
+    for (Map.Entry e : tempInodeMap.entrySet()) {
+      INodeWithAdditionalFields n = (INodeWithAdditionalFields) e.getValue();
+
+      LOG.debug("populate {}-th inode: id={}, fullpath={}",
+          count, n.getId(), n.getFullPathName());
+
+      inodeMap.put(n);
+      count++;
+    }
+
+    if (count != totalInodes) {
+      String msg = String.format("moveInodes: expected to move %l inodes, " +
+          "but moved %l inodes", totalInodes, count);
+      throw new IOException(msg);
+    }
+
+    //inodeMap.show();
+    tempInodeMap.clear();
+    assert(tempInodeMap.isEmpty());
+    tempInodeMap = null;
+  }
+
   /**
    * This method is always called with writeLock of FSDirectory held.
    */
@@ -1860,6 +1919,17 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  public INode getInode(INode inode) {
+    return inodeMap.get(inode);
+  }
+
+  public INode getInodeFromTempINodeMap(long id) {
+    LOG.debug("getInodeFromTempINodeMap: id={}, TempINodeMap.size={}",
+        id, tempInodeMap.size());
+    if (id < INodeId.ROOT_INODE_ID)
+      return null;
+
+    return tempInodeMap.get(id);
+  }
+
   @VisibleForTesting
   FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
       UserGroupInformation ugi) throws AccessControlException {
```
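The tempInodeMap/moveInodes() changes above amount to a two-phase image load: inodes are first collected in a temporary map while their parent links are still being resolved, then moved into the partitioned inodeMap in one pass. A simplified, standalone model of that flow (plain JDK maps, illustrative only, names invented):

```java
import java.util.HashMap;
import java.util.Map;

public class TwoPhaseLoadSketch {
  public static void main(String[] args) {
    // Phase 1: buffer inodes by id only, before parents are wired up.
    Map<Long, String> tempInodeMap = new HashMap<>();
    tempInodeMap.put(1L, "/");
    tempInodeMap.put(2L, "/dir");
    tempInodeMap.put(3L, "/dir/file");

    // Phase 2: bulk-move into the real map (stand-in for the PartitionedGSet).
    Map<Long, String> inodeMap = new HashMap<>();
    long moved = 0;
    for (Map.Entry<Long, String> e : tempInodeMap.entrySet()) {
      inodeMap.put(e.getKey(), e.getValue());
      moved++;
    }
    if (moved != tempInodeMap.size()) {
      throw new IllegalStateException("moveInodes: moved " + moved
          + " of " + tempInodeMap.size() + " inodes");
    }
    tempInodeMap.clear();
    System.out.println("moved " + moved + " inodes");
  }
}
```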
FSImage.java

```diff
@@ -177,18 +177,23 @@
 
   void format(FSNamesystem fsn, String clusterId, boolean force)
       throws IOException {
-    long fileCount = fsn.getFilesTotal();
-    // Expect 1 file, which is the root inode
-    Preconditions.checkState(fileCount == 1,
-        "FSImage.format should be called with an uninitialized namesystem, has " +
-        fileCount + " files");
-    NamespaceInfo ns = NNStorage.newNamespaceInfo();
-    LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
-    ns.clusterID = clusterId;
-
-    storage.format(ns);
-    editLog.formatNonFileJournals(ns, force);
-    saveFSImageInAllDirs(fsn, 0);
+    fsn.readLock();
+    try {
+      long fileCount = fsn.getFilesTotal();
+      // Expect 1 file, which is the root inode
+      Preconditions.checkState(fileCount == 1,
+          "FSImage.format should be called with an uninitialized namesystem, has " +
+          fileCount + " files");
+      NamespaceInfo ns = NNStorage.newNamespaceInfo();
+      LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
+      ns.clusterID = clusterId;
+
+      storage.format(ns);
+      editLog.formatNonFileJournals(ns, force);
+      saveFSImageInAllDirs(fsn, 0);
+    } finally {
+      fsn.readUnlock();
+    }
   }
 
   /**
@@ -756,6 +761,16 @@
           "above for more info.");
     }
     prog.endPhase(Phase.LOADING_FSIMAGE);
+
+    /*
+     * loadEdits always sets the parent of an inode before adding the inode to
+     * inodeMap. So, it is safe to move inodes from inodeMapTemp to inodeMap
+     * before loadEdits.
+     */
+    FSDirectory dir = target.getFSDirectory();
+    dir.moveInodes();
+    LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap",
+        dir.getINodeMap().size());
 
     if (!rollingRollback) {
       prog.beginPhase(Phase.LOADING_EDITS);
@@ -771,6 +786,8 @@
         needToSave = false;
       }
       editLog.setNextTxId(lastAppliedTxId + 1);
+      LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap",
+          dir.getINodeMap().size());
       return needToSave;
     }
 
```
FSImageFormatPBINode.java

```diff
@@ -276,9 +276,10 @@ public final class FSImageFormatPBINode {
         if (e == null) {
           break;
         }
-        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        INodeDirectory p =
+            dir.getInodeFromTempINodeMap(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
-          INode child = dir.getInode(id);
+          INode child = dir.getInodeFromTempINodeMap(id);
           if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
@@ -382,6 +383,7 @@ public final class FSImageFormatPBINode {
         if (p == null) {
           break;
         }
+        LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId());
         if (p.getId() == INodeId.ROOT_INODE_ID) {
           synchronized(this) {
             loadRootINode(p);
@@ -389,7 +391,7 @@
         } else {
           INode n = loadINode(p);
           synchronized(this) {
-            dir.addToInodeMap(n);
+            dir.addToTempInodeMap(n);
           }
           fillUpInodeList(inodeList, n);
         }
@@ -761,7 +763,7 @@
           DirEntry.newBuilder().setParent(n.getId());
       for (INode inode : children) {
         // Error if the child inode doesn't exist in inodeMap
-        if (dir.getInode(inode.getId()) == null) {
+        if (dir.getInode(inode) == null) {
           FSImage.LOG.error(
               "FSImageFormatPBINode#serializeINodeDirectorySection: " +
               "Dangling child pointer found. Missing INode in " +
@@ -812,6 +814,7 @@
       Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
+        LOG.debug("i = {}, save inode: {}", i, n);
         save(out, n);
         ++i;
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
```
FSNamesystem.java

```diff
@@ -1753,7 +1753,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+    this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
   }
 
   @Override
@@ -1786,7 +1786,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   @Override
   public boolean hasWriteLock() {
-    return this.fsLock.isWriteLockedByCurrentThread();
+    return this.fsLock.isWriteLockedByCurrentThread() ||
+        fsLock.hasWriteChildLock();
   }
   @Override
   public boolean hasReadLock() {
@@ -1801,6 +1802,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return this.fsLock.getWriteHoldCount();
   }
 
+  public FSNamesystemLock getFSLock() {
+    return this.fsLock;
+  }
+
   /** Lock the checkpoint lock */
   public void cpLock() {
     this.cpLock.lock();
```
@@ -18,6 +18,9 @@

 package org.apache.hadoop.hdfs.server.namenode;

+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +32,7 @@ import java.util.function.Supplier;

 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
@@ -129,6 +133,32 @@ class FSNamesystemLock {

   private static final String OVERALL_METRIC_NAME = "Overall";

+  private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
+      new ThreadLocal<Collection<INodeMapLock>>() {
+        @Override
+        public Collection<INodeMapLock> initialValue() {
+          return new ArrayList<INodeMapLock>();
+        }
+      };
+
+  void addChildLock(INodeMapLock lock) {
+    partitionLocks.get().add(lock);
+  }
+
+  boolean removeChildLock(INodeMapLock lock) {
+    return partitionLocks.get().remove(lock);
+  }
+
+  boolean hasWriteChildLock() {
+    Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+    // FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size());
+    while(iter.hasNext()) {
+      if(iter.next().hasWriteChildLock())
+        return true;
+    }
+    return false;
+  }
+
   FSNamesystemLock(Configuration conf,
       MutableRatesWithAggregation detailedHoldTimeMetrics) {
     this(conf, detailedHoldTimeMetrics, new Timer());
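The partitionLocks field added above is per-thread bookkeeping: each thread records the INodeMap partition locks it currently holds so the coarse lock can query them (hasWriteChildLock) or release them during unlock. A simplified, self-contained sketch of that bookkeeping, with a hypothetical ChildLockTracker class and plain ReentrantReadWriteLocks standing in for INodeMapLock:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Each thread tracks the partition locks it holds, so they can be
    // queried or released together later.
    public class ChildLockTracker {
      private final ThreadLocal<List<ReentrantReadWriteLock>> held =
          ThreadLocal.withInitial(ArrayList::new);

      // Called when the current thread acquires a partition (child) lock.
      void register(ReentrantReadWriteLock child) {
        held.get().add(child);
      }

      // True if the current thread holds any partition lock for write.
      boolean hasWriteChildLock() {
        for (ReentrantReadWriteLock child : held.get()) {
          if (child.isWriteLockedByCurrentThread()) {
            return true;
          }
        }
        return false;
      }

      // Release and forget every partition lock held by the current thread.
      void releaseAll(boolean write) {
        List<ReentrantReadWriteLock> locks = held.get();
        for (ReentrantReadWriteLock child : locks) {
          if (write) {
            child.writeLock().unlock();
          } else {
            child.readLock().unlock();
          }
        }
        locks.clear();
      }

      public static void main(String[] args) {
        ChildLockTracker tracker = new ChildLockTracker();
        ReentrantReadWriteLock partition = new ReentrantReadWriteLock();
        partition.writeLock().lock();
        tracker.register(partition);
        System.out.println(tracker.hasWriteChildLock()); // true
        tracker.releaseAll(true);
        System.out.println(tracker.hasWriteChildLock()); // false
      }
    }

The readUnlock and writeUnlock changes that follow consume this bookkeeping: when unlockChildren is true they drain the thread's partition locks as well as the coarse lock.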
@@ -180,11 +210,29 @@ class FSNamesystemLock {

   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
+    readUnlock(opName, lockReportInfoSupplier, true);
+  }
+
+  public void readUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = coarseLock.getReadHoldCount() == 1;
     final long readLockIntervalNanos =
         timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
     final long currentTimeMs = timer.now();
-    coarseLock.readLock().unlock();
+    if(getReadHoldCount() > 0) { // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.readLock().unlock();
+    }
+
+    if(unlockChildren) { // Also unlock and remove children locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().readChildUnlock();
+        iter.remove();
+      }
+    }

     if (needReport) {
       addMetric(opName, readLockIntervalNanos, false);
@@ -252,7 +300,7 @@ class FSNamesystemLock {
    * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
    */
   public void writeUnlock() {
-    writeUnlock(OP_NAME_OTHER, false, null);
+    writeUnlock(OP_NAME_OTHER, false, null, true);
   }

   /**
@@ -262,7 +310,7 @@ class FSNamesystemLock {
    * @param opName Operation name.
    */
   public void writeUnlock(String opName) {
-    writeUnlock(opName, false, null);
+    writeUnlock(opName, false, null, true);
   }

   /**
@@ -274,7 +322,7 @@ class FSNamesystemLock {
    */
   public void writeUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    writeUnlock(opName, false, lockReportInfoSupplier);
+    writeUnlock(opName, false, lockReportInfoSupplier, true);
   }

   /**
@@ -286,7 +334,7 @@ class FSNamesystemLock {
    * for long time will be logged in logs and metrics.
    */
   public void writeUnlock(String opName, boolean suppressWriteLockReport) {
-    writeUnlock(opName, suppressWriteLockReport, null);
+    writeUnlock(opName, suppressWriteLockReport, null, true);
   }

   /**
@@ -297,8 +345,9 @@ class FSNamesystemLock {
    * for long time will be logged in logs and metrics.
    * @param lockReportInfoSupplier The info shown in the lock report
    */
-  private void writeUnlock(String opName, boolean suppressWriteLockReport,
-      Supplier<String> lockReportInfoSupplier) {
+  public void writeUnlock(String opName, boolean suppressWriteLockReport,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = !suppressWriteLockReport && coarseLock
         .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
     final long writeLockIntervalNanos =
@@ -329,7 +378,18 @@ class FSNamesystemLock {
       longestWriteLockHeldInfo = new LockHeldInfo();
     }

-    coarseLock.writeLock().unlock();
+    if(this.isWriteLockedByCurrentThread()) { // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.writeLock().unlock();
+    }
+
+    if(unlockChildren) { // Unlock and remove children locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().writeChildUnlock();
+        iter.remove();
+      }
+    }

     if (needReport) {
       addMetric(opName, writeLockIntervalNanos, true);
@@ -355,7 +415,25 @@ class FSNamesystemLock {
   public int getWriteHoldCount() {
     return coarseLock.getWriteHoldCount();
   }

+  /**
+   * Queries if the write lock is held by any thread.
+   * @return {@code true} if any thread holds the write lock and
+   *     {@code false} otherwise
+   */
+  public boolean isReadLocked() {
+    return coarseLock.getReadLockCount() > 0 || isWriteLocked();
+  }
+
+  /**
+   * Queries if the write lock is held by any thread.
+   * @return {@code true} if any thread holds the write lock and
+   *     {@code false} otherwise
+   */
+  public boolean isWriteLocked() {
+    return coarseLock.isWriteLocked();
+  }
+
   public boolean isWriteLockedByCurrentThread() {
     return coarseLock.isWriteLockedByCurrentThread();
   }
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

@@ -577,6 +578,43 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     return name == null? null: DFSUtil.bytes2String(name);
   }

+  private long[] namespaceKey;
+
+  /**
+   * Key of an INode.
+   * Defines partitioning of INodes in the INodeMap.
+   *
+   * @param level how many levels to be included in the key
+   * @return
+   */
+  public long[] getNamespaceKey(int level) {
+    if(namespaceKey == null) { // generate the namespace key
+      long[] buf = new long[level];
+      INode cur = this;
+      for(int l = 0; l < level; l++) {
+        long curId = (cur == null) ? 0L : cur.getId();
+        buf[level - l - 1] = curId;
+        cur = (cur == null) ? null : cur.parent;
+      }
+      buf[0] = indexOf(buf);
+      namespaceKey = buf;
+    }
+    return namespaceKey;
+  }
+
+  private final static long LARGE_PRIME = 512927357;
+  public static long indexOf(long[] key) {
+    if(key[key.length-1] == INodeId.ROOT_INODE_ID) {
+      return key[0];
+    }
+    long idx = LARGE_PRIME * key[0];
+    idx = (idx ^ (idx >> 32)) & (INodeMap.NUM_RANGES_STATIC -1);
+    return idx;
+  }
+
+  /**
+   * Key of a snapshot Diff Element
+   */
   @Override
   public final byte[] getKey() {
     return getLocalNameBytes();
@@ -636,7 +674,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {

   @Override
   public String toString() {
-    return getLocalName();
+    return getLocalName() + ": " + Arrays.toString(namespaceKey);
   }

   @VisibleForTesting
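getNamespaceKey and indexOf above determine which INodeMap partition an inode belongs to: a depth-2 key of (parent id, own id) whose first element is then replaced by a hash of the parent id folded into one of NUM_RANGES_STATIC ranges. A standalone sketch of that arithmetic, using made-up inode ids (16385 mirrors the HDFS root inode id; the other ids are hypothetical):

    // Mirrors the key/partition arithmetic above with hypothetical ids;
    // the real code walks the INode parent chain to build the key.
    public class NamespaceKeyDemo {
      static final int NUM_RANGES = 256;           // mirrors NUM_RANGES_STATIC
      static final long LARGE_PRIME = 512927357L;  // mirrors LARGE_PRIME
      static final long ROOT_ID = 16385L;          // HDFS root inode id

      // key[0] = parent id, key[1] = the inode's own id (depth-2 key)
      static long rangeIndexOf(long[] key) {
        if (key[key.length - 1] == ROOT_ID) {
          return key[0];   // the root itself keeps its raw first element
        }
        long idx = LARGE_PRIME * key[0];
        return (idx ^ (idx >> 32)) & (NUM_RANGES - 1);   // fold into 0..255
      }

      public static void main(String[] args) {
        long parentId = 16386L;  // hypothetical parent directory id
        long childId = 16400L;   // hypothetical file id
        long[] key = {parentId, childId};
        System.out.println("partition = " + rangeIndexOf(key));
      }
    }

Because the range index depends only on the parent's id, all children of one directory map to the same range, which suggests why a single partition latch can cover an operation within one directory.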
@@ -17,44 +17,202 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;

+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LatchLock;
 import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.PartitionedGSet;
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;

 /**
  * Storing all the {@link INode}s and maintaining the mapping between INode ID
  * and INode.
  */
 public class INodeMap {
-  static INodeMap newInstance(INodeDirectory rootDir) {
-    // Compute the map capacity by allocating 1% of total memory
-    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
-    GSet<INode, INodeWithAdditionalFields> map =
-        new LightWeightGSet<>(capacity);
-    map.put(rootDir);
-    return new INodeMap(map);
+  static final int NAMESPACE_KEY_DEPTH = 2;
+  static final int NUM_RANGES_STATIC = 256; // power of 2
+
+  public static class INodeKeyComparator implements Comparator<INode> {
+    INodeKeyComparator() {
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
+    }
+
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodes");
+      }
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      for(int l = 0; l < NAMESPACE_KEY_DEPTH; l++) {
+        if(key1[l] == key2[l]) continue;
+        return (key1[l] < key2[l] ? -1 : 1);
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * INodeKeyComparator with Hashed Parent
+   *
+   */
+  public static class HPINodeKeyComparator implements Comparator<INode> {
+    HPINodeKeyComparator() {
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
+    }
+
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodes");
+      }
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long key1_0 = INode.indexOf(key1);
+      long key2_0 = INode.indexOf(key2);
+      if(key1_0 != key2_0)
+        return (key1_0 < key2_0 ? -1 : 1);
+      for(int l = 1; l < NAMESPACE_KEY_DEPTH; l++) {
+        if(key1[l] == key2[l]) continue;
+        return (key1[l] < key2[l] ? -1 : 1);
+      }
+      return 0;
+    }
+  }
+
+  public static class INodeIdComparator implements Comparator<INode> {
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodesl");
+      }
+      long id1 = i1.getId();
+      long id2 = i2.getId();
+      return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+    }
+  }
+
+  public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
+    private ReentrantReadWriteLock childLock;
+
+    INodeMapLock() {
+      this(null);
+    }
+
+    private INodeMapLock(ReentrantReadWriteLock childLock) {
+      assert namesystem != null : "namesystem is null";
+      this.childLock = childLock;
+    }
+
+    @Override
+    protected boolean isReadTopLocked() {
+      return namesystem.getFSLock().isReadLocked();
+    }
+
+    @Override
+    protected boolean isWriteTopLocked() {
+      return namesystem.getFSLock().isWriteLocked();
+    }
+
+    @Override
+    protected void readTopUnlock() {
+      namesystem.getFSLock().readUnlock("INodeMap", null, false);
+    }
+
+    @Override
+    protected void writeTopUnlock() {
+      namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
+    }
+
+    @Override
+    protected boolean hasReadChildLock() {
+      return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
+    }
+
+    @Override
+    protected void readChildLock() {
+      // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().lock();
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("readChildLock: done");
+    }
+
+    @Override
+    protected void readChildUnlock() {
+      // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().unlock();
+      // LOG.info("readChildUnlock: done");
+    }
+
+    @Override
+    protected boolean hasWriteChildLock() {
+      return this.childLock.isWriteLockedByCurrentThread() || namesystem
+          .getFSLock().hasWriteChildLock();
+    }
+
+    @Override
+    protected void writeChildLock() {
+      // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().lock();
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("writeChildLock: done");
+    }
+
+    @Override
+    protected void writeChildUnlock() {
+      // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().unlock();
+      // LOG.info("writeChildUnlock: done");
+    }
+
+    @Override
+    protected LatchLock<ReentrantReadWriteLock> clone() {
+      return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
+    }
+  }
+
+  static INodeMap newInstance(INodeDirectory rootDir,
+      FSNamesystem ns) {
+    return new INodeMap(rootDir, ns);
   }

   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
+  private FSNamesystem namesystem;

   public Iterator<INodeWithAdditionalFields> getMapIterator() {
     return map.iterator();
   }

-  private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
-    Preconditions.checkArgument(map != null);
-    this.map = map;
+  private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
+    this.namesystem = ns;
+    // Compute the map capacity by allocating 1% of total memory
+    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
+    this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
+        new INodeMapLock());
+
+    // Pre-populate initial empty partitions
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+    PermissionStatus perm = new PermissionStatus(
+        "", "", new FsPermission((short) 0));
+    for(int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      pgs.addNewPartition(key);
+    }
+
+    map.put(rootDir);
   }

   /**
   * Add an {@link INode} into the {@link INode} map. Replace the old value if
   * necessary.
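The new constructor keys the map with INodeKeyComparator and pre-creates NUM_RANGES_STATIC empty partitions, each identified by a synthetic "range key" directory whose parent id is the range number. PartitionedGSet itself is not part of this diff; as a rough mental model only (not its real API), a sorted map of range indexes to per-partition hash maps behaves similarly:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    // Rough model of a partitioned inode map: partitions are pre-created and
    // looked up by range index; each partition is an independent map that
    // could be guarded by its own lock. Not the real PartitionedGSet.
    public class PartitionedInodeMapModel {
      static final int NUM_RANGES = 256;

      private final TreeMap<Long, Map<Long, String>> partitions =
          new TreeMap<>(Comparator.naturalOrder());

      PartitionedInodeMapModel() {
        for (long p = 0; p < NUM_RANGES; p++) {
          partitions.put(p, new HashMap<>());   // pre-populate empty partitions
        }
      }

      void put(long rangeIndex, long inodeId, String name) {
        partitions.floorEntry(rangeIndex).getValue().put(inodeId, name);
      }

      String get(long rangeIndex, long inodeId) {
        return partitions.floorEntry(rangeIndex).getValue().get(inodeId);
      }

      public static void main(String[] args) {
        PartitionedInodeMapModel map = new PartitionedInodeMapModel();
        map.put(42, 16400L, "file1");
        System.out.println(map.get(42, 16400L));  // prints file1
      }
    }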
@@ -88,48 +246,58 @@ public class INodeMap {
   * such {@link INode} in the map.
   */
   public INode get(long id) {
-    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
-        "", "", new FsPermission((short) 0)), 0, 0) {
-      @Override
-      void recordModification(int latestSnapshotId) {
-      }
-
-      @Override
-      public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
-        // Nothing to do
-      }
-
-      @Override
-      public QuotaCounts computeQuotaUsage(
-          BlockStoragePolicySuite bsps, byte blockStoragePolicyId,
-          boolean useCache, int lastSnapshotId) {
-        return null;
-      }
-
-      @Override
-      public ContentSummaryComputationContext computeContentSummary(
-          int snapshotId, ContentSummaryComputationContext summary) {
-        return null;
-      }
-
-      @Override
-      public void cleanSubtree(
-          ReclaimContext reclaimContext, int snapshotId, int priorSnapshotId) {
-      }
-
-      @Override
-      public byte getStoragePolicyID(){
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
-
-      @Override
-      public byte getLocalStoragePolicyID() {
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
-    };
-
-    return map.get(inode);
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+    /*
+     * Convert a long inode id into an INode object. We only need to compare
+     * two inodes by inode id. So, it can be any type of INode object.
+     */
+    INode inode = new INodeDirectory(id, null,
+        new PermissionStatus("", "", new FsPermission((short) 0)), 0);
+
+    /*
+     * Iterate all partitions of PGSet and return the INode.
+     * Just for fallback.
+     */
+    PermissionStatus perm =
+        new PermissionStatus("", "", new FsPermission((short) 0));
+    // TODO: create a static array, to avoid creation of keys each time.
+    for (int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      PartitionedGSet.PartitionEntry e = pgs.getPartition(key);
+
+      if (e.contains(inode)) {
+        return (INode) e.get(inode);
+      }
+    }
+
+    return null;
+  }
+
+  public INode get(INode inode) {
+
+    /*
+     * Check whether the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent
+     * dirs
+     */
+    int i = NAMESPACE_KEY_DEPTH - 1;
+    INode tmpInode = inode;
+    while (i > 0 && tmpInode.getParent() != null) {
+      tmpInode = tmpInode.getParent();
+      i--;
+    }
+
+    /*
+     * If the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent dirs,
+     * use map.get(); else, fall back to get INode based on Inode ID.
+     */
+    if (i == 0) {
+      return map.get(inode);
+    } else {
+      return get(inode.getId());
+    }
   }

   /**
@@ -138,4 +306,27 @@ public class INodeMap {
   public void clear() {
     map.clear();
   }
+
+  public void latchWriteLock(INodesInPath iip, INode[] missing) {
+    assert namesystem.hasReadLock() : "must have namesysem lock";
+    assert iip.length() > 0 : "INodesInPath has 0 length";
+    if(!(map instanceof PartitionedGSet)) {
+      return;
+    }
+    // Locks partitions along the path starting from the first existing parent
+    // Locking is in the hierarchical order
+    INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
+    allINodes[0] = iip.getLastINode();
+    System.arraycopy(missing, 0, allINodes, 1, missing.length);
+    /*
+    // Locks all the partitions along the path in the hierarchical order
+    INode[] allINodes = new INode[iip.length() + missing.length];
+    INode[] existing = iip.getINodesArray();
+    System.arraycopy(existing, 0, allINodes, 0, existing.length);
+    System.arraycopy(missing, 0, allINodes, existing.length, missing.length);
+    */
+
+    ((PartitionedGSet<INode, INodeWithAdditionalFields>)
+        map).latchWriteLock(allINodes);
+  }
 }
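latchWriteLock above write-latches the partitions for the last existing INode of the path plus the missing INodes about to be created, and its comments stress that acquisition happens "in the hierarchical order". A plausible reading, not stated explicitly in the patch, is the usual fixed-acquisition-order discipline that keeps multi-partition locking deadlock-free; a generic sketch of that discipline (not the patch's code):

    import java.util.Arrays;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Generic lock-ordering sketch: every thread acquires the partition locks
    // it needs in one agreed order, so two threads can never hold one lock
    // each while waiting on the other's.
    public class OrderedLatchDemo {
      static void lockAllInOrder(int[] partitionIds,
          ReentrantReadWriteLock[] partitions) {
        int[] sorted = partitionIds.clone();
        Arrays.sort(sorted);                 // one agreed acquisition order
        for (int id : sorted) {
          partitions[id].writeLock().lock();
        }
      }

      public static void main(String[] args) {
        ReentrantReadWriteLock[] partitions = new ReentrantReadWriteLock[4];
        for (int i = 0; i < partitions.length; i++) {
          partitions[i] = new ReentrantReadWriteLock();
        }
        lockAllInOrder(new int[]{3, 1}, partitions);
        System.out.println("locked partitions 1 and 3 in a fixed order");
      }
    }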
@@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -1997,6 +1998,7 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.LOG, level);
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+    GenericTestUtils.setLogLevel(GSet.LOG, level);
   }

   /**
@@ -25,6 +25,8 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -152,4 +154,55 @@ public class TestDFSMkdirs {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testMkDirsWithRestart() throws IOException {
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    try {
+      Path dir1 = new Path("/mkdir-1");
+      Path file1 = new Path(dir1, "file1");
+      Path deleteDir = new Path("/deleteDir");
+      Path deleteFile = new Path(dir1, "deleteFile");
+      // Create a dir in root dir, should succeed
+      assertTrue(dfs.mkdir(dir1, FsPermission.getDefault()));
+      dfs.mkdir(deleteDir, FsPermission.getDefault());
+      assertTrue(dfs.exists(deleteDir));
+      dfs.delete(deleteDir, true);
+      assertTrue(!dfs.exists(deleteDir));
+
+      DFSTestUtil.writeFile(dfs, file1, "hello world");
+      DFSTestUtil.writeFile(dfs, deleteFile, "hello world");
+      int totalFiles = getFileCount(dfs);
+      //Before deletion there are 2 files
+      assertTrue("Incorrect file count", 2 == totalFiles);
+      dfs.delete(deleteFile, false);
+      totalFiles = getFileCount(dfs);
+      //After deletion, left with 1 file
+      assertTrue("Incorrect file count", 1 == totalFiles);
+
+      cluster.restartNameNodes();
+      dfs = cluster.getFileSystem();
+      assertTrue(dfs.exists(dir1));
+      assertTrue(!dfs.exists(deleteDir));
+      assertTrue(dfs.exists(file1));
+      totalFiles = getFileCount(dfs);
+      assertTrue("Incorrect file count", 1 == totalFiles);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+
+  private int getFileCount(DistributedFileSystem dfs) throws IOException {
+    RemoteIterator<LocatedFileStatus> fileItr =
+        dfs.listFiles(new Path("/"), true);
+    int totalFiles = 0;
+    while (fileItr.hasNext()) {
+      fileItr.next();
+      totalFiles++;
+    }
+    return totalFiles;
+  }
 }
@@ -1313,6 +1313,10 @@ public class TestFileCreation {
         fail();
       } catch(FileNotFoundException e) {
         FileSystem.LOG.info("Caught Expected FileNotFoundException: ", e);
+      } catch (AssertionError ae) {
+        //FSDirWriteFileOp#completeFile throws AssertError if the given
+        // id/node is not an instance of INodeFile.
+        FileSystem.LOG.info("Caught Expected AssertionError: ", ae);
       }
     } finally {
       IOUtils.closeStream(dfs);
@@ -1017,23 +1017,38 @@ public class TestINodeFile {

       final Path dir = new Path("/dir");
       hdfs.mkdirs(dir);
-      INodeDirectory dirNode = getDir(fsdir, dir);
-      INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }

       // set quota to dir, which leads to node replacement
       hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
-      dirNode = getDir(fsdir, dir);
-      assertTrue(dirNode.isWithQuota());
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        assertTrue(dirNode.isWithQuota());
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }

       hdfs.setQuota(dir, -1, -1);
-      dirNode = getDir(fsdir, dir);
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();