From d6f9460eba0db36553f7a9ee6b649bec237d80cb Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Wed, 7 Mar 2012 06:43:11 +0000 Subject: [PATCH] HDFS-2476. Merging change r1201991 from trunk to 0.23 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297863 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 + .../server/blockmanagement/BlockManager.java | 19 +- .../blockmanagement/DatanodeDescriptor.java | 47 +- .../blockmanagement/InvalidateBlocks.java | 31 +- .../UnderReplicatedBlocks.java | 11 +- .../hdfs/server/namenode/FSNamesystem.java | 41 +- .../server/namenode/NameNodeRpcServer.java | 9 +- .../hdfs/server/namenode/NamenodeFsck.java | 17 +- .../org/apache/hadoop/hdfs/tools/DFSck.java | 29 +- .../hadoop/hdfs/util/LightWeightHashSet.java | 618 ++++++++++++++++++ .../hdfs/util/LightWeightLinkedSet.java | 259 ++++++++ .../namenode/TestListCorruptFileBlocks.java | 9 +- .../hdfs/util/TestLightWeightHashSet.java | 425 ++++++++++++ .../hdfs/util/TestLightWeightLinkedSet.java | 363 ++++++++++ 14 files changed, 1772 insertions(+), 110 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7a266565fe4..1971501791f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -109,6 +109,10 @@ Release 0.23.3 - UNRELEASED HDFS-2495. Increase granularity of write operations in ReplicationMonitor thus reducing contention for write lock. (Tomasz Nykiel via hairong) + HDFS-2476. More CPU efficient data structure for under-replicated, + over-replicated, and invalidated blocks. + (Tomasz Nykiel via todd) + BUG FIXES HDFS-2481. Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index f2b6cdabb3b..2a92b6f3906 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocat import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.Daemon; @@ -142,8 +143,8 @@ public class BlockManager { // eventually remove these extras. // Mapping: StorageID -> TreeSet // - public final Map> excessReplicateMap = - new TreeMap>(); + public final Map> excessReplicateMap = + new TreeMap>(); // // Store set of Blocks that need to be replicated 1 or more times. 
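The hunk above swaps the per-datanode TreeSet<Block> in excessReplicateMap for the new LightWeightLinkedSet<Block> while keeping the StorageID-to-set map shape, so the existing add()/contains() call sites shown in the following hunks keep working. A minimal sketch of that usage pattern (the map and set types follow the patch; the wrapper class and method names here are illustrative only):

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;

// Illustrative sketch of the StorageID -> excess-block-set pattern used by BlockManager.
class ExcessReplicaMapSketch {
  private final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
      new TreeMap<String, LightWeightLinkedSet<Block>>();

  // Record that 'block' is over-replicated on the datanode identified by storageID.
  void addToExcess(String storageID, Block block) {
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(storageID);
    if (excessBlocks == null) {
      excessBlocks = new LightWeightLinkedSet<Block>();
      excessReplicateMap.put(storageID, excessBlocks);
    }
    excessBlocks.add(block);  // returns false if the block was already recorded
  }

  // Check whether 'block' is already marked as an excess replica on that datanode.
  boolean isExcess(String storageID, Block block) {
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(storageID);
    return excessBlocks != null && excessBlocks.contains(block);
  }
}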
@@ -1255,7 +1256,7 @@ public class BlockManager { Collection nodesCorrupt = corruptReplicas.getNodes(block); while(it.hasNext()) { DatanodeDescriptor node = it.next(); - Collection excessBlocks = + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getStorageID()); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) corrupt++; @@ -1987,7 +1988,7 @@ public class BlockManager { for (Iterator it = blocksMap.nodeIterator(block); it.hasNext();) { DatanodeDescriptor cur = it.next(); - Collection excessBlocks = excessReplicateMap.get(cur + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(cur .getStorageID()); if (excessBlocks == null || !excessBlocks.contains(block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { @@ -2105,9 +2106,9 @@ public class BlockManager { private void addToExcessReplicate(DatanodeInfo dn, Block block) { assert namesystem.hasWriteLock(); - Collection excessBlocks = excessReplicateMap.get(dn.getStorageID()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(dn.getStorageID()); if (excessBlocks == null) { - excessBlocks = new TreeSet(); + excessBlocks = new LightWeightLinkedSet(); excessReplicateMap.put(dn.getStorageID(), excessBlocks); } if (excessBlocks.add(block)) { @@ -2155,7 +2156,7 @@ public class BlockManager { // We've removed a block from a node, so it's definitely no longer // in "excess" there. // - Collection excessBlocks = excessReplicateMap.get(node + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node .getStorageID()); if (excessBlocks != null) { if (excessBlocks.remove(block)) { @@ -2305,8 +2306,8 @@ public class BlockManager { } else if (node.isDecommissionInProgress() || node.isDecommissioned()) { count++; } else { - Collection blocksExcess = - excessReplicateMap.get(node.getStorageID()); + LightWeightLinkedSet blocksExcess = excessReplicateMap.get(node + .getStorageID()); if (blocksExcess != null && blocksExcess.contains(b)) { excess++; } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 2ad49b7fa55..403fbabaa3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DeprecatedUTF8; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; @@ -120,11 +121,11 @@ public class DatanodeDescriptor extends DatanodeInfo { private BlockQueue recoverBlocks = new BlockQueue(); /** A set of blocks to be invalidated by this datanode */ - private Set invalidateBlocks = new TreeSet(); + private LightWeightHashSet invalidateBlocks = new LightWeightHashSet(); /* Variables for maintaining number of blocks scheduled to be written to * this datanode. This count is approximate and might be slightly bigger - * in case of errors (e.g. datanode does not report if an error occurs + * in case of errors (e.g. datanode does not report if an error occurs * while writing the block). 
*/ private int currApproxBlocksScheduled = 0; @@ -400,45 +401,11 @@ public class DatanodeDescriptor extends DatanodeInfo { * Remove the specified number of blocks to be invalidated */ public Block[] getInvalidateBlocks(int maxblocks) { - return getBlockArray(invalidateBlocks, maxblocks); - } - - static private Block[] getBlockArray(Collection blocks, int max) { - Block[] blockarray = null; - synchronized(blocks) { - int available = blocks.size(); - int n = available; - if (max > 0 && n > 0) { - if (max < n) { - n = max; - } - // allocate the properly sized block array ... - blockarray = new Block[n]; - - // iterate tree collecting n blocks... - Iterator e = blocks.iterator(); - int blockCount = 0; - - while (blockCount < n && e.hasNext()) { - // insert into array ... - blockarray[blockCount++] = e.next(); - - // remove from tree via iterator, if we are removing - // less than total available blocks - if (n < available){ - e.remove(); - } - } - assert(blockarray.length == n); - - // now if the number of blocks removed equals available blocks, - // them remove all blocks in one fell swoop via clear - if (n == available) { - blocks.clear(); - } - } + synchronized (invalidateBlocks) { + Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min( + invalidateBlocks.size(), maxblocks)]); + return deleteList.length == 0 ? null : deleteList; } - return blockarray; } /** Serialization for FSEditLog */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java index 7bad2b78fcb..6181588475d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java @@ -30,8 +30,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; -/** +/** * Keeps a Collection for every named machine containing blocks * that have recently been invalidated and are thought to live * on the machine in question. @@ -39,8 +40,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; @InterfaceAudience.Private class InvalidateBlocks { /** Mapping: StorageID -> Collection of Blocks */ - private final Map> node2blocks = - new TreeMap>(); + private final Map> node2blocks = + new TreeMap>(); /** The total number of blocks in the map. 
*/ private long numBlocks = 0L; @@ -67,9 +68,9 @@ class InvalidateBlocks { */ synchronized void add(final Block block, final DatanodeInfo datanode, final boolean log) { - Collection set = node2blocks.get(datanode.getStorageID()); + LightWeightHashSet set = node2blocks.get(datanode.getStorageID()); if (set == null) { - set = new HashSet(); + set = new LightWeightHashSet(); node2blocks.put(datanode.getStorageID(), set); } if (set.add(block)) { @@ -83,7 +84,7 @@ class InvalidateBlocks { /** Remove a storage from the invalidatesSet */ synchronized void remove(final String storageID) { - final Collection blocks = node2blocks.remove(storageID); + final LightWeightHashSet blocks = node2blocks.remove(storageID); if (blocks != null) { numBlocks -= blocks.size(); } @@ -91,7 +92,7 @@ class InvalidateBlocks { /** Remove the block from the specified storage. */ synchronized void remove(final String storageID, final Block block) { - final Collection v = node2blocks.get(storageID); + final LightWeightHashSet v = node2blocks.get(storageID); if (v != null && v.remove(block)) { numBlocks--; if (v.isEmpty()) { @@ -109,8 +110,8 @@ class InvalidateBlocks { return; } - for(Map.Entry> entry : node2blocks.entrySet()) { - final Collection blocks = entry.getValue(); + for(Map.Entry> entry : node2blocks.entrySet()) { + final LightWeightHashSet blocks = entry.getValue(); if (blocks.size() > 0) { out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks); } @@ -143,21 +144,17 @@ class InvalidateBlocks { private synchronized List invalidateWork( final String storageId, final DatanodeDescriptor dn) { - final Collection set = node2blocks.get(storageId); + final LightWeightHashSet set = node2blocks.get(storageId); if (set == null) { return null; } // # blocks that can be sent in one message is limited final int limit = datanodeManager.blockInvalidateLimit; - final List toInvalidate = new ArrayList(limit); - final Iterator it = set.iterator(); - for(int count = 0; count < limit && it.hasNext(); count++) { - toInvalidate.add(it.next()); - it.remove(); - } + final List toInvalidate = set.pollN(limit); + // If we send everything in this message, remove this node entry - if (!it.hasNext()) { + if (set.isEmpty()) { remove(storageId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index dc8d9e8db37..d35db96ec09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -24,6 +24,7 @@ import java.util.NavigableSet; import java.util.TreeSet; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.hdfs.server.namenode.NameNode; /** @@ -80,13 +81,13 @@ class UnderReplicatedBlocks implements Iterable { /** The queue for corrupt blocks: {@value} */ static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; /** the queues themselves */ - private final List> priorityQueues - = new ArrayList>(LEVEL); + private List> priorityQueues + = new ArrayList>(); /** Create an object. 
*/ UnderReplicatedBlocks() { for (int i = 0; i < LEVEL; i++) { - priorityQueues.add(new TreeSet()); + priorityQueues.add(new LightWeightLinkedSet()); } } @@ -123,10 +124,10 @@ class UnderReplicatedBlocks implements Iterable { synchronized int getCorruptBlockSize() { return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size(); } - + /** Check if a block is in the neededReplication queue */ synchronized boolean contains(Block block) { - for (NavigableSet set : priorityQueues) { + for(LightWeightLinkedSet set : priorityQueues) { if (set.contains(block)) { return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 0e125631f55..5b2810b128a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4013,7 +4013,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws IOException */ Collection listCorruptFileBlocks(String path, - String startBlockAfter) throws IOException { + String[] cookieTab) throws IOException { readLock(); try { @@ -4022,23 +4022,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats, "replication queues have not been initialized."); } checkSuperuserPrivilege(); - long startBlockId = 0; // print a limited # of corrupt files per call int count = 0; ArrayList corruptFiles = new ArrayList(); - - if (startBlockAfter != null) { - startBlockId = Block.filename2id(startBlockAfter); - } final Iterator blkIterator = blockManager.getCorruptReplicaBlockIterator(); + + if (cookieTab == null) { + cookieTab = new String[] { null }; + } + int skip = getIntCookie(cookieTab[0]); + for (int i = 0; i < skip && blkIterator.hasNext(); i++) { + blkIterator.next(); + } + while (blkIterator.hasNext()) { Block blk = blkIterator.next(); INode inode = blockManager.getINode(blk); + skip++; if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) { String src = FSDirectory.getFullPathName(inode); - if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId)) - && (src.startsWith(path))) { + if (src.startsWith(path)){ corruptFiles.add(new CorruptFileBlockInfo(src, blk)); count++; if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED) @@ -4046,13 +4050,32 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } } + cookieTab[0] = String.valueOf(skip); LOG.info("list corrupt file blocks returned: " + count); return corruptFiles; } finally { readUnlock(); } } - + + /** + * Convert string cookie to integer. 
+ */ + private static int getIntCookie(String cookie){ + int c; + if(cookie == null){ + c = 0; + } else { + try{ + c = Integer.parseInt(cookie); + }catch (NumberFormatException e) { + c = 0; + } + } + c = Math.max(0, c); + return c; + } + /** * Create delegation token secret manager */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index f13a1ce8b57..8efeadab375 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -729,17 +729,16 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { + String[] cookieTab = new String[] { cookie }; Collection fbs = - namesystem.listCorruptFileBlocks(path, cookie); - + namesystem.listCorruptFileBlocks(path, cookieTab); + String[] files = new String[fbs.size()]; - String lastCookie = ""; int i = 0; for(FSNamesystem.CorruptFileBlockInfo fb: fbs) { files[i++] = fb.path; - lastCookie = fb.block.getBlockName(); } - return new CorruptFileBlocks(files, lastCookie); + return new CorruptFileBlocks(files, cookieTab[0]); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index b823a0082c6..0d74c8d54ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -113,11 +113,11 @@ public class NamenodeFsck { // We return back N files that are corrupt; the list of files returned is // ordered by block id; to allow continuation support, pass in the last block // # from previous call - private String startBlockAfter = null; - + private String[] currentCookie = new String[] { null }; + private final Configuration conf; private final PrintWriter out; - + /** * Filesystem checker. * @param conf configuration (namenode config) @@ -155,11 +155,11 @@ public class NamenodeFsck { this.showCorruptFileBlocks = true; } else if (key.equals("startblockafter")) { - this.startBlockAfter = pmap.get("startblockafter")[0]; + this.currentCookie[0] = pmap.get("startblockafter")[0]; } } } - + /** * Check files on DFS, starting from the indicated path. */ @@ -215,19 +215,20 @@ public class NamenodeFsck { out.close(); } } - + private void listCorruptFileBlocks() throws IOException { Collection corruptFiles = namenode. 
- getNamesystem().listCorruptFileBlocks(path, startBlockAfter); + getNamesystem().listCorruptFileBlocks(path, currentCookie); int numCorruptFiles = corruptFiles.size(); String filler; if (numCorruptFiles > 0) { filler = Integer.toString(numCorruptFiles); - } else if (startBlockAfter == null) { + } else if (currentCookie[0].equals("0")) { filler = "no"; } else { filler = "no more"; } + out.println("Cookie:\t" + currentCookie[0]); for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) { out.println(c.toString()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java index aa04c36501a..bc98995af30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java @@ -144,14 +144,15 @@ public class DFSck extends Configured implements Tool { throws IOException { int errCode = -1; int numCorrupt = 0; - String lastBlock = null; + int cookie = 0; final String noCorruptLine = "has no CORRUPT files"; final String noMoreCorruptLine = "has no more CORRUPT files"; + final String cookiePrefix = "Cookie:"; boolean allDone = false; while (!allDone) { final StringBuffer url = new StringBuffer(baseUrl); - if (lastBlock != null) { - url.append("&startblockafter=").append(lastBlock); + if (cookie > 0) { + url.append("&startblockafter=").append(String.valueOf(cookie)); } URL path = new URL(url.toString()); SecurityUtil.fetchServiceTicket(path); @@ -162,29 +163,31 @@ public class DFSck extends Configured implements Tool { try { String line = null; while ((line = input.readLine()) != null) { - if ((line.endsWith(noCorruptLine)) || + if (line.startsWith(cookiePrefix)){ + try{ + cookie = Integer.parseInt(line.split("\t")[1]); + } catch (Exception e){ + allDone = true; + break; + } + continue; + } + if ((line.endsWith(noCorruptLine)) || (line.endsWith(noMoreCorruptLine)) || (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) { allDone = true; break; } if ((line.isEmpty()) - || (line.startsWith("FSCK started by")) + || (line.startsWith("FSCK started by")) || (line.startsWith("The filesystem under path"))) continue; numCorrupt++; if (numCorrupt == 1) { - out.println("The list of corrupt files under path '" + out.println("The list of corrupt files under path '" + dir + "' are:"); } out.println(line); - try { - // Get the block # that we need to send in next call - lastBlock = line.split("\t")[0]; - } catch (Exception e) { - allDone = true; - break; - } } } finally { input.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java new file mode 100644 index 00000000000..866734f17bd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java @@ -0,0 +1,618 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A low memory linked hash set implementation, which uses an array for storing + * the elements and linked lists for collision resolution. This class does not + * support null element. + * + * This class is not thread safe. + * + */ +public class LightWeightHashSet implements Collection { + /** + * Elements of {@link LightWeightLinkedSet}. + */ + static class LinkedElement { + protected final T element; + + // reference to the next entry within a bucket linked list + protected LinkedElement next; + + //hashCode of the element + protected final int hashCode; + + public LinkedElement(T elem, int hash) { + this.element = elem; + this.next = null; + this.hashCode = hash; + } + + public String toString() { + return element.toString(); + } + } + + protected static final float DEFAULT_MAX_LOAD_FACTOR = 0.75f; + protected static final float DEFAUT_MIN_LOAD_FACTOR = 0.2f; + protected static final int MINIMUM_CAPACITY = 16; + + static final int MAXIMUM_CAPACITY = 1 << 30; + private static final Log LOG = LogFactory.getLog(LightWeightHashSet.class); + + /** + * An internal array of entries, which are the rows of the hash table. The + * size must be a power of two. + */ + protected LinkedElement[] entries; + /** Size of the entry table. */ + private int capacity; + /** The size of the set (not the entry array). */ + protected int size = 0; + /** Hashmask used for determining the bucket index **/ + private int hash_mask; + /** Capacity at initialization time **/ + private final int initialCapacity; + + /** + * Modification version for fail-fast. + * + * @see ConcurrentModificationException + */ + protected volatile int modification = 0; + + private float maxLoadFactor; + private float minLoadFactor; + private int expandMultiplier = 2; + + private int expandThreshold; + private int shrinkThreshold; + + /** + * @param initCapacity + * Recommended size of the internal array. 
+ * @param maxLoadFactor + * used to determine when to expand the internal array + * @param minLoadFactor + * used to determine when to shrink the internal array + */ + @SuppressWarnings("unchecked") + public LightWeightHashSet(int initCapacity, float maxLoadFactor, + float minLoadFactor) { + + if (maxLoadFactor <= 0 || maxLoadFactor > 1.0f) + throw new IllegalArgumentException("Illegal maxload factor: " + + maxLoadFactor); + + if (minLoadFactor <= 0 || minLoadFactor > maxLoadFactor) + throw new IllegalArgumentException("Illegal minload factor: " + + minLoadFactor); + + this.initialCapacity = computeCapacity(initCapacity); + this.capacity = this.initialCapacity; + this.hash_mask = capacity - 1; + + this.maxLoadFactor = maxLoadFactor; + this.expandThreshold = (int) (capacity * maxLoadFactor); + this.minLoadFactor = minLoadFactor; + this.shrinkThreshold = (int) (capacity * minLoadFactor); + + entries = new LinkedElement[capacity]; + LOG.debug("initial capacity=" + initialCapacity + ", max load factor= " + + maxLoadFactor + ", min load factor= " + minLoadFactor); + } + + public LightWeightHashSet() { + this(MINIMUM_CAPACITY, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR); + } + + public LightWeightHashSet(int minCapacity) { + this(minCapacity, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR); + } + + /** + * Check if the set is empty. + * + * @return true is set empty, false otherwise + */ + public boolean isEmpty() { + return size == 0; + } + + /** + * Return the current capacity (for testing). + */ + public int getCapacity() { + return capacity; + } + + /** + * Return the number of stored elements. + */ + public int size() { + return size; + } + + /** + * Get index in the internal table for a given hash. + */ + protected int getIndex(int hashCode) { + return hashCode & hash_mask; + } + + /** + * Check if the set contains given element + * + * @return true if element present, false otherwise. + */ + @SuppressWarnings("unchecked") + public boolean contains(final Object key) { + // validate key + if (key == null) { + throw new IllegalArgumentException("Null element is not supported."); + } + // find element + final int hashCode = ((T)key).hashCode(); + final int index = getIndex(hashCode); + return containsElem(index, (T) key, hashCode); + } + + /** + * Check if the set contains given element at given index. + * + * @return true if element present, false otherwise. + */ + protected boolean containsElem(int index, final T key, int hashCode) { + for (LinkedElement e = entries[index]; e != null; e = e.next) { + // element found + if (hashCode == e.hashCode && e.element.equals(key)) { + return true; + } + } + // element not found + return false; + } + + /** + * All all elements in the collection. Expand if necessary. + * + * @param toAdd - elements to add. + * @return true if the set has changed, false otherwise + */ + public boolean addAll(Collection toAdd) { + boolean changed = false; + for (T elem : toAdd) { + changed |= addElem(elem); + } + expandIfNecessary(); + return changed; + } + + /** + * Add given element to the hash table. Expand table if necessary. 
+ * + * @return true if the element was not present in the table, false otherwise + */ + public boolean add(final T element) { + boolean added = addElem(element); + expandIfNecessary(); + return added; + } + + /** + * Add given element to the hash table + * + * @return true if the element was not present in the table, false otherwise + */ + protected boolean addElem(final T element) { + // validate element + if (element == null) { + throw new IllegalArgumentException("Null element is not supported."); + } + // find hashCode & index + final int hashCode = element.hashCode(); + final int index = getIndex(hashCode); + // return false if already present + if (containsElem(index, element, hashCode)) { + return false; + } + + modification++; + size++; + + // update bucket linked list + LinkedElement le = new LinkedElement(element, hashCode); + le.next = entries[index]; + entries[index] = le; + return true; + } + + /** + * Remove the element corresponding to the key. + * + * @return If such element exists, return true. Otherwise, return false. + */ + @SuppressWarnings("unchecked") + public boolean remove(final Object key) { + // validate key + if (key == null) { + throw new IllegalArgumentException("Null element is not supported."); + } + LinkedElement removed = removeElem((T) key); + shrinkIfNecessary(); + return removed == null ? false : true; + } + + /** + * Remove the element corresponding to the key, given key.hashCode() == index. + * + * @return If such element exists, return true. Otherwise, return false. + */ + protected LinkedElement removeElem(final T key) { + LinkedElement found = null; + final int hashCode = key.hashCode(); + final int index = getIndex(hashCode); + if (entries[index] == null) { + return null; + } else if (hashCode == entries[index].hashCode && + entries[index].element.equals(key)) { + // remove the head of the bucket linked list + modification++; + size--; + found = entries[index]; + entries[index] = found.next; + } else { + // head != null and key is not equal to head + // search the element + LinkedElement prev = entries[index]; + for (found = prev.next; found != null;) { + if (hashCode == found.hashCode && + found.element.equals(key)) { + // found the element, remove it + modification++; + size--; + prev.next = found.next; + found.next = null; + break; + } else { + prev = found; + found = found.next; + } + } + } + return found; + } + + /** + * Remove and return n elements from the hashtable. + * The order in which entries are removed is unspecified, and + * and may not correspond to the order in which they were inserted. + * + * @return first element + */ + public List pollN(int n) { + if (n >= size) { + return pollAll(); + } + List retList = new ArrayList(n); + if (n == 0) { + return retList; + } + boolean done = false; + int currentBucketIndex = 0; + + while (!done) { + LinkedElement current = entries[currentBucketIndex]; + while (current != null) { + retList.add(current.element); + current = current.next; + entries[currentBucketIndex] = current; + size--; + modification++; + if (--n == 0) { + done = true; + break; + } + } + currentBucketIndex++; + } + shrinkIfNecessary(); + return retList; + } + + /** + * Remove all elements from the set and return them. Clear the entries. 
+ */ + public List pollAll() { + List retList = new ArrayList(size); + for (int i = 0; i < entries.length; i++) { + LinkedElement current = entries[i]; + while (current != null) { + retList.add(current.element); + current = current.next; + } + } + this.clear(); + return retList; + } + + /** + * Get array.length elements from the set, and put them into the array. + */ + @SuppressWarnings("unchecked") + public T[] pollToArray(T[] array) { + int currentIndex = 0; + LinkedElement current = null; + + if (array.length == 0) { + return array; + } + if (array.length > size) { + array = (T[]) java.lang.reflect.Array.newInstance(array.getClass() + .getComponentType(), size); + } + // do fast polling if the entire set needs to be fetched + if (array.length == size) { + for (int i = 0; i < entries.length; i++) { + current = entries[i]; + while (current != null) { + array[currentIndex++] = current.element; + current = current.next; + } + } + this.clear(); + return array; + } + + boolean done = false; + int currentBucketIndex = 0; + + while (!done) { + current = entries[currentBucketIndex]; + while (current != null) { + array[currentIndex++] = current.element; + current = current.next; + entries[currentBucketIndex] = current; + size--; + modification++; + if (currentIndex == array.length) { + done = true; + break; + } + } + currentBucketIndex++; + } + shrinkIfNecessary(); + return array; + } + + /** + * Compute capacity given initial capacity. + * + * @return final capacity, either MIN_CAPACITY, MAX_CAPACITY, or power of 2 + * closest to the requested capacity. + */ + private int computeCapacity(int initial) { + if (initial < MINIMUM_CAPACITY) { + return MINIMUM_CAPACITY; + } + if (initial > MAXIMUM_CAPACITY) { + return MAXIMUM_CAPACITY; + } + int capacity = 1; + while (capacity < initial) { + capacity <<= 1; + } + return capacity; + } + + /** + * Resize the internal table to given capacity. + */ + @SuppressWarnings("unchecked") + private void resize(int cap) { + int newCapacity = computeCapacity(cap); + if (newCapacity == this.capacity) { + return; + } + this.capacity = newCapacity; + this.expandThreshold = (int) (capacity * maxLoadFactor); + this.shrinkThreshold = (int) (capacity * minLoadFactor); + this.hash_mask = capacity - 1; + LinkedElement[] temp = entries; + entries = new LinkedElement[capacity]; + for (int i = 0; i < temp.length; i++) { + LinkedElement curr = temp[i]; + while (curr != null) { + LinkedElement next = curr.next; + int index = getIndex(curr.hashCode); + curr.next = entries[index]; + entries[index] = curr; + curr = next; + } + } + } + + /** + * Checks if we need to shrink, and shrinks if necessary. + */ + protected void shrinkIfNecessary() { + if (size < this.shrinkThreshold && capacity > initialCapacity) { + resize(capacity / expandMultiplier); + } + } + + /** + * Checks if we need to expand, and expands if necessary. + */ + protected void expandIfNecessary() { + if (size > this.expandThreshold && capacity < MAXIMUM_CAPACITY) { + resize(capacity * expandMultiplier); + } + } + + public Iterator iterator() { + return new LinkedSetIterator(); + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(getClass().getSimpleName()); + b.append("(size=").append(size).append(", modification=") + .append(modification).append(", entries.length=") + .append(entries.length).append(")"); + return b.toString(); + } + + /** Print detailed information of this object. 
*/ + public void printDetails(final PrintStream out) { + out.print(this + ", entries = ["); + for (int i = 0; i < entries.length; i++) { + if (entries[i] != null) { + LinkedElement e = entries[i]; + out.print("\n " + i + ": " + e); + for (e = e.next; e != null; e = e.next) { + out.print(" -> " + e); + } + } + } + out.println("\n]"); + } + + private class LinkedSetIterator implements Iterator { + /** The starting modification for fail-fast. */ + private final int startModification = modification; + /** The current index of the entry array. */ + private int index = -1; + /** The next element to return. */ + private LinkedElement next = nextNonemptyEntry(); + + private LinkedElement nextNonemptyEntry() { + for (index++; index < entries.length && entries[index] == null; index++); + return index < entries.length ? entries[index] : null; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public T next() { + if (modification != startModification) { + throw new ConcurrentModificationException("modification=" + + modification + " != startModification = " + startModification); + } + if (next == null) { + throw new NoSuchElementException(); + } + final T e = next.element; + // find the next element + final LinkedElement n = next.next; + next = n != null ? n : nextNonemptyEntry(); + return e; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported."); + } + } + + /** + * Clear the set. Resize it to the original capacity. + */ + @SuppressWarnings("unchecked") + public void clear() { + this.capacity = this.initialCapacity; + this.hash_mask = capacity - 1; + + this.expandThreshold = (int) (capacity * maxLoadFactor); + this.shrinkThreshold = (int) (capacity * minLoadFactor); + + entries = new LinkedElement[capacity]; + size = 0; + modification++; + } + + @Override + public Object[] toArray() { + Object[] result = new Object[size]; + return toArray(result); + } + + @Override + @SuppressWarnings("unchecked") + public U[] toArray(U[] a) { + if (a == null) { + throw new NullPointerException("Input array can not be null"); + } + if (a.length < size) { + a = (U[]) java.lang.reflect.Array.newInstance(a.getClass() + .getComponentType(), size); + } + int currentIndex = 0; + for (int i = 0; i < entries.length; i++) { + LinkedElement current = entries[i]; + while (current != null) { + a[currentIndex++] = (U) current.element; + current = current.next; + } + } + return a; + } + + @Override + public boolean containsAll(Collection c) { + Iterator iter = c.iterator(); + while (iter.hasNext()) { + if (!contains(iter.next())) { + return false; + } + } + return true; + } + + @Override + public boolean removeAll(Collection c) { + boolean changed = false; + Iterator iter = c.iterator(); + while (iter.hasNext()) { + changed |= remove(iter.next()); + } + return changed; + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException("retainAll is not supported."); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java new file mode 100644 index 00000000000..c90d2c7aa81 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * A low memory linked hash set implementation, which uses an array for storing + * the elements and linked lists for collision resolution. In addition it stores + * elements in a linked list to ensure ordered traversal. This class does not + * support null element. + * + * This class is not thread safe. + * + */ +public class LightWeightLinkedSet extends LightWeightHashSet { + /** + * Elements of {@link LightWeightLinkedSet}. + */ + static class DoubleLinkedElement extends LinkedElement { + // references to elements within all-element linked list + private DoubleLinkedElement before; + private DoubleLinkedElement after; + + public DoubleLinkedElement(T elem, int hashCode) { + super(elem, hashCode); + this.before = null; + this.after = null; + } + + public String toString() { + return super.toString(); + } + } + + private DoubleLinkedElement head; + private DoubleLinkedElement tail; + + /** + * @param initCapacity + * Recommended size of the internal array. + * @param maxLoadFactor + * used to determine when to expand the internal array + * @param minLoadFactor + * used to determine when to shrink the internal array + */ + public LightWeightLinkedSet(int initCapacity, float maxLoadFactor, + float minLoadFactor) { + super(initCapacity, maxLoadFactor, minLoadFactor); + head = null; + tail = null; + } + + public LightWeightLinkedSet() { + this(MINIMUM_CAPACITY, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR); + } + + /** + * Add given element to the hash table + * + * @return true if the element was not present in the table, false otherwise + */ + protected boolean addElem(final T element) { + // validate element + if (element == null) { + throw new IllegalArgumentException("Null element is not supported."); + } + // find hashCode & index + final int hashCode = element.hashCode(); + final int index = getIndex(hashCode); + // return false if already present + if (containsElem(index, element, hashCode)) { + return false; + } + + modification++; + size++; + + // update bucket linked list + DoubleLinkedElement le = new DoubleLinkedElement(element, hashCode); + le.next = entries[index]; + entries[index] = le; + + // insert to the end of the all-element linked list + le.after = null; + le.before = tail; + if (tail != null) { + tail.after = le; + } + tail = le; + if (head == null) { + head = le; + } + return true; + } + + /** + * Remove the element corresponding to the key, given key.hashCode() == index. + * + * @return Return the entry with the element if exists. Otherwise return null. 
+ */ + protected DoubleLinkedElement removeElem(final T key) { + DoubleLinkedElement found = (DoubleLinkedElement) (super + .removeElem(key)); + if (found == null) { + return null; + } + + // update linked list + if (found.after != null) { + found.after.before = found.before; + } + if (found.before != null) { + found.before.after = found.after; + } + if (head == found) { + head = head.after; + } + if (tail == found) { + tail = tail.before; + } + return found; + } + + /** + * Remove and return first element on the linked list of all elements. + * + * @return first element + */ + public T pollFirst() { + if (head == null) { + return null; + } + T first = head.element; + this.remove(first); + return first; + } + + /** + * Remove and return n elements from the hashtable. + * The order in which entries are removed is corresponds + * to the order in which they were inserted. + * + * @return first element + */ + public List pollN(int n) { + if (n >= size) { + // if we need to remove all elements then do fast polling + return pollAll(); + } + List retList = new ArrayList(n); + while (n-- > 0 && head != null) { + T curr = head.element; + this.removeElem(curr); + retList.add(curr); + } + shrinkIfNecessary(); + return retList; + } + + /** + * Remove all elements from the set and return them in order. Traverse the + * link list, don't worry about hashtable - faster version of the parent + * method. + */ + public List pollAll() { + List retList = new ArrayList(size); + while (head != null) { + retList.add(head.element); + head = head.after; + } + this.clear(); + return retList; + } + + @Override + @SuppressWarnings("unchecked") + public U[] toArray(U[] a) { + if (a == null) { + throw new NullPointerException("Input array can not be null"); + } + if (a.length < size) { + a = (U[]) java.lang.reflect.Array.newInstance(a.getClass() + .getComponentType(), size); + } + int currentIndex = 0; + DoubleLinkedElement current = head; + while (current != null) { + T curr = current.element; + a[currentIndex++] = (U) curr; + current = current.after; + } + return a; + } + + public Iterator iterator() { + return new LinkedSetIterator(); + } + + private class LinkedSetIterator implements Iterator { + /** The starting modification for fail-fast. */ + private final int startModification = modification; + /** The next element to return. */ + private DoubleLinkedElement next = head; + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public T next() { + if (modification != startModification) { + throw new ConcurrentModificationException("modification=" + + modification + " != startModification = " + startModification); + } + if (next == null) { + throw new NoSuchElementException(); + } + final T e = next.element; + // find the next element + next = next.after; + return e; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported."); + } + } + + /** + * Clear the set. Resize it to the original capacity. 
+ */ + public void clear() { + super.clear(); + this.head = null; + this.tail = null; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java index 0bb43a14752..79b0bf41526 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java @@ -323,9 +323,10 @@ public class TestListCorruptFileBlocks { FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks .toArray(new FSNamesystem.CorruptFileBlockInfo[0]); // now get the 2nd and 3rd file that is corrupt + String[] cookie = new String[]{"1"}; Collection nextCorruptFileBlocks = namenode.getNamesystem() - .listCorruptFileBlocks("/corruptData", cfb[0].block.getBlockName()); + .listCorruptFileBlocks("/corruptData", cookie); FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks .toArray(new FSNamesystem.CorruptFileBlockInfo[0]); numCorrupt = nextCorruptFileBlocks.size(); @@ -333,9 +334,9 @@ public class TestListCorruptFileBlocks { assertTrue(ncfb[0].block.getBlockName() .equalsIgnoreCase(cfb[1].block.getBlockName())); - corruptFileBlocks = - namenode.getNamesystem().listCorruptFileBlocks("/corruptData", - ncfb[1].block.getBlockName()); + corruptFileBlocks = + namenode.getNamesystem() + .listCorruptFileBlocks("/corruptData", cookie); numCorrupt = corruptFileBlocks.size(); assertTrue(numCorrupt == 0); // Do a listing on a dir which doesn't have any corrupt blocks and diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java new file mode 100644 index 00000000000..e890cae8540 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import org.junit.Test; +import org.junit.Before; +import static org.junit.Assert.*; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class TestLightWeightHashSet{ + + private static final Log LOG = LogFactory + .getLog("org.apache.hadoop.hdfs.TestLightWeightHashSet"); + private ArrayList list = new ArrayList(); + private final int NUM = 100; + private LightWeightHashSet set; + private Random rand; + + @Before + public void setUp() { + float maxF = LightWeightHashSet.DEFAULT_MAX_LOAD_FACTOR; + float minF = LightWeightHashSet.DEFAUT_MIN_LOAD_FACTOR; + int initCapacity = LightWeightHashSet.MINIMUM_CAPACITY; + rand = new Random(System.currentTimeMillis()); + list.clear(); + for (int i = 0; i < NUM; i++) { + list.add(rand.nextInt()); + } + set = new LightWeightHashSet(initCapacity, maxF, minF); + } + + @Test + public void testEmptyBasic() { + LOG.info("Test empty basic"); + Iterator iter = set.iterator(); + // iterator should not have next + assertFalse(iter.hasNext()); + assertEquals(0, set.size()); + assertTrue(set.isEmpty()); + LOG.info("Test empty - DONE"); + } + + @Test + public void testOneElementBasic() { + LOG.info("Test one element basic"); + set.add(list.get(0)); + // set should be non-empty + assertEquals(1, set.size()); + assertFalse(set.isEmpty()); + + // iterator should have next + Iterator iter = set.iterator(); + assertTrue(iter.hasNext()); + + // iterator should not have next + assertEquals(list.get(0), iter.next()); + assertFalse(iter.hasNext()); + LOG.info("Test one element basic - DONE"); + } + + @Test + public void testMultiBasic() { + LOG.info("Test multi element basic"); + // add once + for (Integer i : list) { + assertTrue(set.add(i)); + } + assertEquals(list.size(), set.size()); + + // check if the elements are in the set + for (Integer i : list) { + assertTrue(set.contains(i)); + } + + // add again - should return false each time + for (Integer i : list) { + assertFalse(set.add(i)); + } + + // check again if the elements are there + for (Integer i : list) { + assertTrue(set.contains(i)); + } + + Iterator iter = set.iterator(); + int num = 0; + while (iter.hasNext()) { + Integer next = iter.next(); + assertNotNull(next); + assertTrue(list.contains(next)); + num++; + } + // check the number of element from the iterator + assertEquals(list.size(), num); + LOG.info("Test multi element basic - DONE"); + } + + @Test + public void testRemoveOne() { + LOG.info("Test remove one"); + assertTrue(set.add(list.get(0))); + assertEquals(1, set.size()); + + // remove from the head/tail + assertTrue(set.remove(list.get(0))); + assertEquals(0, set.size()); + + // check the iterator + Iterator iter = set.iterator(); + assertFalse(iter.hasNext()); + + // add the element back to the set + assertTrue(set.add(list.get(0))); + assertEquals(1, set.size()); + + iter = set.iterator(); + assertTrue(iter.hasNext()); + LOG.info("Test remove one - DONE"); + } + + @Test + public void testRemoveMulti() { + LOG.info("Test remove multi"); + for (Integer i : list) { + assertTrue(set.add(i)); + } + for (int i = 0; i < NUM / 2; i++) { + assertTrue(set.remove(list.get(i))); + } + + // the deleted elements should not be there + for (int i = 0; i < NUM / 2; i++) { + assertFalse(set.contains(list.get(i))); + } + + // the rest should be there + for (int i = NUM / 2; i < NUM; 
i++) { + assertTrue(set.contains(list.get(i))); + } + LOG.info("Test remove multi - DONE"); + } + + @Test + public void testRemoveAll() { + LOG.info("Test remove all"); + for (Integer i : list) { + assertTrue(set.add(i)); + } + for (int i = 0; i < NUM; i++) { + assertTrue(set.remove(list.get(i))); + } + // the deleted elements should not be there + for (int i = 0; i < NUM; i++) { + assertFalse(set.contains(list.get(i))); + } + + // iterator should not have next + Iterator iter = set.iterator(); + assertFalse(iter.hasNext()); + assertTrue(set.isEmpty()); + LOG.info("Test remove all - DONE"); + } + + @Test + public void testPollAll() { + LOG.info("Test poll all"); + for (Integer i : list) { + assertTrue(set.add(i)); + } + // remove all elements by polling + List poll = set.pollAll(); + assertEquals(0, set.size()); + assertTrue(set.isEmpty()); + + // the deleted elements should not be there + for (int i = 0; i < NUM; i++) { + assertFalse(set.contains(list.get(i))); + } + + // we should get all original items + for (Integer i : poll) { + assertTrue(list.contains(i)); + } + + Iterator iter = set.iterator(); + assertFalse(iter.hasNext()); + LOG.info("Test poll all - DONE"); + } + + @Test + public void testPollNMulti() { + LOG.info("Test pollN multi"); + + // use addAll + set.addAll(list); + + // poll zero + List poll = set.pollN(0); + assertEquals(0, poll.size()); + for (Integer i : list) { + assertTrue(set.contains(i)); + } + + // poll existing elements (less than size) + poll = set.pollN(10); + assertEquals(10, poll.size()); + + for (Integer i : poll) { + // should be in original items + assertTrue(list.contains(i)); + // should not be in the set anymore + assertFalse(set.contains(i)); + } + + // poll more elements than present + poll = set.pollN(1000); + assertEquals(NUM - 10, poll.size()); + + for (Integer i : poll) { + // should be in original items + assertTrue(list.contains(i)); + } + + // set is empty + assertTrue(set.isEmpty()); + assertEquals(0, set.size()); + + LOG.info("Test pollN multi - DONE"); + } + + @Test + public void testPollNMultiArray() { + LOG.info("Test pollN multi array"); + + // use addAll + set.addAll(list); + + // poll existing elements (less than size) + Integer[] poll = new Integer[10]; + poll = set.pollToArray(poll); + assertEquals(10, poll.length); + + for (Integer i : poll) { + // should be in original items + assertTrue(list.contains(i)); + // should not be in the set anymore + assertFalse(set.contains(i)); + } + + // poll other elements (more than size) + poll = new Integer[NUM]; + poll = set.pollToArray(poll); + assertEquals(NUM - 10, poll.length); + + for (int i = 0; i < NUM - 10; i++) { + assertTrue(list.contains(poll[i])); + } + + // set is empty + assertTrue(set.isEmpty()); + assertEquals(0, set.size()); + + // ////// + set.addAll(list); + // poll existing elements (exactly the size) + poll = new Integer[NUM]; + poll = set.pollToArray(poll); + assertTrue(set.isEmpty()); + assertEquals(0, set.size()); + assertEquals(NUM, poll.length); + for (int i = 0; i < NUM; i++) { + assertTrue(list.contains(poll[i])); + } + // ////// + + // ////// + set.addAll(list); + // poll existing elements (exactly the size) + poll = new Integer[0]; + poll = set.pollToArray(poll); + for (int i = 0; i < NUM; i++) { + assertTrue(set.contains(list.get(i))); + } + assertEquals(0, poll.length); + // ////// + + LOG.info("Test pollN multi array- DONE"); + } + + @Test + public void testClear() { + LOG.info("Test clear"); + // use addAll + set.addAll(list); + assertEquals(NUM, set.size()); + 
assertFalse(set.isEmpty()); + + // clear the set + set.clear(); + assertEquals(0, set.size()); + assertTrue(set.isEmpty()); + + // iterator should be empty + Iterator iter = set.iterator(); + assertFalse(iter.hasNext()); + + LOG.info("Test clear - DONE"); + } + + @Test + public void testCapacity() { + LOG.info("Test capacity"); + float maxF = LightWeightHashSet.DEFAULT_MAX_LOAD_FACTOR; + float minF = LightWeightHashSet.DEFAUT_MIN_LOAD_FACTOR; + + // capacity lower than min_capacity + set = new LightWeightHashSet(1, maxF, minF); + assertEquals(LightWeightHashSet.MINIMUM_CAPACITY, set.getCapacity()); + + // capacity not a power of two + set = new LightWeightHashSet(30, maxF, minF); + assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, 32), + set.getCapacity()); + + // capacity valid + set = new LightWeightHashSet(64, maxF, minF); + assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, 64), + set.getCapacity()); + + // add NUM elements + set.addAll(list); + int expCap = LightWeightHashSet.MINIMUM_CAPACITY; + while (expCap < NUM && maxF * expCap < NUM) + expCap <<= 1; + assertEquals(expCap, set.getCapacity()); + + // see if the set shrinks if we remove elements by removing + set.clear(); + set.addAll(list); + int toRemove = set.size() - (int) (set.getCapacity() * minF) + 1; + for (int i = 0; i < toRemove; i++) { + set.remove(list.get(i)); + } + assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, expCap / 2), + set.getCapacity()); + + LOG.info("Test capacity - DONE"); + } + + @Test + public void testOther() { + LOG.info("Test other"); + + // remove all + assertTrue(set.addAll(list)); + assertTrue(set.removeAll(list)); + assertTrue(set.isEmpty()); + + // remove sublist + List sub = new LinkedList(); + for (int i = 0; i < 10; i++) { + sub.add(list.get(i)); + } + assertTrue(set.addAll(list)); + assertTrue(set.removeAll(sub)); + assertFalse(set.isEmpty()); + assertEquals(NUM - 10, set.size()); + + for (Integer i : sub) { + assertFalse(set.contains(i)); + } + + assertFalse(set.containsAll(sub)); + + // the rest of the elements should be there + List sub2 = new LinkedList(); + for (int i = 10; i < NUM; i++) { + sub2.add(list.get(i)); + } + assertTrue(set.containsAll(sub2)); + + // to array + Integer[] array = set.toArray(new Integer[0]); + assertEquals(NUM - 10, array.length); + for (int i = 0; i < array.length; i++) { + assertTrue(sub2.contains(array[i])); + } + assertEquals(NUM - 10, set.size()); + + // to array + Object[] array2 = set.toArray(); + assertEquals(NUM - 10, array2.length); + + for (int i = 0; i < array2.length; i++) { + assertTrue(sub2.contains((Integer) array2[i])); + } + + LOG.info("Test other - DONE"); + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java new file mode 100644 index 00000000000..6a9e21fb567 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
+public class TestLightWeightLinkedSet {
+
+  private static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.hdfs.TestLightWeightLinkedSet");
+  private ArrayList<Integer> list = new ArrayList<Integer>();
+  private final int NUM = 100;
+  private LightWeightLinkedSet<Integer> set;
+  private Random rand;
+
+  @Before
+  public void setUp() {
+    float maxF = LightWeightLinkedSet.DEFAULT_MAX_LOAD_FACTOR;
+    float minF = LightWeightLinkedSet.DEFAUT_MIN_LOAD_FACTOR;
+    int initCapacity = LightWeightLinkedSet.MINIMUM_CAPACITY;
+    rand = new Random(System.currentTimeMillis());
+    list.clear();
+    for (int i = 0; i < NUM; i++) {
+      list.add(rand.nextInt());
+    }
+    set = new LightWeightLinkedSet<Integer>(initCapacity, maxF, minF);
+  }
+
+  @Test
+  public void testEmptyBasic() {
+    LOG.info("Test empty basic");
+    Iterator<Integer> iter = set.iterator();
+    // iterator should not have next
+    assertFalse(iter.hasNext());
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // poll should return nothing
+    assertNull(set.pollFirst());
+    assertEquals(0, set.pollAll().size());
+    assertEquals(0, set.pollN(10).size());
+
+    LOG.info("Test empty - DONE");
+  }
+
+  @Test
+  public void testOneElementBasic() {
+    LOG.info("Test one element basic");
+    set.add(list.get(0));
+    // set should be non-empty
+    assertEquals(1, set.size());
+    assertFalse(set.isEmpty());
+
+    // iterator should have next
+    Iterator<Integer> iter = set.iterator();
+    assertTrue(iter.hasNext());
+
+    // iterator should not have next
+    assertEquals(list.get(0), iter.next());
+    assertFalse(iter.hasNext());
+    LOG.info("Test one element basic - DONE");
+  }
+
+  @Test
+  public void testMultiBasic() {
+    LOG.info("Test multi element basic");
+    // add once
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    assertEquals(list.size(), set.size());
+
+    // check if the elements are in the set
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    // add again - should return false each time
+    for (Integer i : list) {
+      assertFalse(set.add(i));
+    }
+
+    // check again if the elements are there
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    int num = 0;
+    while (iter.hasNext()) {
+      assertEquals(list.get(num++), iter.next());
+    }
+    // check the number of elements returned by the iterator
+    assertEquals(list.size(), num);
+    LOG.info("Test multi element basic - DONE");
+  }
+
+  @Test
+  public void testRemoveOne() {
+    LOG.info("Test remove one");
+    assertTrue(set.add(list.get(0)));
+    assertEquals(1, set.size());
+
+    // remove from the head/tail
+    assertTrue(set.remove(list.get(0)));
+    assertEquals(0, set.size());
+
+    // check the iterator
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+
+    // poll should return nothing
+    assertNull(set.pollFirst());
+    assertEquals(0, set.pollAll().size());
+    assertEquals(0, set.pollN(10).size());
+
+    // add the element back to the set
+    assertTrue(set.add(list.get(0)));
+    assertEquals(1, set.size());
+
+    iter = set.iterator();
+    assertTrue(iter.hasNext());
+    LOG.info("Test remove one - DONE");
+  }
+
+  @Test
+  public void testRemoveMulti() {
+    LOG.info("Test remove multi");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    for (int i = 0; i < NUM / 2; i++) {
+      assertTrue(set.remove(list.get(i)));
+    }
+
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM / 2; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // the rest should be there
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertTrue(set.contains(list.get(i)));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    // the remaining elements should be in order
+    int num = NUM / 2;
+    while (iter.hasNext()) {
+      assertEquals(list.get(num++), iter.next());
+    }
+    assertEquals(num, NUM);
+    LOG.info("Test remove multi - DONE");
+  }
+
+  @Test
+  public void testRemoveAll() {
+    LOG.info("Test remove all");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    for (int i = 0; i < NUM; i++) {
+      assertTrue(set.remove(list.get(i)));
+    }
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // iterator should not have next
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+    assertTrue(set.isEmpty());
+    LOG.info("Test remove all - DONE");
+  }
+
+  @Test
+  public void testPollOneElement() {
+    LOG.info("Test poll one element");
+    set.add(list.get(0));
+    assertEquals(list.get(0), set.pollFirst());
+    assertNull(set.pollFirst());
+    LOG.info("Test poll one element - DONE");
+  }
+
+  @Test
+  public void testPollMulti() {
+    LOG.info("Test poll multi");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    // remove half of the elements by polling
+    for (int i = 0; i < NUM / 2; i++) {
+      assertEquals(list.get(i), set.pollFirst());
+    }
+    assertEquals(NUM / 2, set.size());
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM / 2; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+    // the rest should be there
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertTrue(set.contains(list.get(i)));
+    }
+    Iterator<Integer> iter = set.iterator();
+    // the remaining elements should be in order
+    int num = NUM / 2;
+    while (iter.hasNext()) {
+      assertEquals(list.get(num++), iter.next());
+    }
+    assertEquals(num, NUM);
+
+    // add elements back
+    for (int i = 0; i < NUM / 2; i++) {
+      assertTrue(set.add(list.get(i)));
+    }
+    // order should be switched
+    assertEquals(NUM, set.size());
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertEquals(list.get(i), set.pollFirst());
+    }
+    for (int i = 0; i < NUM / 2; i++) {
+      assertEquals(list.get(i), set.pollFirst());
+    }
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+    LOG.info("Test poll multi - DONE");
+  }
+
+  @Test
+  public void testPollAll() {
+    LOG.info("Test poll all");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    // remove all elements by polling
+    while (set.pollFirst() != null);
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+    LOG.info("Test poll all - DONE");
+  }
+
+  @Test
+  public void testPollNOne() {
+    LOG.info("Test pollN one");
+    set.add(list.get(0));
+    List<Integer> l = set.pollN(10);
+    assertEquals(1, l.size());
+    assertEquals(list.get(0), l.get(0));
+    LOG.info("Test pollN one - DONE");
+  }
+
+  @Test
+  public void testPollNMulti() {
+    LOG.info("Test pollN multi");
+
+    // use addAll
+    set.addAll(list);
+
+    // poll existing elements
+    List<Integer> l = set.pollN(10);
+    assertEquals(10, l.size());
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals(list.get(i), l.get(i));
+    }
+
+    // poll more elements than present
+    l = set.pollN(1000);
+    assertEquals(NUM - 10, l.size());
+
+    // check the order
+    for (int i = 10; i < NUM; i++) {
+      assertEquals(list.get(i), l.get(i - 10));
+    }
+    // set is empty
+    assertTrue(set.isEmpty());
+    assertEquals(0, set.size());
+
+    LOG.info("Test pollN multi - DONE");
+  }
+
+  @Test
+  public void testClear() {
+    LOG.info("Test clear");
+    // use addAll
+    set.addAll(list);
+    assertEquals(NUM, set.size());
+    assertFalse(set.isEmpty());
+
+    // clear the set
+    set.clear();
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // poll should return an empty list
+    assertEquals(0, set.pollAll().size());
+    assertEquals(0, set.pollN(10).size());
+    assertNull(set.pollFirst());
+
+    // iterator should be empty
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+
+    LOG.info("Test clear - DONE");
+  }
+
+  @Test
+  public void testOther() {
+    LOG.info("Test other");
+    assertTrue(set.addAll(list));
+    // to array
+    Integer[] array = set.toArray(new Integer[0]);
+    assertEquals(NUM, array.length);
+    for (int i = 0; i < array.length; i++) {
+      assertTrue(list.contains(array[i]));
+    }
+    assertEquals(NUM, set.size());
+
+    // to array
+    Object[] array2 = set.toArray();
+    assertEquals(NUM, array2.length);
+    for (int i = 0; i < array2.length; i++) {
+      assertTrue(list.contains((Integer) array2[i]));
+    }
+    LOG.info("Test other - DONE");
+  }
+
+}
\ No newline at end of file