Merging r1536182 through r1536558 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1536801 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-10-29 16:44:53 +00:00
commit 16359ec2c7
10 changed files with 267 additions and 9 deletions

View File

@ -446,6 +446,9 @@ Release 2.2.1 - UNRELEASED
HDFS-5171. NFS should create input stream for a file and try to share it
with multiple read requests. (Haohui Mai via brandonli)
HDFS-5413. hdfs.cmd does not support passthrough to any arbitrary class.
(cnauroth)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -47,7 +47,17 @@ if "%1" == "--config" (
goto print_usage
)
call :%hdfs-command% %hdfs-command-arguments%
set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir
for %%i in ( %hdfscommands% ) do (
if %hdfs-command% == %%i set hdfscommand=true
)
if defined hdfscommand (
call :%hdfs-command%
) else (
set CLASSPATH=%CLASSPATH%;%CD%
set CLASS=%hdfs-command%
)
set java_arguments=%JAVA_HEAP_MAX% %HADOOP_OPTS% -classpath %CLASSPATH% %CLASS% %hdfs-command-arguments%
call %JAVA% %java_arguments%
@ -58,6 +68,11 @@ goto :eof
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_NAMENODE_OPTS%
goto :eof
:journalnode
set CLASS=org.apache.hadoop.hdfs.qjournal.server.JournalNode
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_JOURNALNODE_OPTS%
goto :eof
:zkfc
set CLASS=org.apache.hadoop.hdfs.tools.DFSZKFailoverController
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ZKFC_OPTS%
@ -161,9 +176,11 @@ goto :eof
@echo namenode -format format the DFS filesystem
@echo secondarynamenode run the DFS secondary namenode
@echo namenode run the DFS namenode
@echo journalnode run the DFS journalnode
@echo zkfc run the ZK Failover Controller daemon
@echo datanode run a DFS datanode
@echo dfsadmin run a DFS admin client
@echo haadmin run a DFS HA admin client
@echo fsck run a DFS filesystem checking utility
@echo balancer run a cluster balancing utility
@echo jmxget get JMX exported values from NameNode or DataNode.

View File

