HDFS-2476. Merging change r1201991 from trunk to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297863 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-03-07 06:43:11 +00:00
parent 051ac2d9fa
commit d6f9460eba
14 changed files with 1772 additions and 110 deletions

View File

@ -109,6 +109,10 @@ Release 0.23.3 - UNRELEASED
HDFS-2495. Increase granularity of write operations in ReplicationMonitor HDFS-2495. Increase granularity of write operations in ReplicationMonitor
thus reducing contention for write lock. (Tomasz Nykiel via hairong) thus reducing contention for write lock. (Tomasz Nykiel via hairong)
HDFS-2476. More CPU efficient data structure for under-replicated,
over-replicated, and invalidated blocks.
(Tomasz Nykiel via todd)
BUG FIXES BUG FIXES
HDFS-2481. Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol. HDFS-2481. Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol.

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocat
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -142,8 +143,8 @@ public class BlockManager {
// eventually remove these extras. // eventually remove these extras.
// Mapping: StorageID -> TreeSet<Block> // Mapping: StorageID -> TreeSet<Block>
// //
public final Map<String, Collection<Block>> excessReplicateMap = public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
new TreeMap<String, Collection<Block>>(); new TreeMap<String, LightWeightLinkedSet<Block>>();
// //
// Store set of Blocks that need to be replicated 1 or more times. // Store set of Blocks that need to be replicated 1 or more times.
@ -1255,7 +1256,7 @@ public class BlockManager {
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
while(it.hasNext()) { while(it.hasNext()) {
DatanodeDescriptor node = it.next(); DatanodeDescriptor node = it.next();
Collection<Block> excessBlocks = LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getStorageID()); excessReplicateMap.get(node.getStorageID());
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt++; corrupt++;
@ -1987,7 +1988,7 @@ public class BlockManager {
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block); for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
it.hasNext();) { it.hasNext();) {
DatanodeDescriptor cur = it.next(); DatanodeDescriptor cur = it.next();
Collection<Block> excessBlocks = excessReplicateMap.get(cur LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
.getStorageID()); .getStorageID());
if (excessBlocks == null || !excessBlocks.contains(block)) { if (excessBlocks == null || !excessBlocks.contains(block)) {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@ -2105,9 +2106,9 @@ public class BlockManager {
private void addToExcessReplicate(DatanodeInfo dn, Block block) { private void addToExcessReplicate(DatanodeInfo dn, Block block) {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID()); LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
if (excessBlocks == null) { if (excessBlocks == null) {
excessBlocks = new TreeSet<Block>(); excessBlocks = new LightWeightLinkedSet<Block>();
excessReplicateMap.put(dn.getStorageID(), excessBlocks); excessReplicateMap.put(dn.getStorageID(), excessBlocks);
} }
if (excessBlocks.add(block)) { if (excessBlocks.add(block)) {
@ -2155,7 +2156,7 @@ public class BlockManager {
// We've removed a block from a node, so it's definitely no longer // We've removed a block from a node, so it's definitely no longer
// in "excess" there. // in "excess" there.
// //
Collection<Block> excessBlocks = excessReplicateMap.get(node LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
.getStorageID()); .getStorageID());
if (excessBlocks != null) { if (excessBlocks != null) {
if (excessBlocks.remove(block)) { if (excessBlocks.remove(block)) {
@ -2305,8 +2306,8 @@ public class BlockManager {
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) { } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
count++; count++;
} else { } else {
Collection<Block> blocksExcess = LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
excessReplicateMap.get(node.getStorageID()); .getStorageID());
if (blocksExcess != null && blocksExcess.contains(b)) { if (blocksExcess != null && blocksExcess.contains(b)) {
excess++; excess++;
} else { } else {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
@ -120,7 +121,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
private BlockQueue<BlockInfoUnderConstruction> recoverBlocks = private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
new BlockQueue<BlockInfoUnderConstruction>(); new BlockQueue<BlockInfoUnderConstruction>();
/** A set of blocks to be invalidated by this datanode */ /** A set of blocks to be invalidated by this datanode */
private Set<Block> invalidateBlocks = new TreeSet<Block>(); private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
/* Variables for maintaining number of blocks scheduled to be written to /* Variables for maintaining number of blocks scheduled to be written to
* this datanode. This count is approximate and might be slightly bigger * this datanode. This count is approximate and might be slightly bigger
@ -400,45 +401,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Remove the specified number of blocks to be invalidated * Remove the specified number of blocks to be invalidated
*/ */
public Block[] getInvalidateBlocks(int maxblocks) { public Block[] getInvalidateBlocks(int maxblocks) {
return getBlockArray(invalidateBlocks, maxblocks); synchronized (invalidateBlocks) {
} Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min(
invalidateBlocks.size(), maxblocks)]);
static private Block[] getBlockArray(Collection<Block> blocks, int max) { return deleteList.length == 0 ? null : deleteList;
Block[] blockarray = null;
synchronized(blocks) {
int available = blocks.size();
int n = available;
if (max > 0 && n > 0) {
if (max < n) {
n = max;
}
// allocate the properly sized block array ...
blockarray = new Block[n];
// iterate tree collecting n blocks...
Iterator<Block> e = blocks.iterator();
int blockCount = 0;
while (blockCount < n && e.hasNext()) {
// insert into array ...
blockarray[blockCount++] = e.next();
// remove from tree via iterator, if we are removing
// less than total available blocks
if (n < available){
e.remove();
}
}
assert(blockarray.length == n);
// now if the number of blocks removed equals available blocks,
// them remove all blocks in one fell swoop via clear
if (n == available) {
blocks.clear();
}
}
} }
return blockarray;
} }
/** Serialization for FSEditLog */ /** Serialization for FSEditLog */

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
/** /**
* Keeps a Collection for every named machine containing blocks * Keeps a Collection for every named machine containing blocks
@ -39,8 +40,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
@InterfaceAudience.Private @InterfaceAudience.Private
class InvalidateBlocks { class InvalidateBlocks {
/** Mapping: StorageID -> Collection of Blocks */ /** Mapping: StorageID -> Collection of Blocks */
private final Map<String, Collection<Block>> node2blocks = private final Map<String, LightWeightHashSet<Block>> node2blocks =
new TreeMap<String, Collection<Block>>(); new TreeMap<String, LightWeightHashSet<Block>>();
/** The total number of blocks in the map. */ /** The total number of blocks in the map. */
private long numBlocks = 0L; private long numBlocks = 0L;
@ -67,9 +68,9 @@ class InvalidateBlocks {
*/ */
synchronized void add(final Block block, final DatanodeInfo datanode, synchronized void add(final Block block, final DatanodeInfo datanode,
final boolean log) { final boolean log) {
Collection<Block> set = node2blocks.get(datanode.getStorageID()); LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
if (set == null) { if (set == null) {
set = new HashSet<Block>(); set = new LightWeightHashSet<Block>();
node2blocks.put(datanode.getStorageID(), set); node2blocks.put(datanode.getStorageID(), set);
} }
if (set.add(block)) { if (set.add(block)) {
@ -83,7 +84,7 @@ class InvalidateBlocks {
/** Remove a storage from the invalidatesSet */ /** Remove a storage from the invalidatesSet */
synchronized void remove(final String storageID) { synchronized void remove(final String storageID) {
final Collection<Block> blocks = node2blocks.remove(storageID); final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
if (blocks != null) { if (blocks != null) {
numBlocks -= blocks.size(); numBlocks -= blocks.size();
} }
@ -91,7 +92,7 @@ class InvalidateBlocks {
/** Remove the block from the specified storage. */ /** Remove the block from the specified storage. */
synchronized void remove(final String storageID, final Block block) { synchronized void remove(final String storageID, final Block block) {
final Collection<Block> v = node2blocks.get(storageID); final LightWeightHashSet<Block> v = node2blocks.get(storageID);
if (v != null && v.remove(block)) { if (v != null && v.remove(block)) {
numBlocks--; numBlocks--;
if (v.isEmpty()) { if (v.isEmpty()) {
@ -109,8 +110,8 @@ class InvalidateBlocks {
return; return;
} }
for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) { for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
final Collection<Block> blocks = entry.getValue(); final LightWeightHashSet<Block> blocks = entry.getValue();
if (blocks.size() > 0) { if (blocks.size() > 0) {
out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks); out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
} }
@ -143,21 +144,17 @@ class InvalidateBlocks {
private synchronized List<Block> invalidateWork( private synchronized List<Block> invalidateWork(
final String storageId, final DatanodeDescriptor dn) { final String storageId, final DatanodeDescriptor dn) {
final Collection<Block> set = node2blocks.get(storageId); final LightWeightHashSet<Block> set = node2blocks.get(storageId);
if (set == null) { if (set == null) {
return null; return null;
} }
// # blocks that can be sent in one message is limited // # blocks that can be sent in one message is limited
final int limit = datanodeManager.blockInvalidateLimit; final int limit = datanodeManager.blockInvalidateLimit;
final List<Block> toInvalidate = new ArrayList<Block>(limit); final List<Block> toInvalidate = set.pollN(limit);
final Iterator<Block> it = set.iterator();
for(int count = 0; count < limit && it.hasNext(); count++) {
toInvalidate.add(it.next());
it.remove();
}
// If we send everything in this message, remove this node entry // If we send everything in this message, remove this node entry
if (!it.hasNext()) { if (set.isEmpty()) {
remove(storageId); remove(storageId);
} }

View File

@ -24,6 +24,7 @@ import java.util.NavigableSet;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
/** /**
@ -80,13 +81,13 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** The queue for corrupt blocks: {@value} */ /** The queue for corrupt blocks: {@value} */
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
/** the queues themselves */ /** the queues themselves */
private final List<NavigableSet<Block>> priorityQueues private List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<NavigableSet<Block>>(LEVEL); = new ArrayList<LightWeightLinkedSet<Block>>();
/** Create an object. */ /** Create an object. */
UnderReplicatedBlocks() { UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) { for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new TreeSet<Block>()); priorityQueues.add(new LightWeightLinkedSet<Block>());
} }
} }
@ -126,7 +127,7 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** Check if a block is in the neededReplication queue */ /** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) { synchronized boolean contains(Block block) {
for (NavigableSet<Block> set : priorityQueues) { for(LightWeightLinkedSet<Block> set : priorityQueues) {
if (set.contains(block)) { if (set.contains(block)) {
return true; return true;
} }

View File

@ -4013,7 +4013,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException * @throws IOException
*/ */
Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path, Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
String startBlockAfter) throws IOException { String[] cookieTab) throws IOException {
readLock(); readLock();
try { try {
@ -4022,23 +4022,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
"replication queues have not been initialized."); "replication queues have not been initialized.");
} }
checkSuperuserPrivilege(); checkSuperuserPrivilege();
long startBlockId = 0;
// print a limited # of corrupt files per call // print a limited # of corrupt files per call
int count = 0; int count = 0;
ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>(); ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
if (startBlockAfter != null) { final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
startBlockId = Block.filename2id(startBlockAfter);
if (cookieTab == null) {
cookieTab = new String[] { null };
}
int skip = getIntCookie(cookieTab[0]);
for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
blkIterator.next();
} }
final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
while (blkIterator.hasNext()) { while (blkIterator.hasNext()) {
Block blk = blkIterator.next(); Block blk = blkIterator.next();
INode inode = blockManager.getINode(blk); INode inode = blockManager.getINode(blk);
skip++;
if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) { if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
String src = FSDirectory.getFullPathName(inode); String src = FSDirectory.getFullPathName(inode);
if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId)) if (src.startsWith(path)){
&& (src.startsWith(path))) {
corruptFiles.add(new CorruptFileBlockInfo(src, blk)); corruptFiles.add(new CorruptFileBlockInfo(src, blk));
count++; count++;
if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED) if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
@ -4046,6 +4050,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
} }
cookieTab[0] = String.valueOf(skip);
LOG.info("list corrupt file blocks returned: " + count); LOG.info("list corrupt file blocks returned: " + count);
return corruptFiles; return corruptFiles;
} finally { } finally {
@ -4053,6 +4058,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
/**
* Convert string cookie to integer.
*/
private static int getIntCookie(String cookie){
int c;
if(cookie == null){
c = 0;
} else {
try{
c = Integer.parseInt(cookie);
}catch (NumberFormatException e) {
c = 0;
}
}
c = Math.max(0, c);
return c;
}
/** /**
* Create delegation token secret manager * Create delegation token secret manager
*/ */

View File

@ -729,17 +729,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol @Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException { throws IOException {
String[] cookieTab = new String[] { cookie };
Collection<FSNamesystem.CorruptFileBlockInfo> fbs = Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
namesystem.listCorruptFileBlocks(path, cookie); namesystem.listCorruptFileBlocks(path, cookieTab);
String[] files = new String[fbs.size()]; String[] files = new String[fbs.size()];
String lastCookie = "";
int i = 0; int i = 0;
for(FSNamesystem.CorruptFileBlockInfo fb: fbs) { for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
files[i++] = fb.path; files[i++] = fb.path;
lastCookie = fb.block.getBlockName();
} }
return new CorruptFileBlocks(files, lastCookie); return new CorruptFileBlocks(files, cookieTab[0]);
} }
/** /**

View File

@ -113,7 +113,7 @@ public class NamenodeFsck {
// We return back N files that are corrupt; the list of files returned is // We return back N files that are corrupt; the list of files returned is
// ordered by block id; to allow continuation support, pass in the last block // ordered by block id; to allow continuation support, pass in the last block
// # from previous call // # from previous call
private String startBlockAfter = null; private String[] currentCookie = new String[] { null };
private final Configuration conf; private final Configuration conf;
private final PrintWriter out; private final PrintWriter out;
@ -155,7 +155,7 @@ public class NamenodeFsck {
this.showCorruptFileBlocks = true; this.showCorruptFileBlocks = true;
} }
else if (key.equals("startblockafter")) { else if (key.equals("startblockafter")) {
this.startBlockAfter = pmap.get("startblockafter")[0]; this.currentCookie[0] = pmap.get("startblockafter")[0];
} }
} }
} }
@ -218,16 +218,17 @@ public class NamenodeFsck {
private void listCorruptFileBlocks() throws IOException { private void listCorruptFileBlocks() throws IOException {
Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode. Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
getNamesystem().listCorruptFileBlocks(path, startBlockAfter); getNamesystem().listCorruptFileBlocks(path, currentCookie);
int numCorruptFiles = corruptFiles.size(); int numCorruptFiles = corruptFiles.size();
String filler; String filler;
if (numCorruptFiles > 0) { if (numCorruptFiles > 0) {
filler = Integer.toString(numCorruptFiles); filler = Integer.toString(numCorruptFiles);
} else if (startBlockAfter == null) { } else if (currentCookie[0].equals("0")) {
filler = "no"; filler = "no";
} else { } else {
filler = "no more"; filler = "no more";
} }
out.println("Cookie:\t" + currentCookie[0]);
for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) { for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) {
out.println(c.toString()); out.println(c.toString());
} }

View File

@ -144,14 +144,15 @@ public class DFSck extends Configured implements Tool {
throws IOException { throws IOException {
int errCode = -1; int errCode = -1;
int numCorrupt = 0; int numCorrupt = 0;
String lastBlock = null; int cookie = 0;
final String noCorruptLine = "has no CORRUPT files"; final String noCorruptLine = "has no CORRUPT files";
final String noMoreCorruptLine = "has no more CORRUPT files"; final String noMoreCorruptLine = "has no more CORRUPT files";
final String cookiePrefix = "Cookie:";
boolean allDone = false; boolean allDone = false;
while (!allDone) { while (!allDone) {
final StringBuffer url = new StringBuffer(baseUrl); final StringBuffer url = new StringBuffer(baseUrl);
if (lastBlock != null) { if (cookie > 0) {
url.append("&startblockafter=").append(lastBlock); url.append("&startblockafter=").append(String.valueOf(cookie));
} }
URL path = new URL(url.toString()); URL path = new URL(url.toString());
SecurityUtil.fetchServiceTicket(path); SecurityUtil.fetchServiceTicket(path);
@ -162,6 +163,15 @@ public class DFSck extends Configured implements Tool {
try { try {
String line = null; String line = null;
while ((line = input.readLine()) != null) { while ((line = input.readLine()) != null) {
if (line.startsWith(cookiePrefix)){
try{
cookie = Integer.parseInt(line.split("\t")[1]);
} catch (Exception e){
allDone = true;
break;
}
continue;
}
if ((line.endsWith(noCorruptLine)) || if ((line.endsWith(noCorruptLine)) ||
(line.endsWith(noMoreCorruptLine)) || (line.endsWith(noMoreCorruptLine)) ||
(line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) { (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) {
@ -178,13 +188,6 @@ public class DFSck extends Configured implements Tool {
+ dir + "' are:"); + dir + "' are:");
} }
out.println(line); out.println(line);
try {
// Get the block # that we need to send in next call
lastBlock = line.split("\t")[0];
} catch (Exception e) {
allDone = true;
break;
}
} }
} finally { } finally {
input.close(); input.close();

View File

@ -0,0 +1,618 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A low memory linked hash set implementation, which uses an array for storing
* the elements and linked lists for collision resolution. This class does not
* support null element.
*
* This class is not thread safe.
*
*/
public class LightWeightHashSet<T> implements Collection<T> {
/**
* Elements of {@link LightWeightLinkedSet}.
*/
static class LinkedElement<T> {
protected final T element;
// reference to the next entry within a bucket linked list
protected LinkedElement<T> next;
//hashCode of the element
protected final int hashCode;
public LinkedElement(T elem, int hash) {
this.element = elem;
this.next = null;
this.hashCode = hash;
}
public String toString() {
return element.toString();
}
}
protected static final float DEFAULT_MAX_LOAD_FACTOR = 0.75f;
protected static final float DEFAUT_MIN_LOAD_FACTOR = 0.2f;
protected static final int MINIMUM_CAPACITY = 16;
static final int MAXIMUM_CAPACITY = 1 << 30;
private static final Log LOG = LogFactory.getLog(LightWeightHashSet.class);
/**
* An internal array of entries, which are the rows of the hash table. The
* size must be a power of two.
*/
protected LinkedElement<T>[] entries;
/** Size of the entry table. */
private int capacity;
/** The size of the set (not the entry array). */
protected int size = 0;
/** Hashmask used for determining the bucket index **/
private int hash_mask;
/** Capacity at initialization time **/
private final int initialCapacity;
/**
* Modification version for fail-fast.
*
* @see ConcurrentModificationException
*/
protected volatile int modification = 0;
private float maxLoadFactor;
private float minLoadFactor;
private int expandMultiplier = 2;
private int expandThreshold;
private int shrinkThreshold;
/**
* @param initCapacity
* Recommended size of the internal array.
* @param maxLoadFactor
* used to determine when to expand the internal array
* @param minLoadFactor
* used to determine when to shrink the internal array
*/
@SuppressWarnings("unchecked")
public LightWeightHashSet(int initCapacity, float maxLoadFactor,
float minLoadFactor) {
if (maxLoadFactor <= 0 || maxLoadFactor > 1.0f)
throw new IllegalArgumentException("Illegal maxload factor: "
+ maxLoadFactor);
if (minLoadFactor <= 0 || minLoadFactor > maxLoadFactor)
throw new IllegalArgumentException("Illegal minload factor: "
+ minLoadFactor);
this.initialCapacity = computeCapacity(initCapacity);
this.capacity = this.initialCapacity;
this.hash_mask = capacity - 1;
this.maxLoadFactor = maxLoadFactor;
this.expandThreshold = (int) (capacity * maxLoadFactor);
this.minLoadFactor = minLoadFactor;
this.shrinkThreshold = (int) (capacity * minLoadFactor);
entries = new LinkedElement[capacity];
LOG.debug("initial capacity=" + initialCapacity + ", max load factor= "
+ maxLoadFactor + ", min load factor= " + minLoadFactor);
}
public LightWeightHashSet() {
this(MINIMUM_CAPACITY, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR);
}
public LightWeightHashSet(int minCapacity) {
this(minCapacity, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR);
}
/**
* Check if the set is empty.
*
* @return true is set empty, false otherwise
*/
public boolean isEmpty() {
return size == 0;
}
/**
* Return the current capacity (for testing).
*/
public int getCapacity() {
return capacity;
}
/**
* Return the number of stored elements.
*/
public int size() {
return size;
}
/**
* Get index in the internal table for a given hash.
*/
protected int getIndex(int hashCode) {
return hashCode & hash_mask;
}
/**
* Check if the set contains given element
*
* @return true if element present, false otherwise.
*/
@SuppressWarnings("unchecked")
public boolean contains(final Object key) {
// validate key
if (key == null) {
throw new IllegalArgumentException("Null element is not supported.");
}
// find element
final int hashCode = ((T)key).hashCode();
final int index = getIndex(hashCode);
return containsElem(index, (T) key, hashCode);
}
/**
* Check if the set contains given element at given index.
*
* @return true if element present, false otherwise.
*/
protected boolean containsElem(int index, final T key, int hashCode) {
for (LinkedElement<T> e = entries[index]; e != null; e = e.next) {
// element found
if (hashCode == e.hashCode && e.element.equals(key)) {
return true;
}
}
// element not found
return false;
}
/**
* All all elements in the collection. Expand if necessary.
*
* @param toAdd - elements to add.
* @return true if the set has changed, false otherwise
*/
public boolean addAll(Collection<? extends T> toAdd) {
boolean changed = false;
for (T elem : toAdd) {
changed |= addElem(elem);
}
expandIfNecessary();
return changed;
}
/**
* Add given element to the hash table. Expand table if necessary.
*
* @return true if the element was not present in the table, false otherwise
*/
public boolean add(final T element) {
boolean added = addElem(element);
expandIfNecessary();
return added;
}
/**
* Add given element to the hash table
*
* @return true if the element was not present in the table, false otherwise
*/
protected boolean addElem(final T element) {
// validate element
if (element == null) {
throw new IllegalArgumentException("Null element is not supported.");
}
// find hashCode & index
final int hashCode = element.hashCode();
final int index = getIndex(hashCode);
// return false if already present
if (containsElem(index, element, hashCode)) {
return false;
}
modification++;
size++;
// update bucket linked list
LinkedElement<T> le = new LinkedElement<T>(element, hashCode);
le.next = entries[index];
entries[index] = le;
return true;
}
/**
* Remove the element corresponding to the key.
*
* @return If such element exists, return true. Otherwise, return false.
*/
@SuppressWarnings("unchecked")
public boolean remove(final Object key) {
// validate key
if (key == null) {
throw new IllegalArgumentException("Null element is not supported.");
}
LinkedElement<T> removed = removeElem((T) key);
shrinkIfNecessary();
return removed == null ? false : true;
}
/**
* Remove the element corresponding to the key, given key.hashCode() == index.
*
* @return If such element exists, return true. Otherwise, return false.
*/
protected LinkedElement<T> removeElem(final T key) {
LinkedElement<T> found = null;
final int hashCode = key.hashCode();
final int index = getIndex(hashCode);
if (entries[index] == null) {
return null;
} else if (hashCode == entries[index].hashCode &&
entries[index].element.equals(key)) {
// remove the head of the bucket linked list
modification++;
size--;
found = entries[index];
entries[index] = found.next;
} else {
// head != null and key is not equal to head
// search the element
LinkedElement<T> prev = entries[index];
for (found = prev.next; found != null;) {
if (hashCode == found.hashCode &&
found.element.equals(key)) {
// found the element, remove it
modification++;
size--;
prev.next = found.next;
found.next = null;
break;
} else {
prev = found;
found = found.next;
}
}
}
return found;
}
/**
* Remove and return n elements from the hashtable.
* The order in which entries are removed is unspecified, and
* and may not correspond to the order in which they were inserted.
*
* @return first element
*/
public List<T> pollN(int n) {
if (n >= size) {
return pollAll();
}
List<T> retList = new ArrayList<T>(n);
if (n == 0) {
return retList;
}
boolean done = false;
int currentBucketIndex = 0;
while (!done) {
LinkedElement<T> current = entries[currentBucketIndex];
while (current != null) {
retList.add(current.element);
current = current.next;
entries[currentBucketIndex] = current;
size--;
modification++;
if (--n == 0) {
done = true;
break;
}
}
currentBucketIndex++;
}
shrinkIfNecessary();
return retList;
}
/**
* Remove all elements from the set and return them. Clear the entries.
*/
public List<T> pollAll() {
List<T> retList = new ArrayList<T>(size);
for (int i = 0; i < entries.length; i++) {
LinkedElement<T> current = entries[i];
while (current != null) {
retList.add(current.element);
current = current.next;
}
}
this.clear();
return retList;
}
/**
* Get array.length elements from the set, and put them into the array.
*/
@SuppressWarnings("unchecked")
public T[] pollToArray(T[] array) {
int currentIndex = 0;
LinkedElement<T> current = null;
if (array.length == 0) {
return array;
}
if (array.length > size) {
array = (T[]) java.lang.reflect.Array.newInstance(array.getClass()
.getComponentType(), size);
}
// do fast polling if the entire set needs to be fetched
if (array.length == size) {
for (int i = 0; i < entries.length; i++) {
current = entries[i];
while (current != null) {
array[currentIndex++] = current.element;
current = current.next;
}
}
this.clear();
return array;
}
boolean done = false;
int currentBucketIndex = 0;
while (!done) {
current = entries[currentBucketIndex];
while (current != null) {
array[currentIndex++] = current.element;
current = current.next;
entries[currentBucketIndex] = current;
size--;
modification++;
if (currentIndex == array.length) {
done = true;
break;
}
}
currentBucketIndex++;
}
shrinkIfNecessary();
return array;
}
/**
* Compute capacity given initial capacity.
*
* @return final capacity, either MIN_CAPACITY, MAX_CAPACITY, or power of 2
* closest to the requested capacity.
*/
private int computeCapacity(int initial) {
if (initial < MINIMUM_CAPACITY) {
return MINIMUM_CAPACITY;
}
if (initial > MAXIMUM_CAPACITY) {
return MAXIMUM_CAPACITY;
}
int capacity = 1;
while (capacity < initial) {
capacity <<= 1;
}
return capacity;
}
/**
* Resize the internal table to given capacity.
*/
@SuppressWarnings("unchecked")
private void resize(int cap) {
int newCapacity = computeCapacity(cap);
if (newCapacity == this.capacity) {
return;
}
this.capacity = newCapacity;
this.expandThreshold = (int) (capacity * maxLoadFactor);
this.shrinkThreshold = (int) (capacity * minLoadFactor);
this.hash_mask = capacity - 1;
LinkedElement<T>[] temp = entries;
entries = new LinkedElement[capacity];
for (int i = 0; i < temp.length; i++) {
LinkedElement<T> curr = temp[i];
while (curr != null) {
LinkedElement<T> next = curr.next;
int index = getIndex(curr.hashCode);
curr.next = entries[index];
entries[index] = curr;
curr = next;
}
}
}
/**
* Checks if we need to shrink, and shrinks if necessary.
*/
protected void shrinkIfNecessary() {
if (size < this.shrinkThreshold && capacity > initialCapacity) {
resize(capacity / expandMultiplier);
}
}
/**
* Checks if we need to expand, and expands if necessary.
*/
protected void expandIfNecessary() {
if (size > this.expandThreshold && capacity < MAXIMUM_CAPACITY) {
resize(capacity * expandMultiplier);
}
}
public Iterator<T> iterator() {
return new LinkedSetIterator();
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName());
b.append("(size=").append(size).append(", modification=")
.append(modification).append(", entries.length=")
.append(entries.length).append(")");
return b.toString();
}
/** Print detailed information of this object. */
public void printDetails(final PrintStream out) {
out.print(this + ", entries = [");
for (int i = 0; i < entries.length; i++) {
if (entries[i] != null) {
LinkedElement<T> e = entries[i];
out.print("\n " + i + ": " + e);
for (e = e.next; e != null; e = e.next) {
out.print(" -> " + e);
}
}
}
out.println("\n]");
}
private class LinkedSetIterator implements Iterator<T> {
/** The starting modification for fail-fast. */
private final int startModification = modification;
/** The current index of the entry array. */
private int index = -1;
/** The next element to return. */
private LinkedElement<T> next = nextNonemptyEntry();
private LinkedElement<T> nextNonemptyEntry() {
for (index++; index < entries.length && entries[index] == null; index++);
return index < entries.length ? entries[index] : null;
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public T next() {
if (modification != startModification) {
throw new ConcurrentModificationException("modification="
+ modification + " != startModification = " + startModification);
}
if (next == null) {
throw new NoSuchElementException();
}
final T e = next.element;
// find the next element
final LinkedElement<T> n = next.next;
next = n != null ? n : nextNonemptyEntry();
return e;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported.");
}
}
/**
* Clear the set. Resize it to the original capacity.
*/
@SuppressWarnings("unchecked")
public void clear() {
this.capacity = this.initialCapacity;
this.hash_mask = capacity - 1;
this.expandThreshold = (int) (capacity * maxLoadFactor);
this.shrinkThreshold = (int) (capacity * minLoadFactor);
entries = new LinkedElement[capacity];
size = 0;
modification++;
}
@Override
public Object[] toArray() {
Object[] result = new Object[size];
return toArray(result);
}
@Override
@SuppressWarnings("unchecked")
public <U> U[] toArray(U[] a) {
if (a == null) {
throw new NullPointerException("Input array can not be null");
}
if (a.length < size) {
a = (U[]) java.lang.reflect.Array.newInstance(a.getClass()
.getComponentType(), size);
}
int currentIndex = 0;
for (int i = 0; i < entries.length; i++) {
LinkedElement<T> current = entries[i];
while (current != null) {
a[currentIndex++] = (U) current.element;
current = current.next;
}
}
return a;
}
@Override
public boolean containsAll(Collection<?> c) {
Iterator<?> iter = c.iterator();
while (iter.hasNext()) {
if (!contains(iter.next())) {
return false;
}
}
return true;
}
@Override
public boolean removeAll(Collection<?> c) {
boolean changed = false;
Iterator<?> iter = c.iterator();
while (iter.hasNext()) {
changed |= remove(iter.next());
}
return changed;
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("retainAll is not supported.");
}
}

View File

@ -0,0 +1,259 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
/**
* A low memory linked hash set implementation, which uses an array for storing
* the elements and linked lists for collision resolution. In addition it stores
* elements in a linked list to ensure ordered traversal. This class does not
* support null element.
*
* This class is not thread safe.
*
*/
public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
/**
* Elements of {@link LightWeightLinkedSet}.
*/
static class DoubleLinkedElement<T> extends LinkedElement<T> {
// references to elements within all-element linked list
private DoubleLinkedElement<T> before;
private DoubleLinkedElement<T> after;
public DoubleLinkedElement(T elem, int hashCode) {
super(elem, hashCode);
this.before = null;
this.after = null;
}
public String toString() {
return super.toString();
}
}
private DoubleLinkedElement<T> head;
private DoubleLinkedElement<T> tail;
/**
* @param initCapacity
* Recommended size of the internal array.
* @param maxLoadFactor
* used to determine when to expand the internal array
* @param minLoadFactor
* used to determine when to shrink the internal array
*/
public LightWeightLinkedSet(int initCapacity, float maxLoadFactor,
float minLoadFactor) {
super(initCapacity, maxLoadFactor, minLoadFactor);
head = null;
tail = null;
}
public LightWeightLinkedSet() {
this(MINIMUM_CAPACITY, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR);
}
/**
* Add given element to the hash table
*
* @return true if the element was not present in the table, false otherwise
*/
protected boolean addElem(final T element) {
// validate element
if (element == null) {
throw new IllegalArgumentException("Null element is not supported.");
}
// find hashCode & index
final int hashCode = element.hashCode();
final int index = getIndex(hashCode);
// return false if already present
if (containsElem(index, element, hashCode)) {
return false;
}
modification++;
size++;
// update bucket linked list
DoubleLinkedElement<T> le = new DoubleLinkedElement<T>(element, hashCode);
le.next = entries[index];
entries[index] = le;
// insert to the end of the all-element linked list
le.after = null;
le.before = tail;
if (tail != null) {
tail.after = le;
}
tail = le;
if (head == null) {
head = le;
}
return true;
}
/**
* Remove the element corresponding to the key, given key.hashCode() == index.
*
* @return Return the entry with the element if exists. Otherwise return null.
*/
protected DoubleLinkedElement<T> removeElem(final T key) {
DoubleLinkedElement<T> found = (DoubleLinkedElement<T>) (super
.removeElem(key));
if (found == null) {
return null;
}
// update linked list
if (found.after != null) {
found.after.before = found.before;
}
if (found.before != null) {
found.before.after = found.after;
}
if (head == found) {
head = head.after;
}
if (tail == found) {
tail = tail.before;
}
return found;
}
/**
* Remove and return first element on the linked list of all elements.
*
* @return first element
*/
public T pollFirst() {
if (head == null) {
return null;
}
T first = head.element;
this.remove(first);
return first;
}
/**
* Remove and return n elements from the hashtable.
* The order in which entries are removed is corresponds
* to the order in which they were inserted.
*
* @return first element
*/
public List<T> pollN(int n) {
if (n >= size) {
// if we need to remove all elements then do fast polling
return pollAll();
}
List<T> retList = new ArrayList<T>(n);
while (n-- > 0 && head != null) {
T curr = head.element;
this.removeElem(curr);
retList.add(curr);
}
shrinkIfNecessary();
return retList;
}
/**
* Remove all elements from the set and return them in order. Traverse the
* link list, don't worry about hashtable - faster version of the parent
* method.
*/
public List<T> pollAll() {
List<T> retList = new ArrayList<T>(size);
while (head != null) {
retList.add(head.element);
head = head.after;
}
this.clear();
return retList;
}
@Override
@SuppressWarnings("unchecked")
public <U> U[] toArray(U[] a) {
if (a == null) {
throw new NullPointerException("Input array can not be null");
}
if (a.length < size) {
a = (U[]) java.lang.reflect.Array.newInstance(a.getClass()
.getComponentType(), size);
}
int currentIndex = 0;
DoubleLinkedElement<T> current = head;
while (current != null) {
T curr = current.element;
a[currentIndex++] = (U) curr;
current = current.after;
}
return a;
}
public Iterator<T> iterator() {
return new LinkedSetIterator();
}
private class LinkedSetIterator implements Iterator<T> {
/** The starting modification for fail-fast. */
private final int startModification = modification;
/** The next element to return. */
private DoubleLinkedElement<T> next = head;
@Override
public boolean hasNext() {
return next != null;
}
@Override
public T next() {
if (modification != startModification) {
throw new ConcurrentModificationException("modification="
+ modification + " != startModification = " + startModification);
}
if (next == null) {
throw new NoSuchElementException();
}
final T e = next.element;
// find the next element
next = next.after;
return e;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported.");
}
}
/**
* Clear the set. Resize it to the original capacity.
*/
public void clear() {
super.clear();
this.head = null;
this.tail = null;
}
}

View File

@ -323,9 +323,10 @@ public class TestListCorruptFileBlocks {
FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
.toArray(new FSNamesystem.CorruptFileBlockInfo[0]); .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
// now get the 2nd and 3rd file that is corrupt // now get the 2nd and 3rd file that is corrupt
String[] cookie = new String[]{"1"};
Collection<FSNamesystem.CorruptFileBlockInfo> nextCorruptFileBlocks = Collection<FSNamesystem.CorruptFileBlockInfo> nextCorruptFileBlocks =
namenode.getNamesystem() namenode.getNamesystem()
.listCorruptFileBlocks("/corruptData", cfb[0].block.getBlockName()); .listCorruptFileBlocks("/corruptData", cookie);
FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
.toArray(new FSNamesystem.CorruptFileBlockInfo[0]); .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
numCorrupt = nextCorruptFileBlocks.size(); numCorrupt = nextCorruptFileBlocks.size();
@ -334,8 +335,8 @@ public class TestListCorruptFileBlocks {
.equalsIgnoreCase(cfb[1].block.getBlockName())); .equalsIgnoreCase(cfb[1].block.getBlockName()));
corruptFileBlocks = corruptFileBlocks =
namenode.getNamesystem().listCorruptFileBlocks("/corruptData", namenode.getNamesystem()
ncfb[1].block.getBlockName()); .listCorruptFileBlocks("/corruptData", cookie);
numCorrupt = corruptFileBlocks.size(); numCorrupt = corruptFileBlocks.size();
assertTrue(numCorrupt == 0); assertTrue(numCorrupt == 0);
// Do a listing on a dir which doesn't have any corrupt blocks and // Do a listing on a dir which doesn't have any corrupt blocks and

View File

@ -0,0 +1,425 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.junit.Test;
import org.junit.Before;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class TestLightWeightHashSet{
private static final Log LOG = LogFactory
.getLog("org.apache.hadoop.hdfs.TestLightWeightHashSet");
private ArrayList<Integer> list = new ArrayList<Integer>();
private final int NUM = 100;
private LightWeightHashSet<Integer> set;
private Random rand;
@Before
public void setUp() {
float maxF = LightWeightHashSet.DEFAULT_MAX_LOAD_FACTOR;
float minF = LightWeightHashSet.DEFAUT_MIN_LOAD_FACTOR;
int initCapacity = LightWeightHashSet.MINIMUM_CAPACITY;
rand = new Random(System.currentTimeMillis());
list.clear();
for (int i = 0; i < NUM; i++) {
list.add(rand.nextInt());
}
set = new LightWeightHashSet<Integer>(initCapacity, maxF, minF);
}
@Test
public void testEmptyBasic() {
LOG.info("Test empty basic");
Iterator<Integer> iter = set.iterator();
// iterator should not have next
assertFalse(iter.hasNext());
assertEquals(0, set.size());
assertTrue(set.isEmpty());
LOG.info("Test empty - DONE");
}
@Test
public void testOneElementBasic() {
LOG.info("Test one element basic");
set.add(list.get(0));
// set should be non-empty
assertEquals(1, set.size());
assertFalse(set.isEmpty());
// iterator should have next
Iterator<Integer> iter = set.iterator();
assertTrue(iter.hasNext());
// iterator should not have next
assertEquals(list.get(0), iter.next());
assertFalse(iter.hasNext());
LOG.info("Test one element basic - DONE");
}
@Test
public void testMultiBasic() {
LOG.info("Test multi element basic");
// add once
for (Integer i : list) {
assertTrue(set.add(i));
}
assertEquals(list.size(), set.size());
// check if the elements are in the set
for (Integer i : list) {
assertTrue(set.contains(i));
}
// add again - should return false each time
for (Integer i : list) {
assertFalse(set.add(i));
}
// check again if the elements are there
for (Integer i : list) {
assertTrue(set.contains(i));
}
Iterator<Integer> iter = set.iterator();
int num = 0;
while (iter.hasNext()) {
Integer next = iter.next();
assertNotNull(next);
assertTrue(list.contains(next));
num++;
}
// check the number of element from the iterator
assertEquals(list.size(), num);
LOG.info("Test multi element basic - DONE");
}
@Test
public void testRemoveOne() {
LOG.info("Test remove one");
assertTrue(set.add(list.get(0)));
assertEquals(1, set.size());
// remove from the head/tail
assertTrue(set.remove(list.get(0)));
assertEquals(0, set.size());
// check the iterator
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
// add the element back to the set
assertTrue(set.add(list.get(0)));
assertEquals(1, set.size());
iter = set.iterator();
assertTrue(iter.hasNext());
LOG.info("Test remove one - DONE");
}
@Test
public void testRemoveMulti() {
LOG.info("Test remove multi");
for (Integer i : list) {
assertTrue(set.add(i));
}
for (int i = 0; i < NUM / 2; i++) {
assertTrue(set.remove(list.get(i)));
}
// the deleted elements should not be there
for (int i = 0; i < NUM / 2; i++) {
assertFalse(set.contains(list.get(i)));
}
// the rest should be there
for (int i = NUM / 2; i < NUM; i++) {
assertTrue(set.contains(list.get(i)));
}
LOG.info("Test remove multi - DONE");
}
@Test
public void testRemoveAll() {
LOG.info("Test remove all");
for (Integer i : list) {
assertTrue(set.add(i));
}
for (int i = 0; i < NUM; i++) {
assertTrue(set.remove(list.get(i)));
}
// the deleted elements should not be there
for (int i = 0; i < NUM; i++) {
assertFalse(set.contains(list.get(i)));
}
// iterator should not have next
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
assertTrue(set.isEmpty());
LOG.info("Test remove all - DONE");
}
@Test
public void testPollAll() {
LOG.info("Test poll all");
for (Integer i : list) {
assertTrue(set.add(i));
}
// remove all elements by polling
List<Integer> poll = set.pollAll();
assertEquals(0, set.size());
assertTrue(set.isEmpty());
// the deleted elements should not be there
for (int i = 0; i < NUM; i++) {
assertFalse(set.contains(list.get(i)));
}
// we should get all original items
for (Integer i : poll) {
assertTrue(list.contains(i));
}
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
LOG.info("Test poll all - DONE");
}
@Test
public void testPollNMulti() {
LOG.info("Test pollN multi");
// use addAll
set.addAll(list);
// poll zero
List<Integer> poll = set.pollN(0);
assertEquals(0, poll.size());
for (Integer i : list) {
assertTrue(set.contains(i));
}
// poll existing elements (less than size)
poll = set.pollN(10);
assertEquals(10, poll.size());
for (Integer i : poll) {
// should be in original items
assertTrue(list.contains(i));
// should not be in the set anymore
assertFalse(set.contains(i));
}
// poll more elements than present
poll = set.pollN(1000);
assertEquals(NUM - 10, poll.size());
for (Integer i : poll) {
// should be in original items
assertTrue(list.contains(i));
}
// set is empty
assertTrue(set.isEmpty());
assertEquals(0, set.size());
LOG.info("Test pollN multi - DONE");
}
@Test
public void testPollNMultiArray() {
LOG.info("Test pollN multi array");
// use addAll
set.addAll(list);
// poll existing elements (less than size)
Integer[] poll = new Integer[10];
poll = set.pollToArray(poll);
assertEquals(10, poll.length);
for (Integer i : poll) {
// should be in original items
assertTrue(list.contains(i));
// should not be in the set anymore
assertFalse(set.contains(i));
}
// poll other elements (more than size)
poll = new Integer[NUM];
poll = set.pollToArray(poll);
assertEquals(NUM - 10, poll.length);
for (int i = 0; i < NUM - 10; i++) {
assertTrue(list.contains(poll[i]));
}
// set is empty
assertTrue(set.isEmpty());
assertEquals(0, set.size());
// //////
set.addAll(list);
// poll existing elements (exactly the size)
poll = new Integer[NUM];
poll = set.pollToArray(poll);
assertTrue(set.isEmpty());
assertEquals(0, set.size());
assertEquals(NUM, poll.length);
for (int i = 0; i < NUM; i++) {
assertTrue(list.contains(poll[i]));
}
// //////
// //////
set.addAll(list);
// poll existing elements (exactly the size)
poll = new Integer[0];
poll = set.pollToArray(poll);
for (int i = 0; i < NUM; i++) {
assertTrue(set.contains(list.get(i)));
}
assertEquals(0, poll.length);
// //////
LOG.info("Test pollN multi array- DONE");
}
@Test
public void testClear() {
LOG.info("Test clear");
// use addAll
set.addAll(list);
assertEquals(NUM, set.size());
assertFalse(set.isEmpty());
// clear the set
set.clear();
assertEquals(0, set.size());
assertTrue(set.isEmpty());
// iterator should be empty
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
LOG.info("Test clear - DONE");
}
@Test
public void testCapacity() {
LOG.info("Test capacity");
float maxF = LightWeightHashSet.DEFAULT_MAX_LOAD_FACTOR;
float minF = LightWeightHashSet.DEFAUT_MIN_LOAD_FACTOR;
// capacity lower than min_capacity
set = new LightWeightHashSet<Integer>(1, maxF, minF);
assertEquals(LightWeightHashSet.MINIMUM_CAPACITY, set.getCapacity());
// capacity not a power of two
set = new LightWeightHashSet<Integer>(30, maxF, minF);
assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, 32),
set.getCapacity());
// capacity valid
set = new LightWeightHashSet<Integer>(64, maxF, minF);
assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, 64),
set.getCapacity());
// add NUM elements
set.addAll(list);
int expCap = LightWeightHashSet.MINIMUM_CAPACITY;
while (expCap < NUM && maxF * expCap < NUM)
expCap <<= 1;
assertEquals(expCap, set.getCapacity());
// see if the set shrinks if we remove elements by removing
set.clear();
set.addAll(list);
int toRemove = set.size() - (int) (set.getCapacity() * minF) + 1;
for (int i = 0; i < toRemove; i++) {
set.remove(list.get(i));
}
assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, expCap / 2),
set.getCapacity());
LOG.info("Test capacity - DONE");
}
@Test
public void testOther() {
LOG.info("Test other");
// remove all
assertTrue(set.addAll(list));
assertTrue(set.removeAll(list));
assertTrue(set.isEmpty());
// remove sublist
List<Integer> sub = new LinkedList<Integer>();
for (int i = 0; i < 10; i++) {
sub.add(list.get(i));
}
assertTrue(set.addAll(list));
assertTrue(set.removeAll(sub));
assertFalse(set.isEmpty());
assertEquals(NUM - 10, set.size());
for (Integer i : sub) {
assertFalse(set.contains(i));
}
assertFalse(set.containsAll(sub));
// the rest of the elements should be there
List<Integer> sub2 = new LinkedList<Integer>();
for (int i = 10; i < NUM; i++) {
sub2.add(list.get(i));
}
assertTrue(set.containsAll(sub2));
// to array
Integer[] array = set.toArray(new Integer[0]);
assertEquals(NUM - 10, array.length);
for (int i = 0; i < array.length; i++) {
assertTrue(sub2.contains(array[i]));
}
assertEquals(NUM - 10, set.size());
// to array
Object[] array2 = set.toArray();
assertEquals(NUM - 10, array2.length);
for (int i = 0; i < array2.length; i++) {
assertTrue(sub2.contains((Integer) array2[i]));
}
LOG.info("Test other - DONE");
}
}

View File

@ -0,0 +1,363 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.junit.Test;
import org.junit.Before;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
public class TestLightWeightLinkedSet {
private static final Log LOG = LogFactory
.getLog("org.apache.hadoop.hdfs.TestLightWeightLinkedSet");
private ArrayList<Integer> list = new ArrayList<Integer>();
private final int NUM = 100;
private LightWeightLinkedSet<Integer> set;
private Random rand;
@Before
public void setUp() {
float maxF = LightWeightLinkedSet.DEFAULT_MAX_LOAD_FACTOR;
float minF = LightWeightLinkedSet.DEFAUT_MIN_LOAD_FACTOR;
int initCapacity = LightWeightLinkedSet.MINIMUM_CAPACITY;
rand = new Random(System.currentTimeMillis());
list.clear();
for (int i = 0; i < NUM; i++) {
list.add(rand.nextInt());
}
set = new LightWeightLinkedSet<Integer>(initCapacity, maxF, minF);
}
@Test
public void testEmptyBasic() {
LOG.info("Test empty basic");
Iterator<Integer> iter = set.iterator();
// iterator should not have next
assertFalse(iter.hasNext());
assertEquals(0, set.size());
assertTrue(set.isEmpty());
// poll should return nothing
assertNull(set.pollFirst());
assertEquals(0, set.pollAll().size());
assertEquals(0, set.pollN(10).size());
LOG.info("Test empty - DONE");
}
@Test
public void testOneElementBasic() {
LOG.info("Test one element basic");
set.add(list.get(0));
// set should be non-empty
assertEquals(1, set.size());
assertFalse(set.isEmpty());
// iterator should have next
Iterator<Integer> iter = set.iterator();
assertTrue(iter.hasNext());
// iterator should not have next
assertEquals(list.get(0), iter.next());
assertFalse(iter.hasNext());
LOG.info("Test one element basic - DONE");
}
@Test
public void testMultiBasic() {
LOG.info("Test multi element basic");
// add once
for (Integer i : list) {
assertTrue(set.add(i));
}
assertEquals(list.size(), set.size());
// check if the elements are in the set
for (Integer i : list) {
assertTrue(set.contains(i));
}
// add again - should return false each time
for (Integer i : list) {
assertFalse(set.add(i));
}
// check again if the elements are there
for (Integer i : list) {
assertTrue(set.contains(i));
}
Iterator<Integer> iter = set.iterator();
int num = 0;
while (iter.hasNext()) {
assertEquals(list.get(num++), iter.next());
}
// check the number of element from the iterator
assertEquals(list.size(), num);
LOG.info("Test multi element basic - DONE");
}
@Test
public void testRemoveOne() {
LOG.info("Test remove one");
assertTrue(set.add(list.get(0)));
assertEquals(1, set.size());
// remove from the head/tail
assertTrue(set.remove(list.get(0)));
assertEquals(0, set.size());
// check the iterator
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
// poll should return nothing
assertNull(set.pollFirst());
assertEquals(0, set.pollAll().size());
assertEquals(0, set.pollN(10).size());
// add the element back to the set
assertTrue(set.add(list.get(0)));
assertEquals(1, set.size());
iter = set.iterator();
assertTrue(iter.hasNext());
LOG.info("Test remove one - DONE");
}
@Test
public void testRemoveMulti() {
LOG.info("Test remove multi");
for (Integer i : list) {
assertTrue(set.add(i));
}
for (int i = 0; i < NUM / 2; i++) {
assertTrue(set.remove(list.get(i)));
}
// the deleted elements should not be there
for (int i = 0; i < NUM / 2; i++) {
assertFalse(set.contains(list.get(i)));
}
// the rest should be there
for (int i = NUM / 2; i < NUM; i++) {
assertTrue(set.contains(list.get(i)));
}
Iterator<Integer> iter = set.iterator();
// the remaining elements should be in order
int num = NUM / 2;
while (iter.hasNext()) {
assertEquals(list.get(num++), iter.next());
}
assertEquals(num, NUM);
LOG.info("Test remove multi - DONE");
}
@Test
public void testRemoveAll() {
LOG.info("Test remove all");
for (Integer i : list) {
assertTrue(set.add(i));
}
for (int i = 0; i < NUM; i++) {
assertTrue(set.remove(list.get(i)));
}
// the deleted elements should not be there
for (int i = 0; i < NUM; i++) {
assertFalse(set.contains(list.get(i)));
}
// iterator should not have next
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
assertTrue(set.isEmpty());
LOG.info("Test remove all - DONE");
}
@Test
public void testPollOneElement() {
LOG.info("Test poll one element");
set.add(list.get(0));
assertEquals(list.get(0), set.pollFirst());
assertNull(set.pollFirst());
LOG.info("Test poll one element - DONE");
}
@Test
public void testPollMulti() {
LOG.info("Test poll multi");
for (Integer i : list) {
assertTrue(set.add(i));
}
// remove half of the elements by polling
for (int i = 0; i < NUM / 2; i++) {
assertEquals(list.get(i), set.pollFirst());
}
assertEquals(NUM / 2, set.size());
// the deleted elements should not be there
for (int i = 0; i < NUM / 2; i++) {
assertFalse(set.contains(list.get(i)));
}
// the rest should be there
for (int i = NUM / 2; i < NUM; i++) {
assertTrue(set.contains(list.get(i)));
}
Iterator<Integer> iter = set.iterator();
// the remaining elements should be in order
int num = NUM / 2;
while (iter.hasNext()) {
assertEquals(list.get(num++), iter.next());
}
assertEquals(num, NUM);
// add elements back
for (int i = 0; i < NUM / 2; i++) {
assertTrue(set.add(list.get(i)));
}
// order should be switched
assertEquals(NUM, set.size());
for (int i = NUM / 2; i < NUM; i++) {
assertEquals(list.get(i), set.pollFirst());
}
for (int i = 0; i < NUM / 2; i++) {
assertEquals(list.get(i), set.pollFirst());
}
assertEquals(0, set.size());
assertTrue(set.isEmpty());
LOG.info("Test poll multi - DONE");
}
@Test
public void testPollAll() {
LOG.info("Test poll all");
for (Integer i : list) {
assertTrue(set.add(i));
}
// remove all elements by polling
while (set.pollFirst() != null);
assertEquals(0, set.size());
assertTrue(set.isEmpty());
// the deleted elements should not be there
for (int i = 0; i < NUM; i++) {
assertFalse(set.contains(list.get(i)));
}
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
LOG.info("Test poll all - DONE");
}
@Test
public void testPollNOne() {
LOG.info("Test pollN one");
set.add(list.get(0));
List<Integer> l = set.pollN(10);
assertEquals(1, l.size());
assertEquals(list.get(0), l.get(0));
LOG.info("Test pollN one - DONE");
}
@Test
public void testPollNMulti() {
LOG.info("Test pollN multi");
// use addAll
set.addAll(list);
// poll existing elements
List<Integer> l = set.pollN(10);
assertEquals(10, l.size());
for (int i = 0; i < 10; i++) {
assertEquals(list.get(i), l.get(i));
}
// poll more elements than present
l = set.pollN(1000);
assertEquals(NUM - 10, l.size());
// check the order
for (int i = 10; i < NUM; i++) {
assertEquals(list.get(i), l.get(i - 10));
}
// set is empty
assertTrue(set.isEmpty());
assertEquals(0, set.size());
LOG.info("Test pollN multi - DONE");
}
@Test
public void testClear() {
LOG.info("Test clear");
// use addAll
set.addAll(list);
assertEquals(NUM, set.size());
assertFalse(set.isEmpty());
// clear the set
set.clear();
assertEquals(0, set.size());
assertTrue(set.isEmpty());
// poll should return an empty list
assertEquals(0, set.pollAll().size());
assertEquals(0, set.pollN(10).size());
assertNull(set.pollFirst());
// iterator should be empty
Iterator<Integer> iter = set.iterator();
assertFalse(iter.hasNext());
LOG.info("Test clear - DONE");
}
@Test
public void testOther() {
LOG.info("Test other");
assertTrue(set.addAll(list));
// to array
Integer[] array = set.toArray(new Integer[0]);
assertEquals(NUM, array.length);
for (int i = 0; i < array.length; i++) {
assertTrue(list.contains(array[i]));
}
assertEquals(NUM, set.size());
// to array
Object[] array2 = set.toArray();
assertEquals(NUM, array2.length);
for (int i = 0; i < array2.length; i++) {
assertTrue(list.contains((Integer) array2[i]));
}
LOG.info("Test capacity - DONE");
}
}