MAPREDUCE-6684. High contention on scanning of user directory under immediate_done in Job History Server. Contributed by Haibo Chen

This commit is contained in:
Jason Lowe 2016-05-10 16:03:49 +00:00
parent cd35b692de
commit 5ffb54694b
2 changed files with 346 additions and 14 deletions

View File

@ -346,7 +346,7 @@ public class HistoryFileManager extends AbstractService {
private Path confFile; private Path confFile;
private Path summaryFile; private Path summaryFile;
private JobIndexInfo jobIndexInfo; private JobIndexInfo jobIndexInfo;
private HistoryInfoState state; private volatile HistoryInfoState state;
@VisibleForTesting @VisibleForTesting
protected HistoryFileInfo(Path historyFile, Path confFile, protected HistoryFileInfo(Path historyFile, Path confFile,
@ -360,20 +360,20 @@ public class HistoryFileManager extends AbstractService {
} }
@VisibleForTesting @VisibleForTesting
synchronized boolean isMovePending() { boolean isMovePending() {
return state == HistoryInfoState.IN_INTERMEDIATE return state == HistoryInfoState.IN_INTERMEDIATE
|| state == HistoryInfoState.MOVE_FAILED; || state == HistoryInfoState.MOVE_FAILED;
} }
@VisibleForTesting @VisibleForTesting
synchronized boolean didMoveFail() { boolean didMoveFail() {
return state == HistoryInfoState.MOVE_FAILED; return state == HistoryInfoState.MOVE_FAILED;
} }
/** /**
* @return true if the files backed by this were deleted. * @return true if the files backed by this were deleted.
*/ */
public synchronized boolean isDeleted() { public boolean isDeleted() {
return state == HistoryInfoState.DELETED; return state == HistoryInfoState.DELETED;
} }
@ -566,13 +566,15 @@ public class HistoryFileManager extends AbstractService {
int numMoveThreads = conf.getInt( int numMoveThreads = conf.getInt(
JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
super.serviceInit(conf);
}
protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"MoveIntermediateToDone Thread #%d").build(); "MoveIntermediateToDone Thread #%d").build();
moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads, return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads,
numMoveThreads, 1, TimeUnit.HOURS, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
new LinkedBlockingQueue<Runnable>(), tf);
super.serviceInit(conf);
} }
@VisibleForTesting @VisibleForTesting
@ -708,6 +710,13 @@ public class HistoryFileManager extends AbstractService {
} }
} }
protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
boolean isInDone) {
return new HistoryFileInfo(
historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
}
/** /**
* Populates index data structures. Should only be called at initialization * Populates index data structures. Should only be called at initialization
* times. * times.
@ -782,7 +791,7 @@ public class HistoryFileManager extends AbstractService {
.getIntermediateConfFileName(jobIndexInfo.getJobId()); .getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId()); .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath() .getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true); .getParent(), summaryFileName), jobIndexInfo, true);
jobListCache.addIfAbsent(fileInfo); jobListCache.addIfAbsent(fileInfo);
@ -880,7 +889,7 @@ public class HistoryFileManager extends AbstractService {
.getIntermediateConfFileName(jobIndexInfo.getJobId()); .getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId()); .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath() .getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, false); .getParent(), summaryFileName), jobIndexInfo, false);
@ -940,7 +949,7 @@ public class HistoryFileManager extends AbstractService {
.getIntermediateConfFileName(jobIndexInfo.getJobId()); .getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId()); .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
fs.getPath().getParent(), confFileName), new Path(fs.getPath() fs.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true); .getParent(), summaryFileName), jobIndexInfo, true);
return fileInfo; return fileInfo;
@ -1105,7 +1114,7 @@ public class HistoryFileManager extends AbstractService {
String confFileName = JobHistoryUtils String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId()); .getIntermediateConfFileName(jobIndexInfo.getJobId());
fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
historyFile.getPath().getParent(), confFileName), null, historyFile.getPath().getParent(), confFileName), null,
jobIndexInfo, true); jobIndexInfo, true);
} }

View File

@ -0,0 +1,323 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import static org.mockito.Mockito.mock;
/**
* The test in this class is created specifically to address the issue in
* MAPREDUCE-6684. In cases where there are two threads trying to load different
* jobs through job history file manager, one thread could be blocked by the
* other that is loading a huge job file, which is undesirable.
*
*/
public class TestUnnecessaryBlockingOnHistoryFileInfo {
/**
* The intermediate done directory that JHS scans for completed jobs.
*/
private final static File INTERMEDIATE_DIR = new File("target",
TestUnnecessaryBlockingOnHistoryFileInfo.class.getName() +
"/intermediate");
/**
* A test user directory under intermediate done directory.
*/
private final static File USER_DIR = new File(INTERMEDIATE_DIR, "test");
@BeforeClass
public static void setUp() throws IOException {
if(USER_DIR.exists()) {
FileUtils.cleanDirectory(USER_DIR);
}
USER_DIR.mkdirs();
}
@AfterClass
public static void cleanUp() throws IOException {
FileUtils.deleteDirectory(INTERMEDIATE_DIR);
}
/**
* This create a test case in which two threads are trying to load two
* different jobs of the same user under the intermediate directory.
* One thread should not be blocked by the other thread that is loading
* a huge job files (This is simulated by hanging up parsing the job files
* forever). The test will fail by triggering the timeout if one thread is
* blocked by the other while the other thread is holding the lock on its
* associated job files and hanging up parsing the files.
*/
@Test(timeout = 20000)
public void testTwoThreadsQueryingDifferentJobOfSameUser()
throws InterruptedException, IOException {
final Configuration config = new Configuration();
config.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
INTERMEDIATE_DIR.getPath());
config.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, Long.MAX_VALUE);
final JobId job1 = createJobId(0);
final JobId job2 = createJobId(1);
final HistoryFileManagerUnderContention historyFileManager =
createHistoryFileManager(config, job1, job2);
Thread webRequest1 = null;
Thread webRequest2 = null;
try {
/**
* create a dummy .jhist file for job1, and try to load/parse the job
* files in one child thread.
*/
createJhistFile(job1);
webRequest1 = new Thread(
new Runnable() {
@Override
public void run() {
try {
HistoryFileManager.HistoryFileInfo historyFileInfo =
historyFileManager.getFileInfo(job1);
historyFileInfo.loadJob();
} catch (IOException e) {
e.printStackTrace();
}
}
}
);
webRequest1.start();
historyFileManager.waitUntilIntermediateDirIsScanned(job1);
/**
* At this point, thread webRequest1 has finished scanning the
* intermediate directory and is hanging up parsing the job files while
* it's holding the lock on the associated HistoryFileInfo object.
*/
/**
* create a dummy .jhist file for job2 and try to load/parse the job files
* in the other child thread. Because job files are not moved from the
* intermediate directory to the done directory, thread webRequest2
* will also see the job history files for job1.
*/
createJhistFile(job2);
webRequest2 = new Thread(
new Runnable() {
@Override
public void run() {
try {
HistoryFileManager.HistoryFileInfo historyFileInfo =
historyFileManager.getFileInfo(job2);
historyFileInfo.loadJob();
} catch (IOException e) {
e.printStackTrace();
}
}
}
);
webRequest2.start();
historyFileManager.waitUntilIntermediateDirIsScanned(job2);
/**
* If execution had gotten to this point, then thread webRequest2 would
* not have tried to acquire the lock of the HistoryFileInfo object
* associated job1, which is permanently held by thread webRequest1 that
* is hanging up parsing the job history files, so it was able to proceed
* with parsing job history files of job2.
*/
Assert.assertTrue("Thread 2 is blocked while it is trying to " +
"load job2 by Thread 1 which is loading job1.",
webRequest2.getState() != Thread.State.BLOCKED);
} finally {
if(webRequest1 != null) {
webRequest1.interrupt();
}
if(webRequest2 != null) {
webRequest2.interrupt();
}
}
}
/**
* Create, initialize and start an instance of HistoryFileManager.
* @param config the configuration to initialize the HistoryFileManager
* instance.
* @param jobIds the set of jobs expected to be loaded by HistoryFileManager.
*/
private HistoryFileManagerUnderContention createHistoryFileManager(
Configuration config, JobId... jobIds) {
HistoryFileManagerUnderContention historyFileManager =
new HistoryFileManagerUnderContention(jobIds);
historyFileManager.init(config);
historyFileManager.start();
return historyFileManager;
}
/**
* Create, initialize and start an instance of CacheHistoryStorage.
* @param config the config to initialize the storage
* @param historyFileManager the HistoryFileManager to initializae the cache
*/
private static CachedHistoryStorage createHistoryStorage(
Configuration config, HistoryFileManager historyFileManager) {
CachedHistoryStorage historyStorage = new CachedHistoryStorage();
historyStorage.setHistoryFileManager(historyFileManager);
historyStorage.init(config);
historyStorage.start();
return historyStorage;
}
private static JobId createJobId(int id) {
JobId jobId = new JobIdPBImpl();
jobId.setId(id);
jobId.setAppId(ApplicationIdPBImpl.newInstance(0, id));
return jobId;
}
/**
* Create a dummy .jhist file under the intermediate directory for given job.
* @param jobId the id of the given job
* @return true if file is created successfully, false otherwise
*/
private static boolean createJhistFile(JobId jobId) throws IOException {
StringBuilder fileName = new StringBuilder(jobId.toString());
long finishTime = System.currentTimeMillis();
fileName.append("-").append(finishTime - 1000)
.append("-").append("test")
.append("-").append(jobId.getId())
.append("-").append(finishTime)
.append(".jhist");
File jhistFile = new File(USER_DIR, fileName.toString());
return jhistFile.createNewFile();
}
/**
* A test implementation of HistoryFileManager that does not move files
* from intermediate directory to done directory and hangs up parsing
* job history files.
*/
class HistoryFileManagerUnderContention extends HistoryFileManager {
/**
* A map of job to a signal that indicates whether the intermediate
* directory is done being scanned before the job files are parsed.
*/
private Map<JobId, CountDownLatch> scanningDoneSignals = new HashMap<>();
/**
* A HistoryFileManager that expects to load given jobs and hangs up
* parsing the job files. It perform no moving of files from the
* intermediate directory to done directory.
* @param jobId the set of jobs expected to load and parse
*/
public HistoryFileManagerUnderContention(JobId... jobId) {
for(JobId job: jobId) {
scanningDoneSignals.put(job, new CountDownLatch(1));
}
}
/**
* Wait until scanning of the intermediate directory finishes and load
* of the given job is started.
*/
public void waitUntilIntermediateDirIsScanned(JobId jobId)
throws InterruptedException {
if(scanningDoneSignals.containsKey(jobId)) {
scanningDoneSignals.get(jobId).await();
}
}
/**
* Create a HistoryFileInfo instance that hangs on parsing job files.
*/
@Override
protected HistoryFileManager.HistoryFileInfo createHistoryFileInfo(
Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo, boolean isInDone) {
return new HistoryFileInfo(historyFile, confFile, summaryFile,
jobIndexInfo, isInDone,
scanningDoneSignals.get(jobIndexInfo .getJobId()));
}
/**
* Create a dummy ThreadPoolExecutor that does not execute submitted tasks.
*/
@Override
protected ThreadPoolExecutor createMoveToDoneThreadPool(
int numMoveThreads) {
return mock(ThreadPoolExecutor.class);
}
/**
* A HistoryFileInfo implementation that takes forever to parse the
* associated job files. This mimics the behavior of parsing huge job files.
*/
class HistoryFileInfo extends HistoryFileManager.HistoryFileInfo {
/**
* A signal that indicates scanning of the intermediate directory is done
* as HistoryFileManager is in the process of loading the HistoryFileInfo
* instance.
*/
private final CountDownLatch scanningDoneSignal;
HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo, boolean isInDone,
CountDownLatch scanningDoneSignal) {
super(historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
this.scanningDoneSignal = scanningDoneSignal;
}
/**
* An test implementation that takes forever to load a job in order to
* mimic what happens when job files of large size are parsed in JHS.
* Before loading, we signal that scanning of the intermediate directory
* is finished.
*/
@Override
public synchronized Job loadJob() throws IOException {
if(scanningDoneSignal != null) {
scanningDoneSignal.countDown();
}
while(!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
return null;
}
}
}
}