MAPREDUCE-6684. High contention on scanning of user directory under immediate_done in Job History Server. Contributed by Haibo Chen
(cherry picked from commit 5ffb54694b
)
This commit is contained in:
parent
77d147e0ec
commit
d3dfed3fd0
|
@ -346,7 +346,7 @@ public class HistoryFileManager extends AbstractService {
|
|||
private Path confFile;
|
||||
private Path summaryFile;
|
||||
private JobIndexInfo jobIndexInfo;
|
||||
private HistoryInfoState state;
|
||||
private volatile HistoryInfoState state;
|
||||
|
||||
@VisibleForTesting
|
||||
protected HistoryFileInfo(Path historyFile, Path confFile,
|
||||
|
@ -360,20 +360,20 @@ public class HistoryFileManager extends AbstractService {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized boolean isMovePending() {
|
||||
boolean isMovePending() {
|
||||
return state == HistoryInfoState.IN_INTERMEDIATE
|
||||
|| state == HistoryInfoState.MOVE_FAILED;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized boolean didMoveFail() {
|
||||
boolean didMoveFail() {
|
||||
return state == HistoryInfoState.MOVE_FAILED;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return true if the files backed by this were deleted.
|
||||
*/
|
||||
public synchronized boolean isDeleted() {
|
||||
public boolean isDeleted() {
|
||||
return state == HistoryInfoState.DELETED;
|
||||
}
|
||||
|
||||
|
@ -566,13 +566,15 @@ public class HistoryFileManager extends AbstractService {
|
|||
int numMoveThreads = conf.getInt(
|
||||
JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
|
||||
moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
|
||||
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
|
||||
"MoveIntermediateToDone Thread #%d").build();
|
||||
moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
|
||||
numMoveThreads, 1, TimeUnit.HOURS,
|
||||
new LinkedBlockingQueue<Runnable>(), tf);
|
||||
|
||||
super.serviceInit(conf);
|
||||
return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads,
|
||||
1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -708,6 +710,13 @@ public class HistoryFileManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
|
||||
Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
|
||||
boolean isInDone) {
|
||||
return new HistoryFileInfo(
|
||||
historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates index data structures. Should only be called at initialization
|
||||
* times.
|
||||
|
@ -782,7 +791,7 @@ public class HistoryFileManager extends AbstractService {
|
|||
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
String summaryFileName = JobHistoryUtils
|
||||
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
|
||||
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
|
||||
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
|
||||
.getPath().getParent(), confFileName), new Path(fs.getPath()
|
||||
.getParent(), summaryFileName), jobIndexInfo, true);
|
||||
jobListCache.addIfAbsent(fileInfo);
|
||||
|
@ -880,7 +889,7 @@ public class HistoryFileManager extends AbstractService {
|
|||
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
String summaryFileName = JobHistoryUtils
|
||||
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
|
||||
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
|
||||
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
|
||||
.getPath().getParent(), confFileName), new Path(fs.getPath()
|
||||
.getParent(), summaryFileName), jobIndexInfo, false);
|
||||
|
||||
|
@ -940,7 +949,7 @@ public class HistoryFileManager extends AbstractService {
|
|||
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
String summaryFileName = JobHistoryUtils
|
||||
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
|
||||
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
|
||||
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
|
||||
fs.getPath().getParent(), confFileName), new Path(fs.getPath()
|
||||
.getParent(), summaryFileName), jobIndexInfo, true);
|
||||
return fileInfo;
|
||||
|
@ -1105,7 +1114,7 @@ public class HistoryFileManager extends AbstractService {
|
|||
String confFileName = JobHistoryUtils
|
||||
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
|
||||
fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
|
||||
fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
|
||||
historyFile.getPath().getParent(), confFileName), null,
|
||||
jobIndexInfo, true);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,323 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* The test in this class is created specifically to address the issue in
|
||||
* MAPREDUCE-6684. In cases where there are two threads trying to load different
|
||||
* jobs through job history file manager, one thread could be blocked by the
|
||||
* other that is loading a huge job file, which is undesirable.
|
||||
*
|
||||
*/
|
||||
public class TestUnnecessaryBlockingOnHistoryFileInfo {
|
||||
/**
|
||||
* The intermediate done directory that JHS scans for completed jobs.
|
||||
*/
|
||||
private final static File INTERMEDIATE_DIR = new File("target",
|
||||
TestUnnecessaryBlockingOnHistoryFileInfo.class.getName() +
|
||||
"/intermediate");
|
||||
/**
|
||||
* A test user directory under intermediate done directory.
|
||||
*/
|
||||
private final static File USER_DIR = new File(INTERMEDIATE_DIR, "test");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws IOException {
|
||||
if(USER_DIR.exists()) {
|
||||
FileUtils.cleanDirectory(USER_DIR);
|
||||
}
|
||||
USER_DIR.mkdirs();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUp() throws IOException {
|
||||
FileUtils.deleteDirectory(INTERMEDIATE_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
* This create a test case in which two threads are trying to load two
|
||||
* different jobs of the same user under the intermediate directory.
|
||||
* One thread should not be blocked by the other thread that is loading
|
||||
* a huge job files (This is simulated by hanging up parsing the job files
|
||||
* forever). The test will fail by triggering the timeout if one thread is
|
||||
* blocked by the other while the other thread is holding the lock on its
|
||||
* associated job files and hanging up parsing the files.
|
||||
*/
|
||||
@Test(timeout = 20000)
|
||||
public void testTwoThreadsQueryingDifferentJobOfSameUser()
|
||||
throws InterruptedException, IOException {
|
||||
final Configuration config = new Configuration();
|
||||
config.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
|
||||
INTERMEDIATE_DIR.getPath());
|
||||
config.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, Long.MAX_VALUE);
|
||||
|
||||
final JobId job1 = createJobId(0);
|
||||
final JobId job2 = createJobId(1);
|
||||
final HistoryFileManagerUnderContention historyFileManager =
|
||||
createHistoryFileManager(config, job1, job2);
|
||||
|
||||
Thread webRequest1 = null;
|
||||
Thread webRequest2 = null;
|
||||
try {
|
||||
/**
|
||||
* create a dummy .jhist file for job1, and try to load/parse the job
|
||||
* files in one child thread.
|
||||
*/
|
||||
createJhistFile(job1);
|
||||
webRequest1 = new Thread(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
HistoryFileManager.HistoryFileInfo historyFileInfo =
|
||||
historyFileManager.getFileInfo(job1);
|
||||
historyFileInfo.loadJob();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
webRequest1.start();
|
||||
historyFileManager.waitUntilIntermediateDirIsScanned(job1);
|
||||
|
||||
/**
|
||||
* At this point, thread webRequest1 has finished scanning the
|
||||
* intermediate directory and is hanging up parsing the job files while
|
||||
* it's holding the lock on the associated HistoryFileInfo object.
|
||||
*/
|
||||
|
||||
/**
|
||||
* create a dummy .jhist file for job2 and try to load/parse the job files
|
||||
* in the other child thread. Because job files are not moved from the
|
||||
* intermediate directory to the done directory, thread webRequest2
|
||||
* will also see the job history files for job1.
|
||||
*/
|
||||
createJhistFile(job2);
|
||||
webRequest2 = new Thread(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
HistoryFileManager.HistoryFileInfo historyFileInfo =
|
||||
historyFileManager.getFileInfo(job2);
|
||||
historyFileInfo.loadJob();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
webRequest2.start();
|
||||
historyFileManager.waitUntilIntermediateDirIsScanned(job2);
|
||||
|
||||
/**
|
||||
* If execution had gotten to this point, then thread webRequest2 would
|
||||
* not have tried to acquire the lock of the HistoryFileInfo object
|
||||
* associated job1, which is permanently held by thread webRequest1 that
|
||||
* is hanging up parsing the job history files, so it was able to proceed
|
||||
* with parsing job history files of job2.
|
||||
*/
|
||||
Assert.assertTrue("Thread 2 is blocked while it is trying to " +
|
||||
"load job2 by Thread 1 which is loading job1.",
|
||||
webRequest2.getState() != Thread.State.BLOCKED);
|
||||
} finally {
|
||||
if(webRequest1 != null) {
|
||||
webRequest1.interrupt();
|
||||
}
|
||||
if(webRequest2 != null) {
|
||||
webRequest2.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create, initialize and start an instance of HistoryFileManager.
|
||||
* @param config the configuration to initialize the HistoryFileManager
|
||||
* instance.
|
||||
* @param jobIds the set of jobs expected to be loaded by HistoryFileManager.
|
||||
*/
|
||||
private HistoryFileManagerUnderContention createHistoryFileManager(
|
||||
Configuration config, JobId... jobIds) {
|
||||
HistoryFileManagerUnderContention historyFileManager =
|
||||
new HistoryFileManagerUnderContention(jobIds);
|
||||
historyFileManager.init(config);
|
||||
historyFileManager.start();
|
||||
return historyFileManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create, initialize and start an instance of CacheHistoryStorage.
|
||||
* @param config the config to initialize the storage
|
||||
* @param historyFileManager the HistoryFileManager to initializae the cache
|
||||
*/
|
||||
private static CachedHistoryStorage createHistoryStorage(
|
||||
Configuration config, HistoryFileManager historyFileManager) {
|
||||
CachedHistoryStorage historyStorage = new CachedHistoryStorage();
|
||||
historyStorage.setHistoryFileManager(historyFileManager);
|
||||
historyStorage.init(config);
|
||||
historyStorage.start();
|
||||
return historyStorage;
|
||||
}
|
||||
|
||||
private static JobId createJobId(int id) {
|
||||
JobId jobId = new JobIdPBImpl();
|
||||
jobId.setId(id);
|
||||
jobId.setAppId(ApplicationIdPBImpl.newInstance(0, id));
|
||||
return jobId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a dummy .jhist file under the intermediate directory for given job.
|
||||
* @param jobId the id of the given job
|
||||
* @return true if file is created successfully, false otherwise
|
||||
*/
|
||||
private static boolean createJhistFile(JobId jobId) throws IOException {
|
||||
StringBuilder fileName = new StringBuilder(jobId.toString());
|
||||
long finishTime = System.currentTimeMillis();
|
||||
fileName.append("-").append(finishTime - 1000)
|
||||
.append("-").append("test")
|
||||
.append("-").append(jobId.getId())
|
||||
.append("-").append(finishTime)
|
||||
.append(".jhist");
|
||||
File jhistFile = new File(USER_DIR, fileName.toString());
|
||||
return jhistFile.createNewFile();
|
||||
}
|
||||
|
||||
/**
|
||||
* A test implementation of HistoryFileManager that does not move files
|
||||
* from intermediate directory to done directory and hangs up parsing
|
||||
* job history files.
|
||||
*/
|
||||
class HistoryFileManagerUnderContention extends HistoryFileManager {
|
||||
/**
|
||||
* A map of job to a signal that indicates whether the intermediate
|
||||
* directory is done being scanned before the job files are parsed.
|
||||
*/
|
||||
private Map<JobId, CountDownLatch> scanningDoneSignals = new HashMap<>();
|
||||
|
||||
/**
|
||||
* A HistoryFileManager that expects to load given jobs and hangs up
|
||||
* parsing the job files. It perform no moving of files from the
|
||||
* intermediate directory to done directory.
|
||||
* @param jobId the set of jobs expected to load and parse
|
||||
*/
|
||||
public HistoryFileManagerUnderContention(JobId... jobId) {
|
||||
for(JobId job: jobId) {
|
||||
scanningDoneSignals.put(job, new CountDownLatch(1));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until scanning of the intermediate directory finishes and load
|
||||
* of the given job is started.
|
||||
*/
|
||||
public void waitUntilIntermediateDirIsScanned(JobId jobId)
|
||||
throws InterruptedException {
|
||||
if(scanningDoneSignals.containsKey(jobId)) {
|
||||
scanningDoneSignals.get(jobId).await();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a HistoryFileInfo instance that hangs on parsing job files.
|
||||
*/
|
||||
@Override
|
||||
protected HistoryFileManager.HistoryFileInfo createHistoryFileInfo(
|
||||
Path historyFile, Path confFile, Path summaryFile,
|
||||
JobIndexInfo jobIndexInfo, boolean isInDone) {
|
||||
return new HistoryFileInfo(historyFile, confFile, summaryFile,
|
||||
jobIndexInfo, isInDone,
|
||||
scanningDoneSignals.get(jobIndexInfo .getJobId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a dummy ThreadPoolExecutor that does not execute submitted tasks.
|
||||
*/
|
||||
@Override
|
||||
protected ThreadPoolExecutor createMoveToDoneThreadPool(
|
||||
int numMoveThreads) {
|
||||
return mock(ThreadPoolExecutor.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* A HistoryFileInfo implementation that takes forever to parse the
|
||||
* associated job files. This mimics the behavior of parsing huge job files.
|
||||
*/
|
||||
class HistoryFileInfo extends HistoryFileManager.HistoryFileInfo {
|
||||
/**
|
||||
* A signal that indicates scanning of the intermediate directory is done
|
||||
* as HistoryFileManager is in the process of loading the HistoryFileInfo
|
||||
* instance.
|
||||
*/
|
||||
private final CountDownLatch scanningDoneSignal;
|
||||
|
||||
HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
|
||||
JobIndexInfo jobIndexInfo, boolean isInDone,
|
||||
CountDownLatch scanningDoneSignal) {
|
||||
super(historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
|
||||
this.scanningDoneSignal = scanningDoneSignal;
|
||||
}
|
||||
|
||||
/**
|
||||
* An test implementation that takes forever to load a job in order to
|
||||
* mimic what happens when job files of large size are parsed in JHS.
|
||||
* Before loading, we signal that scanning of the intermediate directory
|
||||
* is finished.
|
||||
*/
|
||||
@Override
|
||||
public synchronized Job loadJob() throws IOException {
|
||||
if(scanningDoneSignal != null) {
|
||||
scanningDoneSignal.countDown();
|
||||
}
|
||||
while(!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue