MAPREDUCE-4680. Job history cleaner should only check timestamps of files in old enough directories (Robert Kanter via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1536558 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a2d81e3017
commit
84cec3c805
|
@ -214,6 +214,9 @@ Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
|
||||||
|
old enough directories (Robert Kanter via Sandy Ryza)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
MAPREDUCE-5569. FloatSplitter is not generating correct splits (Nathan
|
MAPREDUCE-5569. FloatSplitter is not generating correct splits (Nathan
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.jobhistory;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -499,4 +500,72 @@ public class JobHistoryUtils {
|
||||||
return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
|
return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
|
||||||
histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
|
histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks for the dirs to clean. The folder structure is YYYY/MM/DD/Serial so
|
||||||
|
* we can use that to more efficiently find the directories to clean by
|
||||||
|
* comparing the cutoff timestamp with the timestamp from the folder
|
||||||
|
* structure.
|
||||||
|
*
|
||||||
|
* @param fc done dir FileContext
|
||||||
|
* @param root folder for completed jobs
|
||||||
|
* @param cutoff The cutoff for the max history age
|
||||||
|
* @return The list of directories for cleaning
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static List<FileStatus> getHistoryDirsForCleaning(FileContext fc,
|
||||||
|
Path root, long cutoff) throws IOException {
|
||||||
|
List<FileStatus> fsList = new ArrayList<FileStatus>();
|
||||||
|
Calendar cCal = Calendar.getInstance();
|
||||||
|
cCal.setTimeInMillis(cutoff);
|
||||||
|
int cYear = cCal.get(Calendar.YEAR);
|
||||||
|
int cMonth = cCal.get(Calendar.MONTH) + 1;
|
||||||
|
int cDate = cCal.get(Calendar.DATE);
|
||||||
|
|
||||||
|
RemoteIterator<FileStatus> yearDirIt = fc.listStatus(root);
|
||||||
|
while (yearDirIt.hasNext()) {
|
||||||
|
FileStatus yearDir = yearDirIt.next();
|
||||||
|
try {
|
||||||
|
int year = Integer.parseInt(yearDir.getPath().getName());
|
||||||
|
if (year <= cYear) {
|
||||||
|
RemoteIterator<FileStatus> monthDirIt =
|
||||||
|
fc.listStatus(yearDir.getPath());
|
||||||
|
while (monthDirIt.hasNext()) {
|
||||||
|
FileStatus monthDir = monthDirIt.next();
|
||||||
|
try {
|
||||||
|
int month = Integer.parseInt(monthDir.getPath().getName());
|
||||||
|
// If we only checked the month here, then something like 07/2013
|
||||||
|
// would incorrectly not pass when the cutoff is 06/2014
|
||||||
|
if (year < cYear || month <= cMonth) {
|
||||||
|
RemoteIterator<FileStatus> dateDirIt =
|
||||||
|
fc.listStatus(monthDir.getPath());
|
||||||
|
while (dateDirIt.hasNext()) {
|
||||||
|
FileStatus dateDir = dateDirIt.next();
|
||||||
|
try {
|
||||||
|
int date = Integer.parseInt(dateDir.getPath().getName());
|
||||||
|
// If we only checked the date here, then something like
|
||||||
|
// 07/21/2013 would incorrectly not pass when the cutoff is
|
||||||
|
// 08/20/2013 or 07/20/2012
|
||||||
|
if (year < cYear || month < cMonth || date <= cDate) {
|
||||||
|
fsList.addAll(remoteIterToList(
|
||||||
|
fc.listStatus(dateDir.getPath())));
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
// the directory didn't fit the format we're looking for so
|
||||||
|
// skip the dir
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
// the directory didn't fit the format we're looking for so skip
|
||||||
|
// the dir
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
// the directory didn't fit the format we're looking for so skip the dir
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fsList;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,143 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.jobhistory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Calendar;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestJobHistoryUtils {
|
||||||
|
|
||||||
|
final static String TEST_DIR = new File(System.getProperty("test.build.data"))
|
||||||
|
.getAbsolutePath();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testGetHistoryDirsForCleaning() throws IOException {
|
||||||
|
Path pRoot = new Path(TEST_DIR, "org.apache.hadoop.mapreduce.v2.jobhistory."
|
||||||
|
+ "TestJobHistoryUtils.testGetHistoryDirsForCleaning");
|
||||||
|
FileContext fc = FileContext.getFileContext();
|
||||||
|
Calendar cCal = Calendar.getInstance();
|
||||||
|
int year = 2013;
|
||||||
|
int month = 7;
|
||||||
|
int day = 21;
|
||||||
|
cCal.set(year, month - 1, day, 1, 0);
|
||||||
|
long cutoff = cCal.getTimeInMillis();
|
||||||
|
|
||||||
|
clearDir(fc, pRoot);
|
||||||
|
Path pId00 = createPath(fc, pRoot, year, month, day, "000000");
|
||||||
|
Path pId01 = createPath(fc, pRoot, year, month, day+1, "000001");
|
||||||
|
Path pId02 = createPath(fc, pRoot, year, month, day-1, "000002");
|
||||||
|
Path pId03 = createPath(fc, pRoot, year, month+1, day, "000003");
|
||||||
|
Path pId04 = createPath(fc, pRoot, year, month+1, day+1, "000004");
|
||||||
|
Path pId05 = createPath(fc, pRoot, year, month+1, day-1, "000005");
|
||||||
|
Path pId06 = createPath(fc, pRoot, year, month-1, day, "000006");
|
||||||
|
Path pId07 = createPath(fc, pRoot, year, month-1, day+1, "000007");
|
||||||
|
Path pId08 = createPath(fc, pRoot, year, month-1, day-1, "000008");
|
||||||
|
Path pId09 = createPath(fc, pRoot, year+1, month, day, "000009");
|
||||||
|
Path pId10 = createPath(fc, pRoot, year+1, month, day+1, "000010");
|
||||||
|
Path pId11 = createPath(fc, pRoot, year+1, month, day-1, "000011");
|
||||||
|
Path pId12 = createPath(fc, pRoot, year+1, month+1, day, "000012");
|
||||||
|
Path pId13 = createPath(fc, pRoot, year+1, month+1, day+1, "000013");
|
||||||
|
Path pId14 = createPath(fc, pRoot, year+1, month+1, day-1, "000014");
|
||||||
|
Path pId15 = createPath(fc, pRoot, year+1, month-1, day, "000015");
|
||||||
|
Path pId16 = createPath(fc, pRoot, year+1, month-1, day+1, "000016");
|
||||||
|
Path pId17 = createPath(fc, pRoot, year+1, month-1, day-1, "000017");
|
||||||
|
Path pId18 = createPath(fc, pRoot, year-1, month, day, "000018");
|
||||||
|
Path pId19 = createPath(fc, pRoot, year-1, month, day+1, "000019");
|
||||||
|
Path pId20 = createPath(fc, pRoot, year-1, month, day-1, "000020");
|
||||||
|
Path pId21 = createPath(fc, pRoot, year-1, month+1, day, "000021");
|
||||||
|
Path pId22 = createPath(fc, pRoot, year-1, month+1, day+1, "000022");
|
||||||
|
Path pId23 = createPath(fc, pRoot, year-1, month+1, day-1, "000023");
|
||||||
|
Path pId24 = createPath(fc, pRoot, year-1, month-1, day, "000024");
|
||||||
|
Path pId25 = createPath(fc, pRoot, year-1, month-1, day+1, "000025");
|
||||||
|
Path pId26 = createPath(fc, pRoot, year-1, month-1, day-1, "000026");
|
||||||
|
// non-expected names should be ignored without problems
|
||||||
|
Path pId27 = createPath(fc, pRoot, "foo", "" + month, "" + day, "000027");
|
||||||
|
Path pId28 = createPath(fc, pRoot, "" + year, "foo", "" + day, "000028");
|
||||||
|
Path pId29 = createPath(fc, pRoot, "" + year, "" + month, "foo", "000029");
|
||||||
|
|
||||||
|
List<FileStatus> dirs = JobHistoryUtils
|
||||||
|
.getHistoryDirsForCleaning(fc, pRoot, cutoff);
|
||||||
|
Collections.sort(dirs);
|
||||||
|
Assert.assertEquals(14, dirs.size());
|
||||||
|
Assert.assertEquals(pId26.toUri().getPath(),
|
||||||
|
dirs.get(0).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId24.toUri().getPath(),
|
||||||
|
dirs.get(1).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId25.toUri().getPath(),
|
||||||
|
dirs.get(2).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId20.toUri().getPath(),
|
||||||
|
dirs.get(3).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId18.toUri().getPath(),
|
||||||
|
dirs.get(4).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId19.toUri().getPath(),
|
||||||
|
dirs.get(5).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId23.toUri().getPath(),
|
||||||
|
dirs.get(6).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId21.toUri().getPath(),
|
||||||
|
dirs.get(7).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId22.toUri().getPath(),
|
||||||
|
dirs.get(8).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId08.toUri().getPath(),
|
||||||
|
dirs.get(9).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId06.toUri().getPath(),
|
||||||
|
dirs.get(10).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId07.toUri().getPath(),
|
||||||
|
dirs.get(11).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId02.toUri().getPath(),
|
||||||
|
dirs.get(12).getPath().toUri().getPath());
|
||||||
|
Assert.assertEquals(pId00.toUri().getPath(),
|
||||||
|
dirs.get(13).getPath().toUri().getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void clearDir(FileContext fc, Path p) throws IOException {
|
||||||
|
try {
|
||||||
|
fc.delete(p, true);
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
fc.mkdir(p, FsPermission.getDirDefault(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path createPath(FileContext fc, Path root, int year, int month,
|
||||||
|
int day, String id) throws IOException {
|
||||||
|
Path path = new Path(root, year + Path.SEPARATOR + month + Path.SEPARATOR +
|
||||||
|
day + Path.SEPARATOR + id);
|
||||||
|
fc.mkdir(path, FsPermission.getDirDefault(), true);
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path createPath(FileContext fc, Path root, String year, String month,
|
||||||
|
String day, String id) throws IOException {
|
||||||
|
Path path = new Path(root, year + Path.SEPARATOR + month + Path.SEPARATOR +
|
||||||
|
day + Path.SEPARATOR + id);
|
||||||
|
fc.mkdir(path, FsPermission.getDirDefault(), true);
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
}
|
|
@ -924,6 +924,11 @@ public class HistoryFileManager extends AbstractService {
|
||||||
fileInfo.delete();
|
fileInfo.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
|
||||||
|
return JobHistoryUtils.
|
||||||
|
getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clean up older history files.
|
* Clean up older history files.
|
||||||
*
|
*
|
||||||
|
@ -932,12 +937,9 @@ public class HistoryFileManager extends AbstractService {
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
void clean() throws IOException {
|
void clean() throws IOException {
|
||||||
// TODO this should be replaced by something that knows about the directory
|
|
||||||
// structure and will put less of a load on HDFS.
|
|
||||||
long cutoff = System.currentTimeMillis() - maxHistoryAge;
|
long cutoff = System.currentTimeMillis() - maxHistoryAge;
|
||||||
boolean halted = false;
|
boolean halted = false;
|
||||||
// TODO Delete YYYY/MM/DD directories.
|
List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
|
||||||
List<FileStatus> serialDirList = findTimestampedDirectories();
|
|
||||||
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
|
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
|
||||||
Collections.sort(serialDirList);
|
Collections.sort(serialDirList);
|
||||||
for (FileStatus serialDir : serialDirList) {
|
for (FileStatus serialDir : serialDirList) {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
@ -175,7 +176,8 @@ public class TestJobHistory {
|
||||||
doReturn(list2).when(historyManager).scanDirectoryForHistoryFiles(
|
doReturn(list2).when(historyManager).scanDirectoryForHistoryFiles(
|
||||||
eq(donePathToday), any(FileContext.class));
|
eq(donePathToday), any(FileContext.class));
|
||||||
|
|
||||||
doReturn(fileStatusList).when(historyManager).findTimestampedDirectories();
|
doReturn(fileStatusList).when(historyManager)
|
||||||
|
.getHistoryDirsForCleaning(Mockito.anyLong());
|
||||||
doReturn(true).when(historyManager).deleteDir(any(FileStatus.class));
|
doReturn(true).when(historyManager).deleteDir(any(FileStatus.class));
|
||||||
|
|
||||||
JobListCache jobListCache = mock(JobListCache.class);
|
JobListCache jobListCache = mock(JobListCache.class);
|
||||||
|
|
Loading…
Reference in New Issue