@ -214,6 +214,9 @@ Release 2.2.1 - UNRELEASED
OPTIMIZATIONS
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
old enough directories (Robert Kanter via Sandy Ryza)
BUG FIXES
MAPREDUCE-5569. FloatSplitter is not generating correct splits (Nathan

View File

@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -499,4 +500,72 @@ public static Path getPreviousJobHistoryPath(
return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
}
/**
* Looks for the dirs to clean. The folder structure is YYYY/MM/DD/Serial so
* we can use that to more efficiently find the directories to clean by
* comparing the cutoff timestamp with the timestamp from the folder
* structure.
*
* @param fc done dir FileContext
* @param root folder for completed jobs
* @param cutoff The cutoff for the max history age
* @return The list of directories for cleaning
* @throws IOException
*/
public static List<FileStatus> getHistoryDirsForCleaning(FileContext fc,
Path root, long cutoff) throws IOException {
List<FileStatus> fsList = new ArrayList<FileStatus>();
Calendar cCal = Calendar.getInstance();
cCal.setTimeInMillis(cutoff);
int cYear = cCal.get(Calendar.YEAR);
int cMonth = cCal.get(Calendar.MONTH) + 1;
int cDate = cCal.get(Calendar.DATE);
RemoteIterator<FileStatus> yearDirIt = fc.listStatus(root);
while (yearDirIt.hasNext()) {
FileStatus yearDir = yearDirIt.next();
try {
int year = Integer.parseInt(yearDir.getPath().getName());
if (year <= cYear) {
RemoteIterator<FileStatus> monthDirIt =
fc.listStatus(yearDir.getPath());
while (monthDirIt.hasNext()) {
FileStatus monthDir = monthDirIt.next();
try {
int month = Integer.parseInt(monthDir.getPath().getName());
// If we only checked the month here, then something like 07/2013
// would incorrectly not pass when the cutoff is 06/2014
if (year < cYear || month <= cMonth) {
RemoteIterator<FileStatus> dateDirIt =
fc.listStatus(monthDir.getPath());
while (dateDirIt.hasNext()) {
FileStatus dateDir = dateDirIt.next();
try {
int date = Integer.parseInt(dateDir.getPath().getName());
// If we only checked the date here, then something like
// 07/21/2013 would incorrectly not pass when the cutoff is
// 08/20/2013 or 07/20/2012
if (year < cYear || month < cMonth || date <= cDate) {
fsList.addAll(remoteIterToList(
fc.listStatus(dateDir.getPath())));
}
} catch (NumberFormatException nfe) {
// the directory didn't fit the format we're looking for so
// skip the dir
}
}
}
} catch (NumberFormatException nfe) {
// the directory didn't fit the format we're looking for so skip
// the dir
}
}
}
} catch (NumberFormatException nfe) {
// the directory didn't fit the format we're looking for so skip the dir
}
}
return fsList;
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.jobhistory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Assert;
import org.junit.Test;
public class TestJobHistoryUtils {
final static String TEST_DIR = new File(System.getProperty("test.build.data"))
.getAbsolutePath();
@Test
@SuppressWarnings("unchecked")
public void testGetHistoryDirsForCleaning() throws IOException {
Path pRoot = new Path(TEST_DIR, "org.apache.hadoop.mapreduce.v2.jobhistory."
+ "TestJobHistoryUtils.testGetHistoryDirsForCleaning");
FileContext fc = FileContext.getFileContext();
Calendar cCal = Calendar.getInstance();
int year = 2013;
int month = 7;
int day = 21;
cCal.set(year, month - 1, day, 1, 0);
long cutoff = cCal.getTimeInMillis();
clearDir(fc, pRoot);
Path pId00 = createPath(fc, pRoot, year, month, day, "000000");
Path pId01 = createPath(fc, pRoot, year, month, day+1, "000001");
Path pId02 = createPath(fc, pRoot, year, month, day-1, "000002");
Path pId03 = createPath(fc, pRoot, year, month+1, day, "000003");
Path pId04 = createPath(fc, pRoot, year, month+1, day+1, "000004");
Path pId05 = createPath(fc, pRoot, year, month+1, day-1, "000005");
Path pId06 = createPath(fc, pRoot, year, month-1, day, "000006");
Path pId07 = createPath(fc, pRoot, year, month-1, day+1, "000007");
Path pId08 = createPath(fc, pRoot, year, month-1, day-1, "000008");
Path pId09 = createPath(fc, pRoot, year+1, month, day, "000009");
Path pId10 = createPath(fc, pRoot, year+1, month, day+1, "000010");
Path pId11 = createPath(fc, pRoot, year+1, month, day-1, "000011");
Path pId12 = createPath(fc, pRoot, year+1, month+1, day, "000012");
Path pId13 = createPath(fc, pRoot, year+1, month+1, day+1, "000013");
Path pId14 = createPath(fc, pRoot, year+1, month+1, day-1, "000014");
Path pId15 = createPath(fc, pRoot, year+1, month-1, day, "000015");
Path pId16 = createPath(fc, pRoot, year+1, month-1, day+1, "000016");
Path pId17 = createPath(fc, pRoot, year+1, month-1, day-1, "000017");
Path pId18 = createPath(fc, pRoot, year-1, month, day, "000018");
Path pId19 = createPath(fc, pRoot, year-1, month, day+1, "000019");
Path pId20 = createPath(fc, pRoot, year-1, month, day-1, "000020");
Path pId21 = createPath(fc, pRoot, year-1, month+1, day, "000021");
Path pId22 = createPath(fc, pRoot, year-1, month+1, day+1, "000022");
Path pId23 = createPath(fc, pRoot, year-1, month+1, day-1, "000023");
Path pId24 = createPath(fc, pRoot, year-1, month-1, day, "000024");
Path pId25 = createPath(fc, pRoot, year-1, month-1, day+1, "000025");
Path pId26 = createPath(fc, pRoot, year-1, month-1, day-1, "000026");
// non-expected names should be ignored without problems
Path pId27 = createPath(fc, pRoot, "foo", "" + month, "" + day, "000027");
Path pId28 = createPath(fc, pRoot, "" + year, "foo", "" + day, "000028");
Path pId29 = createPath(fc, pRoot, "" + year, "" + month, "foo", "000029");
List<FileStatus> dirs = JobHistoryUtils
.getHistoryDirsForCleaning(fc, pRoot, cutoff);
Collections.sort(dirs);
Assert.assertEquals(14, dirs.size());
Assert.assertEquals(pId26.toUri().getPath(),
dirs.get(0).getPath().toUri().getPath());
Assert.assertEquals(pId24.toUri().getPath(),
dirs.get(1).getPath().toUri().getPath());
Assert.assertEquals(pId25.toUri().getPath(),
dirs.get(2).getPath().toUri().getPath());
Assert.assertEquals(pId20.toUri().getPath(),
dirs.get(3).getPath().toUri().getPath());
Assert.assertEquals(pId18.toUri().getPath(),
dirs.get(4).getPath().toUri().getPath());
Assert.assertEquals(pId19.toUri().getPath(),
dirs.get(5).getPath().toUri().getPath());
Assert.assertEquals(pId23.toUri().getPath(),
dirs.get(6).getPath().toUri().getPath());
Assert.assertEquals(pId21.toUri().getPath(),
dirs.get(7).getPath().toUri().getPath());
Assert.assertEquals(pId22.toUri().getPath(),
dirs.get(8).getPath().toUri().getPath());
Assert.assertEquals(pId08.toUri().getPath(),
dirs.get(9).getPath().toUri().getPath());
Assert.assertEquals(pId06.toUri().getPath(),
dirs.get(10).getPath().toUri().getPath());
Assert.assertEquals(pId07.toUri().getPath(),
dirs.get(11).getPath().toUri().getPath());
Assert.assertEquals(pId02.toUri().getPath(),
dirs.get(12).getPath().toUri().getPath());
Assert.assertEquals(pId00.toUri().getPath(),
dirs.get(13).getPath().toUri().getPath());
}
private void clearDir(FileContext fc, Path p) throws IOException {
try {
fc.delete(p, true);
} catch (FileNotFoundException e) {
// ignore
}
fc.mkdir(p, FsPermission.getDirDefault(), false);
}
private Path createPath(FileContext fc, Path root, int year, int month,
int day, String id) throws IOException {
Path path = new Path(root, year + Path.SEPARATOR + month + Path.SEPARATOR +
day + Path.SEPARATOR + id);
fc.mkdir(path, FsPermission.getDirDefault(), true);
return path;
}
private Path createPath(FileContext fc, Path root, String year, String month,
String day, String id) throws IOException {
Path path = new Path(root, year + Path.SEPARATOR + month + Path.SEPARATOR +
day + Path.SEPARATOR + id);
fc.mkdir(path, FsPermission.getDirDefault(), true);
return path;
}
}

View File

@ -924,6 +924,11 @@ private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
fileInfo.delete();
}
List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
return JobHistoryUtils.
getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
}
/**
* Clean up older history files.
*
@ -932,12 +937,9 @@ private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
*/
@SuppressWarnings("unchecked")
void clean() throws IOException {
// TODO this should be replaced by something that knows about the directory
// structure and will put less of a load on HDFS.
long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
// TODO Delete YYYY/MM/DD directories.
List<FileStatus> serialDirList = findTimestampedDirectories();
List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
@ -175,7 +176,8 @@ public void testRefreshJobRetentionSettings() throws IOException,
doReturn(list2).when(historyManager).scanDirectoryForHistoryFiles(
eq(donePathToday), any(FileContext.class));
doReturn(fileStatusList).when(historyManager).findTimestampedDirectories();
doReturn(fileStatusList).when(historyManager)
.getHistoryDirsForCleaning(Mockito.anyLong());
doReturn(true).when(historyManager).deleteDir(any(FileStatus.class));
JobListCache jobListCache = mock(JobListCache.class);

View File

@ -148,6 +148,11 @@ Release 2.2.1 - UNRELEASED
YARN-1330. Fair Scheduler: defaultQueueSchedulingPolicy does not take effect
(Sandy Ryza)
YARN-1022. Unnecessary INFO logs in AMRMClientAsync (haosdent via bikas)
YARN-1349. yarn.cmd does not support passthrough to any arbitrary class.
(cnauroth)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -133,7 +133,21 @@ if "%1" == "--config" (
set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_DIR%\*
set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_LIB_JARS_DIR%\*
call :%yarn-command% %yarn-command-arguments%
if %yarn-command% == classpath (
@echo %CLASSPATH%
goto :eof
)
set yarncommands=resourcemanager nodemanager proxyserver rmadmin version jar application node logs daemonlog
for %%i in ( %yarncommands% ) do (
if %yarn-command% == %%i set yarncommand=true
)
if defined yarncommand (
call :%yarn-command%
) else (
set CLASSPATH=%CLASSPATH%;%CD%
set CLASS=%yarn-command%
)
if defined JAVA_LIBRARY_PATH (
set YARN_OPTS=%YARN_OPTS% -Djava.library.path=%JAVA_LIBRARY_PATH%

View File

@ -240,7 +240,7 @@ public void run() {
}
break;
} catch (InterruptedException ex) {
LOG.info("Interrupted while waiting to put on response queue", ex);
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
}
}
@ -248,7 +248,7 @@ public void run() {
try {
Thread.sleep(heartbeatIntervalMs.get());
} catch (InterruptedException ex) {
LOG.info("Heartbeater interrupted", ex);
LOG.debug("Heartbeater interrupted", ex);
}
}
}