diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 593eb43dadd..9346bbda03f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -269,6 +269,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken. (Junping Du via sseth) + MAPREDUCE-4803. Remove duplicate copy of TestIndexCache. (Mariappan Asokan + via sseth) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java deleted file mode 100644 index b6a2df08833..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Random; -import java.util.zip.CRC32; -import java.util.zip.CheckedOutputStream; - -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; - -import junit.framework.TestCase; - -public class TestIndexCache extends TestCase { - private JobConf conf; - private FileSystem fs; - private Path p; - - @Override - public void setUp() throws IOException { - conf = new JobConf(); - fs = FileSystem.getLocal(conf).getRaw(); - p = new Path(System.getProperty("test.build.data", "/tmp"), - "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory()); - } - - public void testLRCPolicy() throws Exception { - Random r = new Random(); - long seed = r.nextLong(); - r.setSeed(seed); - System.out.println("seed: " + seed); - fs.delete(p, true); - conf.setInt(TTConfig.TT_INDEX_CACHE, 1); - final int partsPerMap = 1000; - final int bytesPerFile = partsPerMap * 24; - IndexCache cache = new IndexCache(conf); - - // fill cache - int totalsize = bytesPerFile; - for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) { - Path f = new Path(p, Integer.toString(totalsize, 36)); - writeFile(fs, f, totalsize, partsPerMap); - IndexRecord rec = cache.getIndexInformation( - Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f, - UserGroupInformation.getCurrentUser().getShortUserName()); - checkRecord(rec, totalsize); - } - - // delete files, ensure cache retains all elem - for (FileStatus stat : fs.listStatus(p)) { - fs.delete(stat.getPath(),true); - } - for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) { - Path f = new Path(p, Integer.toString(i, 36)); - IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36), - r.nextInt(partsPerMap), f, - UserGroupInformation.getCurrentUser().getShortUserName()); - checkRecord(rec, i); - } - - // push oldest (bytesPerFile) out of cache - Path f = new Path(p, Integer.toString(totalsize, 36)); - writeFile(fs, f, totalsize, partsPerMap); - cache.getIndexInformation(Integer.toString(totalsize, 36), - r.nextInt(partsPerMap), f, - UserGroupInformation.getCurrentUser().getShortUserName()); - fs.delete(f, false); - - // oldest fails to read, or error - boolean fnf = false; - try { - cache.getIndexInformation(Integer.toString(bytesPerFile, 36), - r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)), - UserGroupInformation.getCurrentUser().getShortUserName()); - } catch (IOException e) { - if (e.getCause() == null || - !(e.getCause() instanceof FileNotFoundException)) { - throw e; - } - else { - fnf = true; - } - } - if (!fnf) - fail("Failed to push out last entry"); - // should find all the other entries - for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) { - IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36), - r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)), - UserGroupInformation.getCurrentUser().getShortUserName()); - checkRecord(rec, i); - } - IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36), - r.nextInt(partsPerMap), f, - UserGroupInformation.getCurrentUser().getShortUserName()); - - checkRecord(rec, totalsize); - } - - public void testBadIndex() throws Exception { - final int parts = 30; - fs.delete(p, true); - conf.setInt(TTConfig.TT_INDEX_CACHE, 1); - IndexCache cache = new IndexCache(conf); - - Path f = new Path(p, "badindex"); - FSDataOutputStream out = fs.create(f, false); - CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32()); - DataOutputStream dout = new DataOutputStream(iout); - for (int i = 0; i < parts; ++i) { - for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) { - if (0 == (i % 3)) { - dout.writeLong(i); - } else { - out.writeLong(i); - } - } - } - out.writeLong(iout.getChecksum().getValue()); - dout.close(); - try { - cache.getIndexInformation("badindex", 7, f, - UserGroupInformation.getCurrentUser().getShortUserName()); - fail("Did not detect bad checksum"); - } catch (IOException e) { - if (!(e.getCause() instanceof ChecksumException)) { - throw e; - } - } - } - - public void testInvalidReduceNumberOrLength() throws Exception { - fs.delete(p, true); - conf.setInt(TTConfig.TT_INDEX_CACHE, 1); - final int partsPerMap = 1000; - final int bytesPerFile = partsPerMap * 24; - IndexCache cache = new IndexCache(conf); - - // fill cache - Path feq = new Path(p, "invalidReduceOrPartsPerMap"); - writeFile(fs, feq, bytesPerFile, partsPerMap); - - // Number of reducers should always be less than partsPerMap as reducer - // numbers start from 0 and there cannot be more reducer than parts - - try { - // Number of reducers equal to partsPerMap - cache.getIndexInformation("reduceEqualPartsPerMap", - partsPerMap, // reduce number == partsPerMap - feq, UserGroupInformation.getCurrentUser().getShortUserName()); - fail("Number of reducers equal to partsPerMap did not fail"); - } catch (Exception e) { - if (!(e instanceof IOException)) { - throw e; - } - } - - try { - // Number of reducers more than partsPerMap - cache.getIndexInformation( - "reduceMorePartsPerMap", - partsPerMap + 1, // reduce number > partsPerMap - feq, UserGroupInformation.getCurrentUser().getShortUserName()); - fail("Number of reducers more than partsPerMap did not fail"); - } catch (Exception e) { - if (!(e instanceof IOException)) { - throw e; - } - } - } - - public void testRemoveMap() throws Exception { - // This test case use two thread to call getIndexInformation and - // removeMap concurrently, in order to construct race condition. - // This test case may not repeatable. But on my macbook this test - // fails with probability of 100% on code before MAPREDUCE-2541, - // so it is repeatable in practice. - fs.delete(p, true); - conf.setInt(TTConfig.TT_INDEX_CACHE, 10); - // Make a big file so removeMapThread almost surely runs faster than - // getInfoThread - final int partsPerMap = 100000; - final int bytesPerFile = partsPerMap * 24; - final IndexCache cache = new IndexCache(conf); - - final Path big = new Path(p, "bigIndex"); - final String user = - UserGroupInformation.getCurrentUser().getShortUserName(); - writeFile(fs, big, bytesPerFile, partsPerMap); - - // run multiple times - for (int i = 0; i < 20; ++i) { - Thread getInfoThread = new Thread() { - @Override - public void run() { - try { - cache.getIndexInformation("bigIndex", partsPerMap, big, user); - } catch (Exception e) { - // should not be here - } - } - }; - Thread removeMapThread = new Thread() { - @Override - public void run() { - cache.removeMap("bigIndex"); - } - }; - if (i%2==0) { - getInfoThread.start(); - removeMapThread.start(); - } else { - removeMapThread.start(); - getInfoThread.start(); - } - getInfoThread.join(); - removeMapThread.join(); - assertEquals(true, cache.checkTotalMemoryUsed()); - } - } - - public void testCreateRace() throws Exception { - fs.delete(p, true); - conf.setInt(TTConfig.TT_INDEX_CACHE, 1); - final int partsPerMap = 1000; - final int bytesPerFile = partsPerMap * 24; - final IndexCache cache = new IndexCache(conf); - - final Path racy = new Path(p, "racyIndex"); - final String user = - UserGroupInformation.getCurrentUser().getShortUserName(); - writeFile(fs, racy, bytesPerFile, partsPerMap); - - // run multiple instances - Thread[] getInfoThreads = new Thread[50]; - for (int i = 0; i < 50; i++) { - getInfoThreads[i] = new Thread() { - @Override - public void run() { - try { - cache.getIndexInformation("racyIndex", partsPerMap, racy, user); - cache.removeMap("racyIndex"); - } catch (Exception e) { - // should not be here - } - } - }; - } - - for (int i = 0; i < 50; i++) { - getInfoThreads[i].start(); - } - - final Thread mainTestThread = Thread.currentThread(); - - Thread timeoutThread = new Thread() { - @Override - public void run() { - try { - Thread.sleep(15000); - mainTestThread.interrupt(); - } catch (InterruptedException ie) { - // we are done; - } - } - }; - - for (int i = 0; i < 50; i++) { - try { - getInfoThreads[i].join(); - } catch (InterruptedException ie) { - // we haven't finished in time. Potential deadlock/race. - fail("Unexpectedly long delay during concurrent cache entry creations"); - } - } - // stop the timeoutThread. If we get interrupted before stopping, there - // must be something wrong, although it wasn't a deadlock. No need to - // catch and swallow. - timeoutThread.interrupt(); - } - - private static void checkRecord(IndexRecord rec, long fill) { - assertEquals(fill, rec.startOffset); - assertEquals(fill, rec.rawLength); - assertEquals(fill, rec.partLength); - } - - private static void writeFile(FileSystem fs, Path f, long fill, int parts) - throws IOException { - FSDataOutputStream out = fs.create(f, false); - CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32()); - DataOutputStream dout = new DataOutputStream(iout); - for (int i = 0; i < parts; ++i) { - for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) { - dout.writeLong(fill); - } - } - out.writeLong(iout.getChecksum().getValue()); - dout.close(); - } -}