diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index e861ef9adaf..27e743c53b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -346,7 +346,7 @@ public class HistoryFileManager extends AbstractService {
     private Path confFile;
     private Path summaryFile;
     private JobIndexInfo jobIndexInfo;
-    private HistoryInfoState state;
+    private volatile HistoryInfoState state;
 
     @VisibleForTesting
     protected HistoryFileInfo(Path historyFile, Path confFile,
@@ -360,20 +360,20 @@ public class HistoryFileManager extends AbstractService {
     }
 
     @VisibleForTesting
-    synchronized boolean isMovePending() {
+    boolean isMovePending() {
       return state == HistoryInfoState.IN_INTERMEDIATE
           || state == HistoryInfoState.MOVE_FAILED;
     }
 
     @VisibleForTesting
-    synchronized boolean didMoveFail() {
+    boolean didMoveFail() {
       return state == HistoryInfoState.MOVE_FAILED;
     }
-    
+
     /**
     * @return true if the files backed by this were deleted.
     */
-    public synchronized boolean isDeleted() {
+    public boolean isDeleted() {
       return state == HistoryInfoState.DELETED;
     }
 
@@ -566,13 +566,15 @@ public class HistoryFileManager extends AbstractService {
     int numMoveThreads = conf.getInt(
         JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+    moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
+    super.serviceInit(conf);
+  }
+
+  protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "MoveIntermediateToDone Thread #%d").build();
-    moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
-        numMoveThreads, 1, TimeUnit.HOURS,
-        new LinkedBlockingQueue<Runnable>(), tf);
-
-    super.serviceInit(conf);
+    return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads,
+        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
   }
 
   @VisibleForTesting
@@ -708,6 +710,13 @@ public class HistoryFileManager extends AbstractService {
     }
   }
 
+  protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
+      Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
+      boolean isInDone) {
+    return new HistoryFileInfo(
+        historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
+  }
+
   /**
    * Populates index data structures. Should only be called at initialization
    * times.
@@ -782,7 +791,7 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
           .getPath().getParent(), confFileName), new Path(fs.getPath()
           .getParent(), summaryFileName), jobIndexInfo, true);
       jobListCache.addIfAbsent(fileInfo);
@@ -880,7 +889,7 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
           .getPath().getParent(), confFileName), new Path(fs.getPath()
           .getParent(), summaryFileName), jobIndexInfo, false);
 
@@ -940,7 +949,7 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
           fs.getPath().getParent(), confFileName), new Path(fs.getPath()
           .getParent(), summaryFileName), jobIndexInfo, true);
       return fileInfo;
@@ -1105,7 +1114,7 @@ public class HistoryFileManager extends AbstractService {
       String confFileName = JobHistoryUtils
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
 
-      fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+      fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
           historyFile.getPath().getParent(), confFileName), null,
           jobIndexInfo, true);
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java
new file mode 100644
index 00000000000..06045b5a76f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * The test in this class was created specifically to address the issue in
+ * MAPREDUCE-6684. When two threads try to load different jobs through the
+ * job history file manager, one thread could be blocked by the other one
+ * that is loading a huge job file, which is undesirable.
+ */
+public class TestUnnecessaryBlockingOnHistoryFileInfo {
+  /**
+   * The intermediate done directory that JHS scans for completed jobs.
+   */
+  private final static File INTERMEDIATE_DIR = new File("target",
+      TestUnnecessaryBlockingOnHistoryFileInfo.class.getName() +
+          "/intermediate");
+  /**
+   * A test user directory under the intermediate done directory.
+   */
+  private final static File USER_DIR = new File(INTERMEDIATE_DIR, "test");
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    if (USER_DIR.exists()) {
+      FileUtils.cleanDirectory(USER_DIR);
+    }
+    USER_DIR.mkdirs();
+  }
+
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    FileUtils.deleteDirectory(INTERMEDIATE_DIR);
+  }
+
+  /**
+   * This creates a test case in which two threads try to load two different
+   * jobs of the same user under the intermediate directory. One thread
+   * should not be blocked by the other thread that is loading a huge job
+   * file (simulated by making parsing of the job files hang forever). The
+   * test fails by triggering the timeout if one thread is blocked by the
+   * other while that other thread is holding the lock on its associated job
+   * files and hanging in parsing them.
+   */
+  @Test(timeout = 20000)
+  public void testTwoThreadsQueryingDifferentJobOfSameUser()
+      throws InterruptedException, IOException {
+    final Configuration config = new Configuration();
+    config.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
+        INTERMEDIATE_DIR.getPath());
+    config.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, Long.MAX_VALUE);
+
+    final JobId job1 = createJobId(0);
+    final JobId job2 = createJobId(1);
+    final HistoryFileManagerUnderContention historyFileManager =
+        createHistoryFileManager(config, job1, job2);
+
+    Thread webRequest1 = null;
+    Thread webRequest2 = null;
+    try {
+      /**
+       * Create a dummy .jhist file for job1, and try to load/parse the job
+       * files in one child thread.
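+       * The HistoryFileManager under test overrides loadJob() to block until
+       * its thread is interrupted, so webRequest1 will keep holding the lock
+       * on job1's HistoryFileInfo for the rest of the test.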
+       */
+      createJhistFile(job1);
+      webRequest1 = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                HistoryFileManager.HistoryFileInfo historyFileInfo =
+                    historyFileManager.getFileInfo(job1);
+                historyFileInfo.loadJob();
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+            }
+          }
+      );
+      webRequest1.start();
+      historyFileManager.waitUntilIntermediateDirIsScanned(job1);
+
+      /**
+       * At this point, thread webRequest1 has finished scanning the
+       * intermediate directory and is hanging in parsing the job files
+       * while holding the lock on the associated HistoryFileInfo object.
+       */
+
+      /**
+       * Create a dummy .jhist file for job2 and try to load/parse the job
+       * files in the other child thread. Because job files are not moved
+       * from the intermediate directory to the done directory, thread
+       * webRequest2 will also see the job history files for job1.
+       */
+      createJhistFile(job2);
+      webRequest2 = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                HistoryFileManager.HistoryFileInfo historyFileInfo =
+                    historyFileManager.getFileInfo(job2);
+                historyFileInfo.loadJob();
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+            }
+          }
+      );
+      webRequest2.start();
+      historyFileManager.waitUntilIntermediateDirIsScanned(job2);
+
+      /**
+       * If execution gets to this point, thread webRequest2 has not tried
+       * to acquire the lock of the HistoryFileInfo object associated with
+       * job1, which is held by thread webRequest1 while it hangs in parsing
+       * the job history files, so webRequest2 was able to proceed with
+       * parsing the job history files of job2.
+       */
+      Assert.assertTrue("Thread 2 is blocked by Thread 1, which is loading " +
+              "job1, while it is trying to load job2.",
+          webRequest2.getState() != Thread.State.BLOCKED);
+    } finally {
+      if (webRequest1 != null) {
+        webRequest1.interrupt();
+      }
+      if (webRequest2 != null) {
+        webRequest2.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Create, initialize and start an instance of HistoryFileManager.
+   * @param config the configuration to initialize the HistoryFileManager
+   *               instance with
+   * @param jobIds the set of jobs expected to be loaded by the
+   *               HistoryFileManager
+   * @return the started HistoryFileManagerUnderContention instance
+   */
+  private HistoryFileManagerUnderContention createHistoryFileManager(
+      Configuration config, JobId... jobIds) {
+    HistoryFileManagerUnderContention historyFileManager =
+        new HistoryFileManagerUnderContention(jobIds);
+    historyFileManager.init(config);
+    historyFileManager.start();
+    return historyFileManager;
+  }
+
+  /**
+   * Create, initialize and start an instance of CachedHistoryStorage.
+   * @param config the configuration to initialize the storage with
+   * @param historyFileManager the HistoryFileManager to initialize the
+   *                           cache with
+   */
+  private static CachedHistoryStorage createHistoryStorage(
+      Configuration config, HistoryFileManager historyFileManager) {
+    CachedHistoryStorage historyStorage = new CachedHistoryStorage();
+    historyStorage.setHistoryFileManager(historyFileManager);
+    historyStorage.init(config);
+    historyStorage.start();
+    return historyStorage;
+  }
+
+  private static JobId createJobId(int id) {
+    JobId jobId = new JobIdPBImpl();
+    jobId.setId(id);
+    jobId.setAppId(ApplicationIdPBImpl.newInstance(0, id));
+    return jobId;
+  }
+
+  /**
+   * Create a dummy .jhist file under the intermediate directory for the
+   * given job.
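+   * The file name is assembled from the job id, a fake submit time, the
+   * test user name and the finish time, mimicking the naming convention of
+   * intermediate .jhist files so that the directory scan picks it up.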
+   * @param jobId the id of the given job
+   * @return true if the file is created successfully, false otherwise
+   */
+  private static boolean createJhistFile(JobId jobId) throws IOException {
+    StringBuilder fileName = new StringBuilder(jobId.toString());
+    long finishTime = System.currentTimeMillis();
+    fileName.append("-").append(finishTime - 1000)
+        .append("-").append("test")
+        .append("-").append(jobId.getId())
+        .append("-").append(finishTime)
+        .append(".jhist");
+    File jhistFile = new File(USER_DIR, fileName.toString());
+    return jhistFile.createNewFile();
+  }
+
+  /**
+   * A test implementation of HistoryFileManager that performs no moving of
+   * files from the intermediate directory to the done directory and that
+   * hangs while parsing job history files.
+   */
+  class HistoryFileManagerUnderContention extends HistoryFileManager {
+    /**
+     * A map from a job to a signal that indicates whether the intermediate
+     * directory is done being scanned before the job files are parsed.
+     */
+    private Map<JobId, CountDownLatch> scanningDoneSignals = new HashMap<>();
+
+    /**
+     * A HistoryFileManager that expects to load the given jobs and hangs
+     * while parsing the job files. It performs no moving of files from the
+     * intermediate directory to the done directory.
+     * @param jobId the set of jobs expected to be loaded and parsed
+     */
+    public HistoryFileManagerUnderContention(JobId... jobId) {
+      for (JobId job : jobId) {
+        scanningDoneSignals.put(job, new CountDownLatch(1));
+      }
+    }
+
+    /**
+     * Wait until scanning of the intermediate directory has finished and
+     * loading of the given job has started.
+     */
+    public void waitUntilIntermediateDirIsScanned(JobId jobId)
+        throws InterruptedException {
+      if (scanningDoneSignals.containsKey(jobId)) {
+        scanningDoneSignals.get(jobId).await();
+      }
+    }
+
+    /**
+     * Create a HistoryFileInfo instance that hangs on parsing job files.
+     */
+    @Override
+    protected HistoryFileManager.HistoryFileInfo createHistoryFileInfo(
+        Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo, boolean isInDone) {
+      return new HistoryFileInfo(historyFile, confFile, summaryFile,
+          jobIndexInfo, isInDone,
+          scanningDoneSignals.get(jobIndexInfo.getJobId()));
+    }
+
+    /**
+     * Create a dummy ThreadPoolExecutor that does not execute submitted
+     * tasks, so job files are never moved out of the intermediate directory.
+     */
+    @Override
+    protected ThreadPoolExecutor createMoveToDoneThreadPool(
+        int numMoveThreads) {
+      return mock(ThreadPoolExecutor.class);
+    }
+
+    /**
+     * A HistoryFileInfo implementation that takes forever to parse the
+     * associated job files. This mimics the behavior of parsing huge job
+     * files.
+     */
+    class HistoryFileInfo extends HistoryFileManager.HistoryFileInfo {
+      /**
+       * A signal that indicates scanning of the intermediate directory is
+       * done as HistoryFileManager is in the process of loading this
+       * HistoryFileInfo instance.
+       */
+      private final CountDownLatch scanningDoneSignal;
+
+      HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+          JobIndexInfo jobIndexInfo, boolean isInDone,
+          CountDownLatch scanningDoneSignal) {
+        super(historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
+        this.scanningDoneSignal = scanningDoneSignal;
+      }
+
+      /**
+       * A test implementation that takes forever to load a job in order to
+       * mimic what happens when job files of large size are parsed in JHS.
+       * Before loading, we signal that scanning of the intermediate
+       * directory is finished.
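+       * Note that loadJob() is synchronized, so the calling thread keeps
+       * the lock on this HistoryFileInfo instance for as long as it loops,
+       * which is how the test simulates the lock contention described in
+       * MAPREDUCE-6684.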
+       */
+      @Override
+      public synchronized Job loadJob() throws IOException {
+        if (scanningDoneSignal != null) {
+          scanningDoneSignal.countDown();
+        }
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            // sleep() clears the interrupt flag when it throws; restore it
+            // so the loop condition sees the interrupt and exits.
+            Thread.currentThread().interrupt();
+          }
+        }
+        return null;
+      }
+    }
+  }
+}