diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e35957bbcbf..119abfe7b09 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -369,6 +369,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-8470. Add NetworkTopologyWithNodeGroup, a 4-layer implementation of NetworkTopology. (Junping Du via szetszwo) + HADOOP-9763. Extends LightWeightGSet to support eviction of expired + elements. (Tsz Wo (Nicholas) SZE via jing9) + IMPROVEMENTS HADOOP-9164. Print paths of loaded native libraries in diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java index 4f38001ca57..26e73cf1e38 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GSet.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.util; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; /** @@ -31,6 +33,8 @@ import org.apache.hadoop.classification.InterfaceAudience; */ @InterfaceAudience.Private public interface GSet extends Iterable { + static final Log LOG = LogFactory.getLog(GSet.class); + /** * @return The size of this set. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightCache.java new file mode 100644 index 00000000000..7e7ad2c3458 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightCache.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.Comparator; +import java.util.PriorityQueue; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * A low memory footprint Cache which extends {@link LightWeightGSet}. + * An entry in the cache is expired if + * (1) it is added to the cache longer than the creation-expiration period, and + * (2) it is not accessed for the access-expiration period. + * When an entry is expired, it may be evicted from the cache. + * When the size limit of the cache is set, the cache will evict the entries + * with earliest expiration time, even if they are not expired. + * + * It is guaranteed that number of entries in the cache is less than or equal + * to the size limit. However, It is not guaranteed that expired entries are + * evicted from the cache. An expired entry may possibly be accessed after its + * expiration time. In such case, the expiration time may be updated. + * + * This class does not support null entry. + * + * This class is not thread safe. + * + * @param Key type for looking up the entries + * @param Entry type, which must be + * (1) a subclass of K, and + * (2) implementing {@link Entry} interface, and + */ +@InterfaceAudience.Private +public class LightWeightCache extends LightWeightGSet { + /** Limit the number of entries in each eviction. */ + private static final int EVICTION_LIMIT = 1 << 16; + + /** + * Entries of {@link LightWeightCache}. + */ + public static interface Entry extends LinkedElement { + /** Set the expiration time. */ + public void setExpirationTime(long timeNano); + + /** Get the expiration time. */ + public long getExpirationTime(); + } + + /** Comparator for sorting entries by expiration time in ascending order. */ + private static final Comparator expirationTimeComparator + = new Comparator() { + @Override + public int compare(Entry left, Entry right) { + final long l = left.getExpirationTime(); + final long r = right.getExpirationTime(); + return l > r? 1: l < r? -1: 0; + } + }; + + /** A clock for measuring time so that it can be mocked in unit tests. */ + static class Clock { + /** @return the current time. */ + long currentTime() { + return System.nanoTime(); + } + } + + private static int updateRecommendedLength(int recommendedLength, + int sizeLimit) { + return sizeLimit > 0 && sizeLimit < recommendedLength? + (sizeLimit/4*3) // 0.75 load factor + : recommendedLength; + } + + /* + * The memory footprint for java.util.PriorityQueue is low but the + * remove(Object) method runs in linear time. We may improve it by using a + * balanced tree. However, we do not yet have a low memory footprint balanced + * tree implementation. + */ + private final PriorityQueue queue; + private final long creationExpirationPeriod; + private final long accessExpirationPeriod; + private final int sizeLimit; + private final Clock clock; + + /** + * @param recommendedLength Recommended size of the internal array. + * @param sizeLimit the limit of the size of the cache. + * The limit is disabled if it is <= 0. + * @param creationExpirationPeriod the time period C > 0 in nanoseconds that + * the creation of an entry is expired if it is added to the cache + * longer than C. + * @param accessExpirationPeriod the time period A >= 0 in nanoseconds that + * the access of an entry is expired if it is not accessed + * longer than A. + */ + public LightWeightCache(final int recommendedLength, + final int sizeLimit, + final long creationExpirationPeriod, + final long accessExpirationPeriod) { + this(recommendedLength, sizeLimit, + creationExpirationPeriod, accessExpirationPeriod, new Clock()); + } + + @VisibleForTesting + LightWeightCache(final int recommendedLength, + final int sizeLimit, + final long creationExpirationPeriod, + final long accessExpirationPeriod, + final Clock clock) { + super(updateRecommendedLength(recommendedLength, sizeLimit)); + + this.sizeLimit = sizeLimit; + + if (creationExpirationPeriod <= 0) { + throw new IllegalArgumentException("creationExpirationPeriod = " + + creationExpirationPeriod + " <= 0"); + } + this.creationExpirationPeriod = creationExpirationPeriod; + + if (accessExpirationPeriod < 0) { + throw new IllegalArgumentException("accessExpirationPeriod = " + + accessExpirationPeriod + " < 0"); + } + this.accessExpirationPeriod = accessExpirationPeriod; + + this.queue = new PriorityQueue( + sizeLimit > 0? sizeLimit + 1: 1 << 10, expirationTimeComparator); + this.clock = clock; + } + + void setExpirationTime(final Entry e, final long expirationPeriod) { + e.setExpirationTime(clock.currentTime() + expirationPeriod); + } + + boolean isExpired(final Entry e, final long now) { + return now > e.getExpirationTime(); + } + + private E evict() { + @SuppressWarnings("unchecked") + final E polled = (E)queue.poll(); + final E removed = super.remove(polled); + Preconditions.checkState(removed == polled); + return polled; + } + + /** Evict expired entries. */ + private void evictExpiredEntries() { + final long now = clock.currentTime(); + for(int i = 0; i < EVICTION_LIMIT; i++) { + final Entry peeked = queue.peek(); + if (peeked == null || !isExpired(peeked, now)) { + return; + } + + final E evicted = evict(); + Preconditions.checkState(evicted == peeked); + } + } + + /** Evict entries in order to enforce the size limit of the cache. */ + private void evictEntries() { + if (sizeLimit > 0) { + for(int i = size(); i > sizeLimit; i--) { + evict(); + } + } + } + + @Override + public E get(K key) { + final E entry = super.get(key); + if (entry != null) { + if (accessExpirationPeriod > 0) { + // update expiration time + final Entry existing = (Entry)entry; + Preconditions.checkState(queue.remove(existing)); + setExpirationTime(existing, accessExpirationPeriod); + queue.offer(existing); + } + } + return entry; + } + + @Override + public E put(final E entry) { + if (!(entry instanceof Entry)) { + throw new HadoopIllegalArgumentException( + "!(entry instanceof Entry), entry.getClass()=" + entry.getClass()); + } + + evictExpiredEntries(); + + final E existing = super.put(entry); + if (existing != null) { + queue.remove(existing); + } + + final Entry e = (Entry)entry; + setExpirationTime(e, creationExpirationPeriod); + queue.offer(e); + + evictEntries(); + return existing; + } + + @Override + public E remove(K key) { + evictExpiredEntries(); + + final E removed = super.remove(key); + if (removed != null) { + Preconditions.checkState(queue.remove(removed)); + } + return removed; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java index e0a62b978f7..768606969fc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java @@ -21,11 +21,9 @@ import java.io.PrintStream; import java.util.ConcurrentModificationException; import java.util.Iterator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.HadoopIllegalArgumentException; import com.google.common.annotations.VisibleForTesting; @@ -59,7 +57,6 @@ public class LightWeightGSet implements GSet { public LinkedElement getNext(); } - public static final Log LOG = LogFactory.getLog(GSet.class); static final int MAX_ARRAY_LENGTH = 1 << 30; //prevent int overflow problem static final int MIN_ARRAY_LENGTH = 1; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java index 3a9391318ee..af880eecdc0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGSet.java @@ -169,9 +169,8 @@ public class TestGSet { } /** - * A long test, - * which may take ~5 hours, - * with various data sets and parameters. + * A long running test with various data sets and parameters. + * It may take ~5 hours, * If you are changing the implementation, * please un-comment the following line in order to run the test. */ @@ -327,8 +326,6 @@ public class TestGSet { } else { Assert.assertEquals(e.id, gset.remove(key).id); } - - check(); return e; } @Override @@ -391,7 +388,9 @@ public class TestGSet { @Override public void clear() { + expected.clear(); gset.clear(); + Assert.assertEquals(0, size()); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java new file mode 100644 index 00000000000..5c515daaf58 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLightWeightCache.java @@ -0,0 +1,456 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.Date; +import java.util.Iterator; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +/** Testing {@link LightWeightCache} */ +public class TestLightWeightCache { + private static final long starttime = Time.now(); + private static final Random ran = new Random(starttime); + static { + println("Start time = " + new Date(starttime) + ", seed=" + starttime); + } + + private static void print(Object s) { + System.out.print(s); + System.out.flush(); + } + + private static void println(Object s) { + System.out.println(s); + } + + @Test + public void testLightWeightCache() { + // test randomized creation expiration with zero access expiration + { + final long creationExpiration = ran.nextInt(1024) + 1; + check(1, creationExpiration, 0L, 1 << 10, 65537); + check(17, creationExpiration, 0L, 1 << 16, 17); + check(255, creationExpiration, 0L, 1 << 16, 65537); + } + + // test randomized creation/access expiration periods + for(int i = 0; i < 3; i++) { + final long creationExpiration = ran.nextInt(1024) + 1; + final long accessExpiration = ran.nextInt(1024) + 1; + + check(1, creationExpiration, accessExpiration, 1 << 10, 65537); + check(17, creationExpiration, accessExpiration, 1 << 16, 17); + check(255, creationExpiration, accessExpiration, 1 << 16, 65537); + } + + // test size limit + final int dataSize = 1 << 16; + for(int i = 0; i < 10; i++) { + final int modulus = ran.nextInt(1024) + 1; + final int sizeLimit = ran.nextInt(modulus); + checkSizeLimit(sizeLimit, dataSize, modulus); + } + } + + private static void checkSizeLimit(final int sizeLimit, final int datasize, + final int modulus) { + final LightWeightCacheTestCase test = new LightWeightCacheTestCase( + sizeLimit, sizeLimit, 1L << 32, 1L << 32, datasize, modulus); + + // keep putting entries and check size limit + print(" check size ................. "); + for(int i = 0; i < test.data.size(); i++) { + test.cache.put(test.data.get(i)); + Assert.assertTrue(test.cache.size() <= sizeLimit); + } + println("DONE " + test.stat()); + } + + /** + * Test various createionExpirationPeriod and accessExpirationPeriod. + * It runs ~2 minutes. If you are changing the implementation, + * please un-comment the following line in order to run the test. + */ +// @Test + public void testExpirationPeriods() { + for(int k = -4; k < 10; k += 4) { + final long accessExpirationPeriod = k < 0? 0L: (1L << k); + for(int j = 0; j < 10; j += 4) { + final long creationExpirationPeriod = 1L << j; + runTests(1, creationExpirationPeriod, accessExpirationPeriod); + for(int i = 1; i < Integer.SIZE - 1; i += 8) { + runTests((1 << i) + 1, creationExpirationPeriod, accessExpirationPeriod); + } + } + } + } + + /** Run tests with various table lengths. */ + private static void runTests(final int modulus, + final long creationExpirationPeriod, + final long accessExpirationPeriod) { + println("\n\n\n*** runTest: modulus=" + modulus + + ", creationExpirationPeriod=" + creationExpirationPeriod + + ", accessExpirationPeriod=" + accessExpirationPeriod); + for(int i = 0; i <= 16; i += 4) { + final int tablelength = (1 << i); + + final int upper = i + 2; + final int steps = Math.max(1, upper/3); + + for(int j = upper; j > 0; j -= steps) { + final int datasize = 1 << j; + check(tablelength, creationExpirationPeriod, accessExpirationPeriod, + datasize, modulus); + } + } + } + + private static void check(int tablelength, long creationExpirationPeriod, + long accessExpirationPeriod, int datasize, int modulus) { + check(new LightWeightCacheTestCase(tablelength, -1, + creationExpirationPeriod, accessExpirationPeriod, datasize, modulus)); + } + + /** + * check the following operations + * (1) put + * (2) remove & put + * (3) remove + * (4) remove & put again + */ + private static void check(final LightWeightCacheTestCase test) { + //check put + print(" check put .................. "); + for(int i = 0; i < test.data.size()/2; i++) { + test.put(test.data.get(i)); + } + for(int i = 0; i < test.data.size(); i++) { + test.put(test.data.get(i)); + } + println("DONE " + test.stat()); + + //check remove and put + print(" check remove & put ......... "); + for(int j = 0; j < 10; j++) { + for(int i = 0; i < test.data.size()/2; i++) { + final int r = ran.nextInt(test.data.size()); + test.remove(test.data.get(r)); + } + for(int i = 0; i < test.data.size()/2; i++) { + final int r = ran.nextInt(test.data.size()); + test.put(test.data.get(r)); + } + } + println("DONE " + test.stat()); + + //check remove + print(" check remove ............... "); + for(int i = 0; i < test.data.size(); i++) { + test.remove(test.data.get(i)); + } + Assert.assertEquals(0, test.cache.size()); + println("DONE " + test.stat()); + + //check remove and put again + print(" check remove & put again ... "); + for(int j = 0; j < 10; j++) { + for(int i = 0; i < test.data.size()/2; i++) { + final int r = ran.nextInt(test.data.size()); + test.remove(test.data.get(r)); + } + for(int i = 0; i < test.data.size()/2; i++) { + final int r = ran.nextInt(test.data.size()); + test.put(test.data.get(r)); + } + } + println("DONE " + test.stat()); + + final long s = (Time.now() - starttime)/1000L; + println("total time elapsed=" + s + "s\n"); + } + + /** + * The test case contains two data structures, a cache and a hashMap. + * The hashMap is used to verify the correctness of the cache. Note that + * no automatic eviction is performed in the hashMap. Thus, we have + * (1) If an entry exists in cache, it MUST exist in the hashMap. + * (2) If an entry does not exist in the cache, it may or may not exist in the + * hashMap. If it exists, it must be expired. + */ + private static class LightWeightCacheTestCase implements GSet { + /** hashMap will not evict entries automatically. */ + final GSet hashMap + = new GSetByHashMap(1024, 0.75f); + + final LightWeightCache cache; + final IntData data; + + final String info; + final long starttime = Time.now(); + /** Determine the probability in {@link #check()}. */ + final int denominator; + int iterate_count = 0; + int contain_count = 0; + + private long currentTestTime = ran.nextInt(); + + LightWeightCacheTestCase(int tablelength, int sizeLimit, + long creationExpirationPeriod, long accessExpirationPeriod, + int datasize, int modulus) { + denominator = Math.min((datasize >> 7) + 1, 1 << 16); + info = getClass().getSimpleName() + "(" + new Date(starttime) + + "): tablelength=" + tablelength + + ", creationExpirationPeriod=" + creationExpirationPeriod + + ", accessExpirationPeriod=" + accessExpirationPeriod + + ", datasize=" + datasize + + ", modulus=" + modulus + + ", denominator=" + denominator; + println(info); + + data = new IntData(datasize, modulus); + cache = new LightWeightCache(tablelength, sizeLimit, + creationExpirationPeriod, 0, new LightWeightCache.Clock() { + @Override + long currentTime() { + return currentTestTime; + } + }); + + Assert.assertEquals(0, cache.size()); + } + + private boolean containsTest(IntEntry key) { + final boolean c = cache.contains(key); + if (c) { + Assert.assertTrue(hashMap.contains(key)); + } else { + final IntEntry h = hashMap.remove(key); + if (h != null) { + Assert.assertTrue(cache.isExpired(h, currentTestTime)); + } + } + return c; + } + @Override + public boolean contains(IntEntry key) { + final boolean e = containsTest(key); + check(); + return e; + } + + private IntEntry getTest(IntEntry key) { + final IntEntry c = cache.get(key); + if (c != null) { + Assert.assertEquals(hashMap.get(key).id, c.id); + } else { + final IntEntry h = hashMap.remove(key); + if (h != null) { + Assert.assertTrue(cache.isExpired(h, currentTestTime)); + } + } + return c; + } + @Override + public IntEntry get(IntEntry key) { + final IntEntry e = getTest(key); + check(); + return e; + } + + private IntEntry putTest(IntEntry entry) { + final IntEntry c = cache.put(entry); + if (c != null) { + Assert.assertEquals(hashMap.put(entry).id, c.id); + } else { + final IntEntry h = hashMap.put(entry); + if (h != null && h != entry) { + // if h == entry, its expiration time is already updated + Assert.assertTrue(cache.isExpired(h, currentTestTime)); + } + } + return c; + } + @Override + public IntEntry put(IntEntry entry) { + final IntEntry e = putTest(entry); + check(); + return e; + } + + private IntEntry removeTest(IntEntry key) { + final IntEntry c = cache.remove(key); + if (c != null) { + Assert.assertEquals(c.id, hashMap.remove(key).id); + } else { + final IntEntry h = hashMap.remove(key); + if (h != null) { + Assert.assertTrue(cache.isExpired(h, currentTestTime)); + } + } + return c; + } + @Override + public IntEntry remove(IntEntry key) { + final IntEntry e = removeTest(key); + check(); + return e; + } + + private int sizeTest() { + final int c = cache.size(); + Assert.assertTrue(hashMap.size() >= c); + return c; + } + @Override + public int size() { + final int s = sizeTest(); + check(); + return s; + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + boolean tossCoin() { + return ran.nextInt(denominator) == 0; + } + + void check() { + currentTestTime += ran.nextInt() & 0x3; + + //test size + sizeTest(); + + if (tossCoin()) { + //test get(..), check content and test iterator + iterate_count++; + for(IntEntry i : cache) { + getTest(i); + } + } + + if (tossCoin()) { + //test contains(..) + contain_count++; + final int count = Math.min(data.size(), 1000); + if (count == data.size()) { + for(IntEntry i : data.integers) { + containsTest(i); + } + } else { + for(int j = 0; j < count; j++) { + containsTest(data.get(ran.nextInt(data.size()))); + } + } + } + } + + String stat() { + final long t = Time.now() - starttime; + return String.format(" iterate=%5d, contain=%5d, time elapsed=%5d.%03ds", + iterate_count, contain_count, t/1000, t%1000); + } + + @Override + public void clear() { + hashMap.clear(); + cache.clear(); + Assert.assertEquals(0, size()); + } + } + + private static class IntData { + final IntEntry[] integers; + + IntData(int size, int modulus) { + integers = new IntEntry[size]; + for(int i = 0; i < integers.length; i++) { + integers[i] = new IntEntry(i, ran.nextInt(modulus)); + } + } + + IntEntry get(int i) { + return integers[i]; + } + + int size() { + return integers.length; + } + } + + /** Entries of {@link LightWeightCache} in this test */ + private static class IntEntry implements LightWeightCache.Entry, + Comparable { + private LightWeightGSet.LinkedElement next; + final int id; + final int value; + private long expirationTime = 0; + + IntEntry(int id, int value) { + this.id = id; + this.value = value; + } + + @Override + public boolean equals(Object obj) { + return obj != null && obj instanceof IntEntry + && value == ((IntEntry)obj).value; + } + + @Override + public int hashCode() { + return value; + } + + @Override + public int compareTo(IntEntry that) { + return value - that.value; + } + + @Override + public String toString() { + return id + "#" + value + ",expirationTime=" + expirationTime; + } + + @Override + public LightWeightGSet.LinkedElement getNext() { + return next; + } + + @Override + public void setNext(LightWeightGSet.LinkedElement e) { + next = e; + } + + @Override + public void setExpirationTime(long timeNano) { + this.expirationTime = timeNano; + } + + @Override + public long getExpirationTime() { + return expirationTime; + } + } +}