Merge trunk into branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1390199 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-09-25 22:43:04 +00:00
commit b1ceaac3e6
86 changed files with 1687 additions and 469 deletions

274
dev-support/relnotes.py Normal file
View File

@ -0,0 +1,274 @@
#!/usr/bin/python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
from optparse import OptionParser
import httplib
import urllib
import cgi
try:
import json
except ImportError:
import simplejson as json
namePattern = re.compile(r' \([0-9]+\)')
def clean(str):
return quoteHtml(re.sub(namePattern, "", str))
def formatComponents(str):
str = re.sub(namePattern, '', str).replace("'", "")
if str != "":
ret = "(" + str + ")"
else:
ret = ""
return quoteHtml(ret)
def quoteHtml(str):
return cgi.escape(str).encode('ascii', 'xmlcharrefreplace')
def mstr(obj):
if (obj == None):
return ""
return unicode(obj)
class Version:
"""Represents a version number"""
def __init__(self, data):
self.mod = False
self.data = data
found = re.match('^((\d+)(\.\d+)*).*$', data)
if (found):
self.parts = [ int(p) for p in found.group(1).split('.') ]
else:
self.parts = []
# backfill version with zeroes if missing parts
self.parts.extend((0,) * (3 - len(self.parts)))
def decBugFix(self):
self.mod = True
self.parts[2] -= 1
return self
def __str__(self):
if (self.mod):
return '.'.join([ str(p) for p in self.parts ])
return self.data
def __cmp__(self, other):
return cmp(self.parts, other.parts)
class Jira:
"""A single JIRA"""
def __init__(self, data, parent):
self.key = data['key']
self.fields = data['fields']
self.parent = parent
self.notes = None
def getId(self):
return mstr(self.key)
def getDescription(self):
return mstr(self.fields['description'])
def getReleaseNote(self):
if (self.notes == None):
field = self.parent.fieldIdMap['Release Note']
if (self.fields.has_key(field)):
self.notes=mstr(self.fields[field])
else:
self.notes=self.getDescription()
return self.notes
def getPriority(self):
ret = ""
pri = self.fields['priority']
if(pri != None):
ret = pri['name']
return mstr(ret)
def getAssignee(self):
ret = ""
mid = self.fields['assignee']
if(mid != None):
ret = mid['displayName']
return mstr(ret)
def getComponents(self):
return " , ".join([ comp['name'] for comp in self.fields['components'] ])
def getSummary(self):
return self.fields['summary']
def getType(self):
ret = ""
mid = self.fields['issuetype']
if(mid != None):
ret = mid['name']
return mstr(ret)
def getReporter(self):
ret = ""
mid = self.fields['reporter']
if(mid != None):
ret = mid['displayName']
return mstr(ret)
def getProject(self):
ret = ""
mid = self.fields['project']
if(mid != None):
ret = mid['key']
return mstr(ret)
class JiraIter:
"""An Iterator of JIRAs"""
def __init__(self, versions):
self.versions = versions
resp = urllib.urlopen("https://issues.apache.org/jira/rest/api/2/field")
data = json.loads(resp.read())
self.fieldIdMap = {}
for part in data:
self.fieldIdMap[part['name']] = part['id']
self.jiras = []
at=0
end=1
count=100
while (at < end):
params = urllib.urlencode({'jql': "project in (HADOOP,HDFS,MAPREDUCE,YARN) and fixVersion in ('"+"' , '".join(versions)+"') and resolution = Fixed", 'startAt':at+1, 'maxResults':count})
resp = urllib.urlopen("https://issues.apache.org/jira/rest/api/2/search?%s"%params)
data = json.loads(resp.read())
if (data.has_key('errorMessages')):
raise Exception(data['errorMessages'])
at = data['startAt'] + data['maxResults']
end = data['total']
self.jiras.extend(data['issues'])
self.iter = self.jiras.__iter__()
def __iter__(self):
return self
def next(self):
data = self.iter.next()
j = Jira(data, self)
return j
class Outputs:
"""Several different files to output to at the same time"""
def __init__(self, base_file_name, file_name_pattern, keys, params={}):
self.params = params
self.base = open(base_file_name%params, 'w')
self.others = {}
for key in keys:
both = dict(params)
both['key'] = key
self.others[key] = open(file_name_pattern%both, 'w')
def writeAll(self, pattern):
both = dict(self.params)
both['key'] = ''
self.base.write(pattern%both)
for key in self.others.keys():
both = dict(self.params)
both['key'] = key
self.others[key].write(pattern%both)
def writeKeyRaw(self, key, str):
self.base.write(str)
if (self.others.has_key(key)):
self.others[key].write(str)
def close(self):
self.base.close()
for fd in self.others.values():
fd.close()
def main():
parser = OptionParser(usage="usage: %prog [options] [USER-ignored] [PASSWORD-ignored] [VERSION]")
parser.add_option("-v", "--version", dest="versions",
action="append", type="string",
help="versions in JIRA to include in releasenotes", metavar="VERSION")
parser.add_option("--previousVer", dest="previousVer",
action="store", type="string",
help="previous version to include in releasenotes", metavar="VERSION")
(options, args) = parser.parse_args()
if (options.versions == None):
options.versions = []
if (len(args) > 2):
options.versions.append(args[2])
if (len(options.versions) <= 0):
parser.error("At least one version needs to be supplied")
versions = [ Version(v) for v in options.versions];
versions.sort();
maxVersion = str(versions[-1])
if(options.previousVer == None):
options.previousVer = str(versions[0].decBugFix())
print >> sys.stderr, "WARNING: no previousVersion given, guessing it is "+options.previousVer
list = JiraIter(options.versions)
version = maxVersion
outputs = Outputs("releasenotes.%(ver)s.html",
"releasenotes.%(key)s.%(ver)s.html",
["HADOOP","HDFS","MAPREDUCE","YARN"], {"ver":maxVersion, "previousVer":options.previousVer})
head = '<META http-equiv="Content-Type" content="text/html; charset=UTF-8">\n' \
'<title>Hadoop %(key)s %(ver)s Release Notes</title>\n' \
'<STYLE type="text/css">\n' \
' H1 {font-family: sans-serif}\n' \
' H2 {font-family: sans-serif; margin-left: 7mm}\n' \
' TABLE {margin-left: 7mm}\n' \
'</STYLE>\n' \
'</head>\n' \
'<body>\n' \
'<h1>Hadoop %(key)s %(ver)s Release Notes</h1>\n' \
'These release notes include new developer and user-facing incompatibilities, features, and major improvements. \n' \
'<a name="changes"/>\n' \
'<h2>Changes since Hadoop %(previousVer)s</h2>\n' \
'<ul>\n'
outputs.writeAll(head)
for jira in list:
line = '<li> <a href="https://issues.apache.org/jira/browse/%s">%s</a>.\n' \
' %s %s reported by %s and fixed by %s %s<br>\n' \
' <b>%s</b><br>\n' \
' <blockquote>%s</blockquote></li>\n' \
% (quoteHtml(jira.getId()), quoteHtml(jira.getId()), clean(jira.getPriority()), clean(jira.getType()).lower(),
quoteHtml(jira.getReporter()), quoteHtml(jira.getAssignee()), formatComponents(jira.getComponents()),
quoteHtml(jira.getSummary()), quoteHtml(jira.getReleaseNote()))
outputs.writeKeyRaw(jira.getProject(), line)
outputs.writeAll("</ul>\n</body></html>\n")
outputs.close()
if __name__ == "__main__":
main()

View File

@ -250,7 +250,7 @@ verifyPatch () {
echo "PATCH APPLICATION FAILED"
JIRA_COMMENT="$JIRA_COMMENT
-1 patch. The patch command could not apply the patch."
{color:red}-1 patch{color}. The patch command could not apply the patch."
return 1
else
return 0
@ -305,12 +305,12 @@ checkAuthor () {
if [[ $authorTags != 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 @author. The patch appears to contain $authorTags @author tags which the Hadoop community has agreed to not allow in code contributions."
{color:red}-1 @author{color}. The patch appears to contain $authorTags @author tags which the Hadoop community has agreed to not allow in code contributions."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 @author. The patch does not contain any @author tags."
{color:green}+1 @author{color}. The patch does not contain any @author tags."
return 0
}
@ -341,14 +341,14 @@ checkTests () {
fi
JIRA_COMMENT="$JIRA_COMMENT
-1 tests included. The patch doesn't appear to include any new or modified tests.
{color:red}-1 tests included{color}. The patch doesn't appear to include any new or modified tests.
Please justify why no new tests are needed for this patch.
Also please list what manual steps were performed to verify this patch."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 tests included. The patch appears to include $testReferences new or modified test files."
{color:green}+1 tests included{color}. The patch appears to include $testReferences new or modified test files."
return 0
}
@ -379,7 +379,7 @@ applyPatch () {
echo "PATCH APPLICATION FAILED"
JIRA_COMMENT="$JIRA_COMMENT
-1 patch. The patch command could not apply the patch."
{color:red}-1 patch{color}. The patch command could not apply the patch."
return 1
fi
return 0
@ -416,12 +416,12 @@ checkJavadocWarnings () {
if [[ $javadocWarnings -ne $OK_JAVADOC_WARNINGS ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 javadoc. The javadoc tool appears to have generated `expr $(($javadocWarnings-$OK_JAVADOC_WARNINGS))` warning messages."
{color:red}-1 javadoc{color}. The javadoc tool appears to have generated `expr $(($javadocWarnings-$OK_JAVADOC_WARNINGS))` warning messages."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 javadoc. The javadoc tool did not generate any warning messages."
{color:green}+1 javadoc{color}. The javadoc tool did not generate any warning messages."
return 0
}
@ -442,7 +442,7 @@ checkJavacWarnings () {
if [[ $? != 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 javac. The patch appears to cause the build to fail."
{color:red}-1 javac{color:red}. The patch appears to cause the build to fail."
return 2
fi
### Compare trunk and patch javac warning numbers
@ -456,7 +456,7 @@ checkJavacWarnings () {
if [[ $patchJavacWarnings -gt $trunkJavacWarnings ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 javac. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)."
{color:red}-1 javac{color}. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)."
$DIFF $PATCH_DIR/filteredTrunkJavacWarnings.txt $PATCH_DIR/filteredPatchJavacWarnings.txt > $PATCH_DIR/diffJavacWarnings.txt
JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/trunk/patchprocess/diffJavacWarnings.txt
@ -468,7 +468,7 @@ $JIRA_COMMENT_FOOTER"
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 javac. The applied patch does not increase the total number of javac compiler warnings."
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings."
return 0
}
@ -498,7 +498,7 @@ checkReleaseAuditWarnings () {
if [[ $patchReleaseAuditWarnings -gt 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 release audit. The applied patch generated $patchReleaseAuditWarnings release audit warnings."
{color:red}-1 release audit{color}. The applied patch generated $patchReleaseAuditWarnings release audit warnings."
$GREP '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt
echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." >> $PATCH_DIR/patchReleaseAuditProblems.txt
JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/trunk/patchprocess/patchReleaseAuditProblems.txt
@ -509,7 +509,7 @@ $JIRA_COMMENT_FOOTER"
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 release audit. The applied patch does not increase the total number of release audit warnings."
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings."
return 0
}
@ -538,12 +538,12 @@ $JIRA_COMMENT_FOOTER"
# if [[ $patchStyleErrors != 0 ]] ; then
# JIRA_COMMENT="$JIRA_COMMENT
#
# -1 checkstyle. The patch generated $patchStyleErrors code style errors."
# {color:red}-1 checkstyle{color}. The patch generated $patchStyleErrors code style errors."
# return 1
# fi
# JIRA_COMMENT="$JIRA_COMMENT
#
# +1 checkstyle. The patch generated 0 code style errors."
# {color:green}+1 checkstyle{color}. The patch generated 0 code style errors."
return 0
}
@ -595,7 +595,7 @@ checkFindbugsWarnings () {
if [ $rc != 0 ] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 findbugs. The patch appears to cause Findbugs (version ${findbugs_version}) to fail."
{color:red}-1 findbugs{color}. The patch appears to cause Findbugs (version ${findbugs_version}) to fail."
return 1
fi
@ -628,12 +628,12 @@ $JIRA_COMMENT_FOOTER"
if [[ $findbugsWarnings -gt 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 findbugs. The patch appears to introduce $findbugsWarnings new Findbugs (version ${findbugs_version}) warnings."
{color:red}-1 findbugs{color}. The patch appears to introduce $findbugsWarnings new Findbugs (version ${findbugs_version}) warnings."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 findbugs. The patch does not introduce any new Findbugs (version ${findbugs_version}) warnings."
{color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version ${findbugs_version}) warnings."
return 0
}
@ -655,12 +655,12 @@ checkEclipseGeneration () {
if [[ $? != 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 eclipse:eclipse. The patch failed to build with eclipse:eclipse."
{color:red}-1 eclipse:eclipse{color}. The patch failed to build with eclipse:eclipse."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 eclipse:eclipse. The patch built with eclipse:eclipse."
{color:green}+1 eclipse:eclipse{color}. The patch built with eclipse:eclipse."
return 0
}
@ -700,13 +700,13 @@ ${module_failed_tests}"
if [[ -n "$failed_tests" ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 core tests. The patch failed these unit tests in $modules:
{color:red}-1 core tests{color}. The patch failed these unit tests in $modules:
$failed_tests"
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 core tests. The patch passed unit tests in $modules."
{color:green}+1 core tests{color}. The patch passed unit tests in $modules."
return 0
}
@ -782,12 +782,12 @@ runContribTests () {
if [[ $? != 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 contrib tests. The patch failed contrib unit tests."
{color:red}-1 contrib tests{color}. The patch failed contrib unit tests."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 contrib tests. The patch passed contrib unit tests."
{color:green}+1 contrib tests{color}. The patch passed contrib unit tests."
return 0
}
@ -814,12 +814,12 @@ checkInjectSystemFaults () {
if [[ $? != 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 system test framework. The patch failed system test framework compile."
{color:red}-1 system test framework{color}. The patch failed system test framework compile."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 system test framework. The patch passed system test framework compile."
{color:green}+1 system test framework{color}. The patch passed system test framework compile."
return 0
}
@ -832,11 +832,11 @@ submitJiraComment () {
JIRA_COMMENT_FOOTER=""
fi
if [[ $result == 0 ]] ; then
comment="+1 overall. $JIRA_COMMENT
comment="{color:green}+1 overall{color}. $JIRA_COMMENT
$JIRA_COMMENT_FOOTER"
else
comment="-1 overall. $JIRA_COMMENT
comment="{color:red}-1 overall{color}. $JIRA_COMMENT
$JIRA_COMMENT_FOOTER"
fi

View File

@ -101,7 +101,21 @@ Trunk (Unreleased)
HADOOP-8619. WritableComparator must implement no-arg constructor.
(Chris Douglas via Suresh)
HADOOP-8736. Add Builder for building RPC server. (Brandon Li via Suresh)
HADOOP-8814. Replace string equals "" by String#isEmpty().
(Brandon Li via suresh)
HADOOP-8588. SerializationFactory shouldn't throw a
NullPointerException if the serializations list is empty.
(Sho Shimauchi via harsh)
HADOOP-7930. Kerberos relogin interval in UserGroupInformation
should be configurable (Robert Kanter via harsh)
HADOOP-8838. Colorize the test-patch output sent to JIRA (Harsh J via
bobby)
HADOOP-8840. Fix the test-patch colorizer to cover all sorts of +1 lines.
(Harsh J via bobby)
BUG FIXES
@ -214,6 +228,22 @@ Trunk (Unreleased)
HADOOP-8821. Fix findbugs warning related to concatenating string in a
for loop in Configuration#dumpDeprecatedKeys(). (suresh)
HADOOP-7256. Resource leak during failure scenario of closing
of resources. (Ramkrishna S. Vasudevan via harsh)
HADOOP-8151. Error handling in snappy decompressor throws invalid
exceptions. (Matt Foley via harsh)
HADOOP-8813. Add InterfaceAudience and InterfaceStability annotations
to RPC Server and Client classes. (Brandon Li via suresh)
HADOOP-8815. RandomDatum needs to override hashCode().
(Brandon Li via suresh)
HADOOP-8436. NPE In getLocalPathForWrite ( path, conf ) when the
required context item is not configured
(Brahma Reddy Battula via harsh)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -240,7 +270,7 @@ Release 2.0.3-alpha - Unreleased
HADOOP-8812. ExitUtil#terminate should print Exception#toString. (eli)
HADOOP-8805. Move protocol buffer implementation of GetUserMappingProtocol from HDFS to Common. (bowang via tucu)
HADOOP-8736. Add Builder for building RPC server. (Brandon Li via Suresh)
OPTIMIZATIONS
@ -252,6 +282,12 @@ Release 2.0.3-alpha - Unreleased
HADOOP-8780. Update DeprecatedProperties apt file. (Ahmed Radwan via
tomwhite)
HADOOP-8833. fs -text should make sure to call inputstream.seek(0)
before using input stream. (tomwhite and harsh)
HADOOP-8791. Fix rm command documentation to indicte it deletes
files and not directories. (Jing Zhao via suresh)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -948,6 +984,8 @@ Release 0.23.4 - UNRELEASED
IMPROVEMENTS
HADOOP-8822. relnotes.py was deleted post mavenization (bobby)
OPTIMIZATIONS
BUG FIXES

View File

@ -418,15 +418,15 @@
<code>Usage: hdfs dfs -rm [-skipTrash] URI [URI &#x2026;] </code>
</p>
<p>
Delete files specified as args. Only deletes non empty directory and files. If the <code>-skipTrash</code> option
Delete files specified as args. Only deletes files. If the <code>-skipTrash</code> option
is specified, the trash, if enabled, will be bypassed and the specified file(s) deleted immediately. This can be
useful when it is necessary to delete files from an over-quota directory.
Refer to rmr for recursive deletes.<br/>
Use -rm -r or rmr for recursive deletes.<br/>
Example:
</p>
<ul>
<li>
<code> hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir </code>
<code> hdfs dfs -rm hdfs://nn.example.com/file </code>
</li>
</ul>
<p>Exit Code:</p>
@ -442,7 +442,7 @@
<p>
<code>Usage: hdfs dfs -rmr [-skipTrash] URI [URI &#x2026;]</code>
</p>
<p>Recursive version of delete. If the <code>-skipTrash</code> option
<p>Recursive version of delete. The rmr command recursively deletes the directory and any content under it. If the <code>-skipTrash</code> option
is specified, the trash, if enabled, will be bypassed and the specified file(s) deleted immediately. This can be
useful when it is necessary to delete files from an over-quota directory.<br/>
Example:

View File

@ -1073,7 +1073,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
*/
public boolean getBoolean(String name, boolean defaultValue) {
String valueString = getTrimmed(name);
if (null == valueString || "".equals(valueString)) {
if (null == valueString || valueString.isEmpty()) {
return defaultValue;
}
@ -1140,7 +1140,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
*/
public Pattern getPattern(String name, Pattern defaultValue) {
String valString = get(name);
if (null == valString || "".equals(valString)) {
if (null == valString || valString.isEmpty()) {
return defaultValue;
}
try {

View File

@ -153,9 +153,9 @@ public class ReconfigurationServlet extends HttpServlet {
StringEscapeUtils.unescapeHtml(req.getParameter(rawParam));
if (value != null) {
if (value.equals(newConf.getRaw(param)) || value.equals("default") ||
value.equals("null") || value.equals("")) {
value.equals("null") || value.isEmpty()) {
if ((value.equals("default") || value.equals("null") ||
value.equals("")) &&
value.isEmpty()) &&
oldConf.getRaw(param) != null) {
out.println("<p>Changed \"" +
StringEscapeUtils.escapeHtml(param) + "\" from \"" +
@ -163,7 +163,7 @@ public class ReconfigurationServlet extends HttpServlet {
"\" to default</p>");
reconf.reconfigureProperty(param, null);
} else if (!value.equals("default") && !value.equals("null") &&
!value.equals("") &&
!value.isEmpty() &&
(oldConf.getRaw(param) == null ||
!oldConf.getRaw(param).equals(value))) {
// change from default or value to different value

View File

@ -242,5 +242,11 @@ public class CommonConfigurationKeysPublic {
public static final String HADOOP_SSL_ENABLED_KEY = "hadoop.ssl.enabled";
public static final boolean HADOOP_SSL_ENABLED_DEFAULT = false;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN =
"hadoop.kerberos.min.seconds.before.relogin";
/** Default value for HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN */
public static final int HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT =
60;
}

View File

@ -2003,7 +2003,7 @@ public final class FileContext {
String filename = inPathPattern.toUri().getPath();
// path has only zero component
if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
if (filename.isEmpty() || Path.SEPARATOR.equals(filename)) {
Path p = inPathPattern.makeQualified(uri, null);
return getFileStatus(new Path[]{p});
}

View File

@ -1597,7 +1597,7 @@ public abstract class FileSystem extends Configured implements Closeable {
String filename = pathPattern.toUri().getPath();
// path has only zero component
if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
if (filename.isEmpty() || Path.SEPARATOR.equals(filename)) {
return getFileStatus(new Path[]{pathPattern});
}

View File

@ -265,6 +265,9 @@ public class LocalDirAllocator {
private synchronized void confChanged(Configuration conf)
throws IOException {
String newLocalDirs = conf.get(contextCfgItemName);
if (null == newLocalDirs) {
throw new IOException(contextCfgItemName + " not configured");
}
if (!newLocalDirs.equals(savedLocalDirs)) {
localDirs = StringUtils.getTrimmedStrings(newLocalDirs);
localFS = FileSystem.getLocal(conf);

View File

@ -68,7 +68,7 @@ public class Path implements Comparable {
// Add a slash to parent's path so resolution is compatible with URI's
URI parentUri = parent.uri;
String parentPath = parentUri.getPath();
if (!(parentPath.equals("/") || parentPath.equals(""))) {
if (!(parentPath.equals("/") || parentPath.isEmpty())) {
try {
parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(),
parentUri.getPath()+"/", null, parentUri.getFragment());

View File

@ -492,7 +492,7 @@ public class RawLocalFileSystem extends FileSystem {
* onwer.equals("").
*/
private boolean isPermissionLoaded() {
return !super.getOwner().equals("");
return !super.getOwner().isEmpty();
}
RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {

View File

@ -128,7 +128,7 @@ public class RawLocalFs extends DelegateToFileSystem {
try {
FileStatus fs = getFileStatus(f);
// If f refers to a regular file or directory
if ("".equals(target)) {
if (target.isEmpty()) {
return fs;
}
// Otherwise f refers to a symlink
@ -150,7 +150,7 @@ public class RawLocalFs extends DelegateToFileSystem {
* the readBasicFileAttributes method in java.nio.file.attributes
* when available.
*/
if (!"".equals(target)) {
if (!target.isEmpty()) {
return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
"", "", new Path(target), f);
}

View File

@ -300,7 +300,7 @@ public class NativeS3FileSystem extends FileSystem {
}
private static String pathToKey(Path path) {
if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
// allow uris without trailing slash after bucket to refer to root,
// like s3n://mybucket
return "";

View File

@ -142,6 +142,7 @@ class Display extends FsCommand {
CompressionCodecFactory cf = new CompressionCodecFactory(getConf());
CompressionCodec codec = cf.getCodec(item.path);
if (codec != null) {
i.seek(0);
return codec.createInputStream(i);
}
break;

View File

@ -276,7 +276,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
String pathParts[] = znodeWorkingDir.split("/");
Preconditions.checkArgument(pathParts.length >= 1 &&
"".equals(pathParts[0]),
pathParts[0].isEmpty(),
"Invalid path: %s", znodeWorkingDir);
StringBuilder sb = new StringBuilder();

View File

@ -241,7 +241,7 @@ public class SshFenceByTcpPort extends Configured
sshPort = DEFAULT_SSH_PORT;
// Parse optional user and ssh port
if (arg != null && !"".equals(arg)) {
if (arg != null && !arg.isEmpty()) {
Matcher m = USER_PORT_RE.matcher(arg);
if (!m.matches()) {
throw new BadFencingConfigurationException(

View File

@ -192,7 +192,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
String[] parts = itemStr.split(SEPARATOR);
for (String part : parts) {
if (!part.equals(""))
if (!part.isEmpty())
list.add(stringifier.fromString(part));
}

View File

@ -25,6 +25,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -36,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class IOUtils {
public static final Log LOG = LogFactory.getLog(IOUtils.class);
/**
* Copies from one stream to another.
@ -235,7 +237,7 @@ public class IOUtils {
if (c != null) {
try {
c.close();
} catch(IOException e) {
} catch(Throwable e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing " + c, e);
}
@ -264,6 +266,7 @@ public class IOUtils {
try {
sock.close();
} catch (IOException ignored) {
LOG.debug("Ignoring exception while closing socket", ignored);
}
}
}

View File

@ -2105,7 +2105,7 @@ public class TFile {
}
public boolean isSorted() {
return !strComparator.equals("");
return !strComparator.isEmpty();
}
public String getComparatorString() {

View File

@ -40,12 +40,12 @@ import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SerializationFactory extends Configured {
private static final Log LOG =
static final Log LOG =
LogFactory.getLog(SerializationFactory.class.getName());
private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
/**
* <p>
* Serializations are found by reading the <code>io.serializations</code>
@ -55,15 +55,21 @@ public class SerializationFactory extends Configured {
*/
public SerializationFactory(Configuration conf) {
super(conf);
for (String serializerName : conf.getStrings(
CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
new String[]{WritableSerialization.class.getName(),
AvroSpecificSerialization.class.getName(),
AvroReflectSerialization.class.getName()})) {
add(conf, serializerName);
if (conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).equals("")) {
LOG.warn("Serialization for various data types may not be available. Please configure "
+ CommonConfigurationKeys.IO_SERIALIZATIONS_KEY
+ " properly to have serialization support (it is currently not set).");
} else {
for (String serializerName : conf.getStrings(
CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, new String[] {
WritableSerialization.class.getName(),
AvroSpecificSerialization.class.getName(),
AvroReflectSerialization.class.getName() })) {
add(conf, serializerName);
}
}
}
@SuppressWarnings("unchecked")
private void add(Configuration conf, String serializationName) {
try {
@ -101,5 +107,5 @@ public class SerializationFactory extends Configured {
}
return null;
}
}

View File

@ -83,6 +83,8 @@ import org.apache.hadoop.util.Time;
*
* @see Server
*/
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@InterfaceStability.Evolving
public class Client {
public static final Log LOG = LogFactory.getLog(Client.class);

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@ -72,6 +74,8 @@ import com.google.protobuf.BlockingService;
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@InterfaceStability.Evolving
public class RPC {
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests

View File

@ -64,6 +64,7 @@ import javax.security.sasl.SaslServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -107,6 +108,8 @@ import com.google.common.annotations.VisibleForTesting;
*
* @see Client
*/
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@InterfaceStability.Evolving
public abstract class Server {
private final boolean authorize;
private boolean isSecurityEnabled;

View File

@ -129,7 +129,7 @@ public abstract class MetricsDynamicMBeanBase implements DynamicMBean {
@Override
public Object getAttribute(String attributeName) throws AttributeNotFoundException,
MBeanException, ReflectionException {
if (attributeName == null || attributeName.equals(""))
if (attributeName == null || attributeName.isEmpty())
throw new IllegalArgumentException();
updateMbeanInfoIfMetricsListChanged();
@ -197,7 +197,7 @@ public abstract class MetricsDynamicMBeanBase implements DynamicMBean {
public Object invoke(String actionName, Object[] parms, String[] signature)
throws MBeanException, ReflectionException {
if (actionName == null || actionName.equals(""))
if (actionName == null || actionName.isEmpty())
throw new IllegalArgumentException();

View File

@ -355,9 +355,8 @@ public class NetUtils {
}
/**
* Returns the InetSocketAddress that a client can use to connect to the
* given listening address. This returns "hostname:port" of the server,
* or "127.0.0.1:port" when given a wildcard address of "0.0.0.0:port".
* Returns an InetSocketAddress that a client can use to connect to the
* given listening address.
*
* @param addr of a listener
* @return socket address that a client can use to connect to the server.

View File

@ -144,7 +144,7 @@ public class CsvRecordInput implements RecordInput {
@Override
public void startRecord(String tag) throws IOException {
if (tag != null && !"".equals(tag)) {
if (tag != null && !tag.isEmpty()) {
char c1 = (char) stream.read();
char c2 = (char) stream.read();
if (c1 != 's' || c2 != '{') {
@ -156,7 +156,7 @@ public class CsvRecordInput implements RecordInput {
@Override
public void endRecord(String tag) throws IOException {
char c = (char) stream.read();
if (tag == null || "".equals(tag)) {
if (tag == null || tag.isEmpty()) {
if (c != '\n' && c != '\r') {
throw new IOException("Error deserializing record.");
} else {

View File

@ -115,7 +115,7 @@ public class CsvRecordOutput implements RecordOutput {
@Override
public void startRecord(Record r, String tag) throws IOException {
if (tag != null && !"".equals(tag)) {
if (tag != null && ! tag.isEmpty()) {
printCommaUnlessFirst();
stream.print("s{");
isFirst = true;
@ -124,7 +124,7 @@ public class CsvRecordOutput implements RecordOutput {
@Override
public void endRecord(Record r, String tag) throws IOException {
if (tag == null || "".equals(tag)) {
if (tag == null || tag.isEmpty()) {
stream.print("\n");
isFirst = true;
} else {

View File

@ -213,7 +213,7 @@ public class SecurityUtil {
private static String replacePattern(String[] components, String hostname)
throws IOException {
String fqdn = hostname;
if (fqdn == null || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
if (fqdn == null || fqdn.isEmpty() || fqdn.equals("0.0.0.0")) {
fqdn = getLocalHostName();
}
return components[0] + "/" + fqdn.toLowerCase() + "@" + components[2];

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.security;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
@ -192,13 +194,12 @@ public class UserGroupInformation {
private static boolean useKerberos;
/** Server-side groups fetching service */
private static Groups groups;
/** Min time (in seconds) before relogin for Kerberos */
private static long kerberosMinSecondsBeforeRelogin;
/** The configuration to use */
private static Configuration conf;
/** Leave 10 minutes between relogin attempts. */
private static final long MIN_TIME_BEFORE_RELOGIN = 10 * 60 * 1000L;
/**Environment variable pointing to the token cache file*/
public static final String HADOOP_TOKEN_FILE_LOCATION =
"HADOOP_TOKEN_FILE_LOCATION";
@ -245,6 +246,16 @@ public class UserGroupInformation {
HADOOP_SECURITY_AUTHENTICATION +
" of " + value);
}
try {
kerberosMinSecondsBeforeRelogin = 1000L * conf.getLong(
HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN,
HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT);
}
catch(NumberFormatException nfe) {
throw new IllegalArgumentException("Invalid attribute value for " +
HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN + " of " +
conf.get(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN));
}
// If we haven't set up testing groups, use the configuration to find it
if (!(groups instanceof TestingGroups)) {
groups = Groups.getUserToGroupsMappingService(conf);
@ -729,7 +740,7 @@ public class UserGroupInformation {
return;
}
nextRefresh = Math.max(getRefreshTime(tgt),
now + MIN_TIME_BEFORE_RELOGIN);
now + kerberosMinSecondsBeforeRelogin);
} catch (InterruptedException ie) {
LOG.warn("Terminating renewal thread");
return;
@ -964,10 +975,10 @@ public class UserGroupInformation {
}
private boolean hasSufficientTimeElapsed(long now) {
if (now - user.getLastLogin() < MIN_TIME_BEFORE_RELOGIN ) {
if (now - user.getLastLogin() < kerberosMinSecondsBeforeRelogin ) {
LOG.warn("Not attempting to re-login since the last re-login was " +
"attempted less than " + (MIN_TIME_BEFORE_RELOGIN/1000) + " seconds"+
" before.");
"attempted less than " + (kerberosMinSecondsBeforeRelogin/1000) +
" seconds before.");
return false;
}
return true;
@ -992,7 +1003,7 @@ public class UserGroupInformation {
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation createRemoteUser(String user) {
if (user == null || "".equals(user)) {
if (user == null || user.isEmpty()) {
throw new IllegalArgumentException("Null user");
}
Subject subject = new Subject();
@ -1027,7 +1038,7 @@ public class UserGroupInformation {
@InterfaceStability.Evolving
public static UserGroupInformation createProxyUser(String user,
UserGroupInformation realUser) {
if (user == null || "".equals(user)) {
if (user == null || user.isEmpty()) {
throw new IllegalArgumentException("Null user");
}
if (realUser == null) {

View File

@ -88,7 +88,7 @@ public class ServiceAuthorizationManager {
String clientPrincipal = null;
if (krbInfo != null) {
String clientKey = krbInfo.clientPrincipal();
if (clientKey != null && !clientKey.equals("")) {
if (clientKey != null && !clientKey.isEmpty()) {
try {
clientPrincipal = SecurityUtil.getServerPrincipal(
conf.get(clientKey), addr);

View File

@ -93,7 +93,7 @@ public abstract class SecretManager<T extends TokenIdentifier> {
/**
* The length of the random keys to use.
*/
private static final int KEY_LENGTH = 20;
private static final int KEY_LENGTH = 64;
/**
* A thread local store for the Macs.

View File

@ -87,12 +87,12 @@ extends TokenIdentifier {
*/
@Override
public UserGroupInformation getUser() {
if ( (owner == null) || ("".equals(owner.toString()))) {
if ( (owner == null) || (owner.toString().isEmpty())) {
return null;
}
final UserGroupInformation realUgi;
final UserGroupInformation ugi;
if ((realUser == null) || ("".equals(realUser.toString()))
if ((realUser == null) || (realUser.toString().isEmpty())
|| realUser.equals(owner)) {
ugi = realUgi = UserGroupInformation.createRemoteUser(owner.toString());
} else {

View File

@ -265,7 +265,7 @@ extends AbstractDelegationTokenIdentifier>
throw new InvalidToken("User " + renewer +
" tried to renew an expired token");
}
if ((id.getRenewer() == null) || ("".equals(id.getRenewer().toString()))) {
if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
throw new AccessControlException("User " + renewer +
" tried to renew a token without " +
"a renewer");
@ -321,7 +321,7 @@ extends AbstractDelegationTokenIdentifier>
HadoopKerberosName cancelerKrbName = new HadoopKerberosName(canceller);
String cancelerShortName = cancelerKrbName.getShortName();
if (!canceller.equals(owner)
&& (renewer == null || "".equals(renewer.toString()) || !cancelerShortName
&& (renewer == null || renewer.toString().isEmpty() || !cancelerShortName
.equals(renewer.toString()))) {
throw new AccessControlException(canceller
+ " is not authorized to cancel the token");

View File

@ -63,7 +63,7 @@ public class HostsFileReader {
// Everything from now on is a comment
break;
}
if (!nodes[i].equals("")) {
if (!nodes[i].isEmpty()) {
LOG.info("Adding " + nodes[i] + " to the list of hosts from " + filename);
set.add(nodes[i]); // might need to add canonical name
}
@ -80,13 +80,13 @@ public class HostsFileReader {
public synchronized void refresh() throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
if (!includesFile.equals("")) {
if (!includesFile.isEmpty()) {
Set<String> newIncludes = new HashSet<String>();
readFileToSet(includesFile, newIncludes);
// switch the new hosts that are to be included
includes = newIncludes;
}
if (!excludesFile.equals("")) {
if (!excludesFile.isEmpty()) {
Set<String> newExcludes = new HashSet<String>();
readFileToSet(excludesFile, newExcludes);
// switch the excluded hosts

View File

@ -348,7 +348,7 @@ public class StringUtils {
* @return an array of <code>String</code> values
*/
public static String[] getTrimmedStrings(String str){
if (null == str || "".equals(str.trim())) {
if (null == str || str.trim().isEmpty()) {
return emptyStringArray;
}
@ -408,7 +408,7 @@ public class StringUtils {
String str, char separator) {
// String.split returns a single empty result for splitting the empty
// string.
if ("".equals(str)) {
if (str.isEmpty()) {
return new String[]{""};
}
ArrayList<String> strList = new ArrayList<String>();

View File

@ -88,7 +88,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_comp
compressed_direct_buf_len = LZ4_compress(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len);
if (compressed_direct_buf_len < 0){
THROW(env, "Ljava/lang/InternalError", "LZ4_compress failed");
THROW(env, "java/lang/InternalError", "LZ4_compress failed");
}
(*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);

View File

@ -85,7 +85,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_de
uncompressed_direct_buf_len = LZ4_uncompress_unknownOutputSize(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
if (uncompressed_direct_buf_len < 0) {
THROW(env, "Ljava/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
THROW(env, "java/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
}
(*env)->SetIntField(env, thisj, Lz4Decompressor_compressedDirectBufLen, 0);

View File

@ -98,11 +98,11 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso
snappy_status ret = dlsym_snappy_compress(uncompressed_bytes,
uncompressed_direct_buf_len, compressed_bytes, &buf_len);
if (ret != SNAPPY_OK){
THROW(env, "Ljava/lang/InternalError", "Could not compress data. Buffer length is too small.");
THROW(env, "java/lang/InternalError", "Could not compress data. Buffer length is too small.");
return 0;
}
if (buf_len > JINT_MAX) {
THROW(env, "Ljava/lang/InternalError", "Invalid return buffer length.");
THROW(env, "java/lang/InternalError", "Invalid return buffer length.");
return 0;
}

View File

@ -92,11 +92,11 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompres
snappy_status ret = dlsym_snappy_uncompress(compressed_bytes, compressed_direct_buf_len, uncompressed_bytes, &uncompressed_direct_buf_len);
if (ret == SNAPPY_BUFFER_TOO_SMALL){
THROW(env, "Ljava/lang/InternalError", "Could not decompress data. Buffer length is too small.");
THROW(env, "java/lang/InternalError", "Could not decompress data. Buffer length is too small.");
} else if (ret == SNAPPY_INVALID_INPUT){
THROW(env, "Ljava/lang/InternalError", "Could not decompress data. Input is invalid.");
THROW(env, "java/lang/InternalError", "Could not decompress data. Input is invalid.");
} else if (ret != SNAPPY_OK){
THROW(env, "Ljava/lang/InternalError", "Could not decompress data.");
THROW(env, "java/lang/InternalError", "Could not decompress data.");
}
(*env)->SetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen, 0);

View File

@ -250,6 +250,14 @@
</description>
</property>
<property>
<name>hadoop.kerberos.min.seconds.before.relogin</name>
<value>60</value>
<description>The minimum time between relogin attempts for Kerberos, in
seconds.
</description>
</property>
<property>
<name>hadoop.security.auth_to_local</name>
<value></value>

View File

@ -293,6 +293,23 @@ public class TestLocalDirAllocator {
}
}
/*
* Test when mapred.local.dir not configured and called
* getLocalPathForWrite
*/
@Test
public void testShouldNotthrowNPE() throws Exception {
Configuration conf1 = new Configuration();
try {
dirAllocator.getLocalPathForWrite("/test", conf1);
fail("Exception not thrown when " + CONTEXT + " is not set");
} catch (IOException e) {
assertEquals(CONTEXT + " not configured", e.getMessage());
} catch (NullPointerException e) {
fail("Lack of configuration should not have thrown an NPE.");
}
}
/** Test no side effect files are left over. After creating a temp
* temp file, remove both the temp file and its parent. Verify that
* no files or directories are left over as can happen when File objects

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
@ -65,6 +66,11 @@ public class RandomDatum implements WritableComparable<RandomDatum> {
return compareTo((RandomDatum)o) == 0;
}
@Override
public int hashCode() {
return Arrays.hashCode(this.data);
}
private static final char[] HEX_DIGITS =
{'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};

View File

@ -66,6 +66,36 @@ public class TestIOUtils {
Mockito.verify(outputStream, Mockito.atLeastOnce()).close();
}
@Test
public void testCopyBytesShouldCloseInputSteamWhenOutputStreamCloseThrowsRunTimeException()
throws Exception {
InputStream inputStream = Mockito.mock(InputStream.class);
OutputStream outputStream = Mockito.mock(OutputStream.class);
Mockito.doReturn(-1).when(inputStream).read(new byte[1]);
Mockito.doThrow(new RuntimeException()).when(outputStream).close();
try {
IOUtils.copyBytes(inputStream, outputStream, 1, true);
fail("Didn't throw exception");
} catch (RuntimeException e) {
}
Mockito.verify(outputStream, Mockito.atLeastOnce()).close();
}
@Test
public void testCopyBytesShouldCloseInputSteamWhenInputStreamCloseThrowsRunTimeException()
throws Exception {
InputStream inputStream = Mockito.mock(InputStream.class);
OutputStream outputStream = Mockito.mock(OutputStream.class);
Mockito.doReturn(-1).when(inputStream).read(new byte[1]);
Mockito.doThrow(new RuntimeException()).when(inputStream).close();
try {
IOUtils.copyBytes(inputStream, outputStream, 1, true);
fail("Didn't throw exception");
} catch (RuntimeException e) {
}
Mockito.verify(inputStream, Mockito.atLeastOnce()).close();
}
@Test
public void testCopyBytesShouldNotCloseStreamsWhenCloseIsFalse()
throws Exception {
@ -76,7 +106,7 @@ public class TestIOUtils {
Mockito.verify(inputStream, Mockito.atMost(0)).close();
Mockito.verify(outputStream, Mockito.atMost(0)).close();
}
@Test
public void testCopyBytesWithCountShouldCloseStreamsWhenCloseIsTrue()
throws Exception {
@ -117,7 +147,7 @@ public class TestIOUtils {
Mockito.verify(inputStream, Mockito.atLeastOnce()).close();
Mockito.verify(outputStream, Mockito.atLeastOnce()).close();
}
@Test
public void testWriteFully() throws IOException {
final int INPUT_BUFFER_LEN = 10000;
@ -148,6 +178,7 @@ public class TestIOUtils {
for (int i = HALFWAY; i < input.length; i++) {
assertEquals(input[i - HALFWAY], output[i]);
}
raf.close();
} finally {
File f = new File(TEST_FILE_NAME);
if (f.exists()) {
@ -177,7 +208,7 @@ public class TestIOUtils {
"Error while reading compressed data", ioe);
}
}
@Test
public void testSkipFully() throws IOException {
byte inArray[] = new byte[] {0, 1, 2, 3, 4};

View File

@ -34,6 +34,8 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -202,6 +204,15 @@ public class TestCodec {
v2.readFields(inflateIn);
assertTrue("original and compressed-then-decompressed-output not equal",
k1.equals(k2) && v1.equals(v2));
// original and compressed-then-decompressed-output have the same hashCode
Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
m.put(k1, k1.toString());
m.put(v1, v1.toString());
String result = m.get(k2);
assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
result = m.get(v2);
assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
}
// De-compress data byte-at-a-time

View File

@ -67,7 +67,7 @@ public class TestNativeIO {
assertEquals(System.getProperty("user.name"), stat.getOwner());
assertNotNull(stat.getGroup());
assertTrue(!"".equals(stat.getGroup()));
assertTrue(!stat.getGroup().isEmpty());
assertEquals("Stat mode field should indicate a regular file",
NativeIO.Stat.S_IFREG, stat.getMode() & NativeIO.Stat.S_IFMT);
}
@ -96,7 +96,7 @@ public class TestNativeIO {
NativeIO.Stat stat = NativeIO.fstat(fos.getFD());
assertEquals(System.getProperty("user.name"), stat.getOwner());
assertNotNull(stat.getGroup());
assertTrue(!"".equals(stat.getGroup()));
assertTrue(!stat.getGroup().isEmpty());
assertEquals("Stat mode field should indicate a regular file",
NativeIO.Stat.S_IFREG, stat.getMode() & NativeIO.Stat.S_IFMT);
} catch (Throwable t) {

View File

@ -17,27 +17,62 @@
*/
package org.apache.hadoop.io.serializer;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Level;
public class TestSerializationFactory {
static {
((Log4JLogger) SerializationFactory.LOG).getLogger().setLevel(Level.ALL);
}
static Configuration conf;
static SerializationFactory factory;
@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
factory = new SerializationFactory(conf);
}
@Test
public void testSerializerAvailability() {
public void testSerializationKeyIsEmpty() {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, "");
SerializationFactory factory = new SerializationFactory(conf);
}
@Test
public void testSerializationKeyIsInvalid() {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, "INVALID_KEY_XXX");
SerializationFactory factory = new SerializationFactory(conf);
}
@Test
public void testGetSerializer() {
// Test that a valid serializer class is returned when its present
assertNotNull("A valid class must be returned for default Writable Serde",
assertNotNull("A valid class must be returned for default Writable SerDe",
factory.getSerializer(Writable.class));
assertNotNull("A valid class must be returned for default Writable serDe",
factory.getDeserializer(Writable.class));
// Test that a null is returned when none can be found.
assertNull("A null should be returned if there are no serializers found.",
factory.getSerializer(TestSerializationFactory.class));
}
@Test
public void testGetDeserializer() {
// Test that a valid serializer class is returned when its present
assertNotNull("A valid class must be returned for default Writable SerDe",
factory.getDeserializer(Writable.class));
// Test that a null is returned when none can be found.
assertNull("A null should be returned if there are no deserializers found",
factory.getDeserializer(TestSerializationFactory.class));
}

View File

@ -112,7 +112,7 @@ public class TestSaslRPC {
}
@Override
public UserGroupInformation getUser() {
if ("".equals(realUser.toString())) {
if (realUser.toString().isEmpty()) {
return UserGroupInformation.createRemoteUser(tokenid.toString());
} else {
UserGroupInformation realUgi = UserGroupInformation

View File

@ -24,6 +24,7 @@ import static org.mockito.Mockito.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.LinkedHashSet;
@ -49,6 +50,7 @@ public class TestUserGroupInformation {
final private static String GROUP3_NAME = "group3";
final private static String[] GROUP_NAMES =
new String[]{GROUP1_NAME, GROUP2_NAME, GROUP3_NAME};
private static Configuration conf;
/**
* UGI should not use the default security conf, else it will collide
@ -68,7 +70,7 @@ public class TestUserGroupInformation {
/** configure ugi */
@BeforeClass
public static void setup() {
Configuration conf = new Configuration();
conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
"RULE:[2:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//" +
"RULE:[1:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//"
@ -537,4 +539,39 @@ public class TestUserGroupInformation {
}
});
}
/** Test hasSufficientTimeElapsed method */
@Test
public void testHasSufficientTimeElapsed() throws Exception {
// Make hasSufficientTimeElapsed public
Method method = UserGroupInformation.class
.getDeclaredMethod("hasSufficientTimeElapsed", long.class);
method.setAccessible(true);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
User user = ugi.getSubject().getPrincipals(User.class).iterator().next();
long now = System.currentTimeMillis();
// Using default relogin time (1 minute)
user.setLastLogin(now - 2 * 60 * 1000); // 2 minutes before "now"
assertTrue((Boolean)method.invoke(ugi, now));
user.setLastLogin(now - 30 * 1000); // 30 seconds before "now"
assertFalse((Boolean)method.invoke(ugi, now));
// Using relogin time of 10 minutes
Configuration conf2 = new Configuration(conf);
conf2.setLong(
CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN,
10 * 60);
UserGroupInformation.setConfiguration(conf2);
user.setLastLogin(now - 15 * 60 * 1000); // 15 minutes before "now"
assertTrue((Boolean)method.invoke(ugi, now));
user.setLastLogin(now - 6 * 60 * 1000); // 6 minutes before "now"
assertFalse((Boolean)method.invoke(ugi, now));
// Restore original conf to UGI
UserGroupInformation.setConfiguration(conf);
// Restore hasSufficientTimElapsed back to private
method.setAccessible(false);
}
}

View File

@ -231,7 +231,8 @@ public class TestStringUtils extends UnitTestcaseTimeLimit {
assertArrayEquals(expectedArray, StringUtils.getTrimmedStrings(pathologicalDirList2));
assertArrayEquals(emptyArray, StringUtils.getTrimmedStrings(emptyList1));
assertArrayEquals(emptyArray, StringUtils.getTrimmedStrings(emptyList2));
String[] estring = StringUtils.getTrimmedStrings(emptyList2);
assertArrayEquals(emptyArray, estring);
}
@Test

View File

@ -8,9 +8,6 @@ Trunk (Unreleased)
NEW FEATURES
HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly
via jitendra)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
@ -219,6 +216,9 @@ Trunk (Unreleased)
HDFS-3678. Edit log files are never being purged from 2NN. (atm)
HADOOP-8158. Interrupting hadoop fs -put from the command line
causes a LeaseExpiredException. (daryn via harsh)
Release 2.0.3-alpha - Unreleased
INCOMPATIBLE CHANGES
@ -246,6 +246,19 @@ Release 2.0.3-alpha - Unreleased
HDFS-3936. MiniDFSCluster shutdown races with BlocksMap usage. (eli)
HDFS-3951. datanode web ui does not work over HTTPS when datanode is started in secure mode. (tucu)
HDFS-3949. NameNodeRpcServer#join should join on both client and
server RPC servers. (eli)
HDFS-3932. NameNode Web UI broken if the rpc-address is set to the wildcard.
(Colin Patrick McCabe via eli)
HDFS-3931. TestDatanodeBlockScanner#testBlockCorruptionPolicy2 is broken.
(Andy Isaacson via eli)
HDFS-3964. Make NN log of fs.defaultFS debug rather than info. (eli)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -885,6 +898,9 @@ Release 2.0.0-alpha - 05-23-2012
HDFS-3298. Add HdfsDataOutputStream as a public API. (szetszwo)
HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly
via jitendra)
IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place.

View File

@ -77,6 +77,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@ -695,6 +696,17 @@ public class DFSClient implements java.io.Closeable {
}
}
/**
* Close all open streams, abandoning all of the leases and files being
* created.
* @param abort whether streams should be gracefully closed
*/
public void closeOutputStreams(boolean abort) {
if (clientRunning) {
closeAllFilesBeingWritten(abort);
}
}
/**
* Get the default block size for this cluster
* @return the default block size in bytes

View File

@ -535,10 +535,10 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void close() throws IOException {
try {
super.processDeleteOnExit();
dfs.close();
} finally {
dfs.closeOutputStreams(false);
super.close();
} finally {
dfs.close();
}
}

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@ -65,8 +67,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.impl.pb.client.GetUserMappingsProtocolPBClientImpl;
import com.google.common.base.Preconditions;
@ -218,7 +218,7 @@ public class NameNodeProxies {
throws IOException {
GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
return new GetUserMappingsProtocolPBClientImpl(proxy);
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(

View File

@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,66 +16,54 @@
* limitations under the License.
*/
package org.apache.hadoop.tools.impl.pb.client;
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocol.GetGroupsForUserRequestProto;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocol.GetGroupsForUserResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class GetUserMappingsProtocolPBClientImpl implements
public class GetUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, GetUserMappingsProtocol, Closeable {
private GetUserMappingsProtocolPB proxy;
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final GetUserMappingsProtocolPB rpcProxy;
public GetUserMappingsProtocolPBClientImpl(
long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
RPC.setProtocolEngine(conf, GetUserMappingsProtocolPB.class,
ProtobufRpcEngine.class);
proxy = (GetUserMappingsProtocolPB) RPC.getProxy(
GetUserMappingsProtocolPB.class, clientVersion, addr, conf);
public GetUserMappingsProtocolClientSideTranslatorPB(
GetUserMappingsProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
public GetUserMappingsProtocolPBClientImpl(
GetUserMappingsProtocolPB proxy) {
this.proxy = proxy;
}
@Override
public void close() throws IOException {
RPC.stopProxy(proxy);
RPC.stopProxy(rpcProxy);
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
GetGroupsForUserRequestProto requestProto =
GetGroupsForUserRequestProto.newBuilder().setUser(user).build();
GetGroupsForUserRequestProto request = GetGroupsForUserRequestProto
.newBuilder().setUser(user).build();
GetGroupsForUserResponseProto resp;
try {
GetGroupsForUserResponseProto responseProto =
proxy.getGroupsForUser(null, requestProto);
return (String[]) responseProto.getGroupsList().toArray(
new String[responseProto.getGroupsCount()]);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
resp = rpcProxy.getGroupsForUser(NULL_CONTROLLER, request);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return resp.getGroupsList().toArray(new String[resp.getGroupsCount()]);
}
@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(proxy,
return RpcClientUtil.isMethodSupported(rpcProxy,
GetUserMappingsProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(GetUserMappingsProtocolPB.class), methodName);
}

View File

@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,21 +16,21 @@
* limitations under the License.
*/
package org.apache.hadoop.tools;
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocol.GetUserMappingsProtocolService;
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.tools.GetUserMappingsProtocol",
protocolName = "org.apache.hadoop.tools.GetUserMappingsProtocol",
protocolVersion = 1)
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "YARN"})
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface GetUserMappingsProtocolPB extends
GetUserMappingsProtocolService.BlockingInterface {

View File

@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,43 +16,42 @@
* limitations under the License.
*/
package org.apache.hadoop.tools.impl.pb.service;
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocol.GetGroupsForUserRequestProto;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocol.GetGroupsForUserResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class GetUserMappingsProtocolPBServiceImpl implements
public class GetUserMappingsProtocolServerSideTranslatorPB implements
GetUserMappingsProtocolPB {
private GetUserMappingsProtocol real;
public GetUserMappingsProtocolPBServiceImpl(GetUserMappingsProtocol impl) {
this.real = impl;
private final GetUserMappingsProtocol impl;
public GetUserMappingsProtocolServerSideTranslatorPB(
GetUserMappingsProtocol impl) {
this.impl = impl;
}
@Override
public GetGroupsForUserResponseProto getGroupsForUser(
RpcController controller, GetGroupsForUserRequestProto request)
throws ServiceException {
String user = request.getUser();
String[] groups;
try {
String[] groups = real.getGroupsForUser(user);
GetGroupsForUserResponseProto.Builder responseBuilder =
GetGroupsForUserResponseProto.newBuilder();
for (String group : groups) {
responseBuilder.addGroups(group);
}
return responseBuilder.build();
groups = impl.getGroupsForUser(request.getUser());
} catch (IOException e) {
throw new ServiceException(e);
}
GetGroupsForUserResponseProto.Builder builder = GetGroupsForUserResponseProto
.newBuilder();
for (String g : groups) {
builder.addGroups(g);
}
return builder.build();
}
}

View File

@ -16,9 +16,11 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.security.GeneralSecurityException;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
@ -26,9 +28,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.security.SslSocketConnector;
import javax.net.ssl.SSLServerSocketFactory;
/**
* Utility class to start a datanode in a secure cluster, first obtaining
@ -40,9 +48,9 @@ public class SecureDataNodeStarter implements Daemon {
*/
public static class SecureResources {
private final ServerSocket streamingSocket;
private final SelectChannelConnector listener;
private final Connector listener;
public SecureResources(ServerSocket streamingSocket,
SelectChannelConnector listener) {
Connector listener) {
this.streamingSocket = streamingSocket;
this.listener = listener;
@ -50,12 +58,13 @@ public class SecureDataNodeStarter implements Daemon {
public ServerSocket getStreamingSocket() { return streamingSocket; }
public SelectChannelConnector getListener() { return listener; }
public Connector getListener() { return listener; }
}
private String [] args;
private SecureResources resources;
private SSLFactory sslFactory;
@Override
public void init(DaemonContext context) throws Exception {
System.err.println("Initializing secure datanode resources");
@ -80,13 +89,30 @@ public class SecureDataNodeStarter implements Daemon {
}
// Obtain secure listener for web server
SelectChannelConnector listener =
(SelectChannelConnector)HttpServer.createDefaultChannelConnector();
Connector listener;
if (HttpConfig.isSecure()) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
try {
sslFactory.init();
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
SslSocketConnector sslListener = new SslSocketConnector() {
@Override
protected SSLServerSocketFactory createFactory() throws Exception {
return sslFactory.createSSLServerSocketFactory();
}
};
listener = sslListener;
} else {
listener = HttpServer.createDefaultChannelConnector();
}
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
listener.setHost(infoSocAddr.getHostName());
listener.setPort(infoSocAddr.getPort());
// Open listener here in order to bind to port as root
listener.open();
listener.open();
if (listener.getPort() != infoSocAddr.getPort()) {
throw new RuntimeException("Unable to bind on specified info port in secure " +
"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
@ -109,6 +135,9 @@ public class SecureDataNodeStarter implements Daemon {
DataNode.secureMain(args, resources);
}
@Override public void destroy() { /* Nothing to do */ }
@Override public void destroy() {
sslFactory.destroy();
}
@Override public void stop() throws Exception { /* Nothing to do */ }
}

View File

@ -1179,7 +1179,7 @@ public class NameNode {
URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
+ conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
LOG.info("Setting " + FS_DEFAULT_NAME_KEY + " to " + defaultUri.toString());
LOG.debug("Setting " + FS_DEFAULT_NAME_KEY + " to " + defaultUri.toString());
}
}

View File

@ -140,7 +140,8 @@ public class NameNodeHttpServer {
.getPort());
}
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY, nn.getNameNodeAddress());
httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY,
NetUtils.getConnectAddress(nn.getNameNodeAddress()));
httpServer.setAttribute(FSIMAGE_ATTRIBUTE_KEY, nn.getFSImage());
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
setupServlets(httpServer, conf);

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshAuthorizationPolicyProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
@ -73,6 +74,8 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
@ -116,9 +119,6 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.impl.pb.service.GetUserMappingsProtocolPBServiceImpl;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocol.GetUserMappingsProtocolService;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@ -189,8 +189,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
.newReflectiveBlockingService(refreshUserMappingXlator);
GetUserMappingsProtocolPBServiceImpl getUserMappingXlator =
new GetUserMappingsProtocolPBServiceImpl(this);
GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
new GetUserMappingsProtocolServerSideTranslatorPB(this);
BlockingService getUserMappingService = GetUserMappingsProtocolService
.newReflectiveBlockingService(getUserMappingXlator);
@ -297,10 +297,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
/**
* Wait until the client RPC server has shutdown.
* Wait until the RPC servers have shutdown.
*/
void join() throws InterruptedException {
clientRpcServer.join();
if (serviceRpcServer != null) {
serviceRpcServer.join();
}
}
/**

View File

@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.tools.proto";
option java_outer_classname = "GetUserMappingsProtocol";
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "GetUserMappingsProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

View File

@ -507,7 +507,7 @@ public class DFSTestUtil {
public static void waitReplication(FileSystem fs, Path fileName, short replFactor)
throws IOException, InterruptedException, TimeoutException {
boolean correctReplFactor;
final int ATTEMPTS = 20;
final int ATTEMPTS = 40;
int count = 0;
do {

View File

@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.logging.Log;
@ -52,7 +53,10 @@ import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
@ -577,6 +581,8 @@ public class TestDFSShell {
try {
final FileSystem fs = root.getFileSystem(conf);
fs.mkdirs(root);
// Test the gzip type of files. Magic detection.
OutputStream zout = new GZIPOutputStream(
fs.create(new Path(root, "file.gz")));
Random r = new Random();
@ -601,7 +607,7 @@ public class TestDFSShell {
Arrays.equals(file.toByteArray(), out.toByteArray()));
// Create a sequence file with a gz extension, to test proper
// container detection
// container detection. Magic detection.
SequenceFile.Writer writer = SequenceFile.createWriter(
conf,
SequenceFile.Writer.file(new Path(root, "file.gz")),
@ -619,6 +625,45 @@ public class TestDFSShell {
assertTrue("Output doesn't match input",
Arrays.equals("Foo\tBar\n".getBytes(), out.toByteArray()));
out.reset();
// Test deflate. Extension-based detection.
OutputStream dout = new DeflaterOutputStream(
fs.create(new Path(root, "file.deflate")));
byte[] outbytes = "foo".getBytes();
dout.write(outbytes);
dout.close();
out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
argv = new String[2];
argv[0] = "-text";
argv[1] = new Path(root, "file.deflate").toString();
ret = ToolRunner.run(new FsShell(conf), argv);
assertEquals("'-text " + argv[1] + " returned " + ret, 0, ret);
assertTrue("Output doesn't match input",
Arrays.equals(outbytes, out.toByteArray()));
out.reset();
// Test a simple codec. Extension based detection. We use
// Bzip2 cause its non-native.
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(BZip2Codec.class, conf);
String extension = codec.getDefaultExtension();
Path p = new Path(root, "file." + extension);
OutputStream fout = new DataOutputStream(codec.createOutputStream(
fs.create(p, true)));
byte[] writebytes = "foo".getBytes();
fout.write(writebytes);
fout.close();
out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
argv = new String[2];
argv[0] = "-text";
argv[1] = new Path(root, p).toString();
ret = ToolRunner.run(new FsShell(conf), argv);
assertEquals("'-text " + argv[1] + " returned " + ret, 0, ret);
assertTrue("Output doesn't match input",
Arrays.equals(writebytes, out.toByteArray()));
out.reset();
} finally {
if (null != bak) {
System.setOut(bak);

View File

@ -269,6 +269,7 @@ public class TestDatanodeBlockScanner {
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
@ -276,35 +277,47 @@ public class TestDatanodeBlockScanner {
Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
final int ITERATIONS = 10;
// Wait until block is replicated to numReplicas
DFSTestUtil.waitReplication(fs, file1, numReplicas);
// Corrupt numCorruptReplicas replicas of block
int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
if (corruptReplica(block, i)) {
corruptReplicasDNIDs[j++] = i;
LOG.info("successfully corrupted block " + block + " on node "
+ i + " " + cluster.getDataNodes().get(i).getDisplayName());
for (int k = 0; ; k++) {
// Corrupt numCorruptReplicas replicas of block
int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
if (corruptReplica(block, i)) {
corruptReplicasDNIDs[j++] = i;
LOG.info("successfully corrupted block " + block + " on node "
+ i + " " + cluster.getDataNodes().get(i).getDisplayName());
}
}
// Restart the datanodes containing corrupt replicas
// so they would be reported to namenode and re-replicated
// They MUST be restarted in reverse order from highest to lowest index,
// because the act of restarting them removes them from the ArrayList
// and causes the indexes of all nodes above them in the list to change.
for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
LOG.info("restarting node with corrupt replica: position "
+ i + " node " + corruptReplicasDNIDs[i] + " "
+ cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
cluster.restartDataNode(corruptReplicasDNIDs[i]);
}
}
// Restart the datanodes containing corrupt replicas
// so they would be reported to namenode and re-replicated
// They MUST be restarted in reverse order from highest to lowest index,
// because the act of restarting them removes them from the ArrayList
// and causes the indexes of all nodes above them in the list to change.
for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
LOG.info("restarting node with corrupt replica: position "
+ i + " node " + corruptReplicasDNIDs[i] + " "
+ cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
cluster.restartDataNode(corruptReplicasDNIDs[i]);
}
// Loop until all corrupt replicas are reported
DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
block, numCorruptReplicas);
// Loop until all corrupt replicas are reported
try {
DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
block, numCorruptReplicas);
} catch(TimeoutException e) {
if (k > ITERATIONS) {
throw e;
}
LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k);
continue;
}
break;
}
// Loop until the block recovers after replication
DFSTestUtil.waitReplication(fs, file1, numReplicas);

View File

@ -22,7 +22,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
@ -54,6 +56,7 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.InOrder;
public class TestDistributedFileSystem {
private static final Random RAN = new Random();
@ -127,7 +130,31 @@ public class TestDistributedFileSystem {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testDFSCloseOrdering() throws Exception {
DistributedFileSystem fs = new MyDistributedFileSystem();
Path path = new Path("/a");
fs.deleteOnExit(path);
fs.close();
InOrder inOrder = inOrder(fs.dfs);
inOrder.verify(fs.dfs).closeOutputStreams(eq(false));
inOrder.verify(fs.dfs).delete(eq(path.toString()), eq(true));
inOrder.verify(fs.dfs).close();
}
private static class MyDistributedFileSystem extends DistributedFileSystem {
MyDistributedFileSystem() {
statistics = new FileSystem.Statistics("myhdfs"); // can't mock finals
dfs = mock(DFSClient.class);
}
@Override
public boolean exists(Path p) {
return true; // trick out deleteOnExit
}
}
@Test
public void testDFSSeekExceptions() throws IOException {
Configuration conf = getTestConfiguration();

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
@ -40,7 +41,6 @@ import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.impl.pb.client.GetUserMappingsProtocolPBClientImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -141,8 +141,8 @@ public class TestIsMethodSupported {
@Test
public void testGetUserMappingsProtocol() throws IOException {
GetUserMappingsProtocolPBClientImpl translator =
(GetUserMappingsProtocolPBClientImpl)
GetUserMappingsProtocolClientSideTranslatorPB translator =
(GetUserMappingsProtocolClientSideTranslatorPB)
NameNodeProxies.createNonHAProxy(conf, nnAddress,
GetUserMappingsProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy();

View File

@ -144,6 +144,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly
to clients in case of failed jobs also. (Jason Lowe via vinodkv)
MAPREDUCE-4674. Hadoop examples secondarysort has a typo
"secondarysrot" in the usage. (Robert Justice via eli)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -172,9 +175,6 @@ Release 2.0.2-alpha - 2012-09-07
MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
(rkanter via tucu)
MAPREDUCE-2786. Add compression option for TestDFSIO.
(Plamen Jeliazkov via shv)
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
interface to allow schedulers to maintain their own. (acmurthy)
@ -530,6 +530,14 @@ Release 0.23.4 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-2786. Add compression option for TestDFSIO.
(Plamen Jeliazkov via shv)
MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
of file names deterministic. (Ravi Prakash via shv)
MAPREDUCE-4651. Benchmarking random reads with DFSIO. (shv)
OPTIMIZATIONS
BUG FIXES

View File

@ -17,14 +17,13 @@
*/
package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Base mapper class for IO operations.
@ -35,7 +34,6 @@ import org.apache.hadoop.util.ReflectionUtils;
* statistics data to be collected by subsequent reducers.
*
*/
@SuppressWarnings("deprecation")
public abstract class IOMapperBase<T> extends Configured
implements Mapper<Text, LongWritable, Text, Text> {
@ -43,7 +41,7 @@ public abstract class IOMapperBase<T> extends Configured
protected int bufferSize;
protected FileSystem fs;
protected String hostName;
protected CompressionCodec compressionCodec;
protected Closeable stream;
public IOMapperBase() {
}
@ -62,22 +60,6 @@ public abstract class IOMapperBase<T> extends Configured
} catch(Exception e) {
hostName = "localhost";
}
//grab compression
String compression = getConf().get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
//try to initialize codec
try {
codec = (compression == null) ? null :
Class.forName(compression).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
if(codec != null) {
compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
}
}
public void close() throws IOException {
@ -97,6 +79,18 @@ public abstract class IOMapperBase<T> extends Configured
String name,
long value) throws IOException;
/**
* Create an input or output stream based on the specified file.
* Subclasses should override this method to provide an actual stream.
*
* @param name file name
* @return the stream
* @throws IOException
*/
public Closeable getIOStream(String name) throws IOException {
return null;
}
/**
* Collect stat data to be combined by a subsequent reducer.
*
@ -132,9 +126,15 @@ public abstract class IOMapperBase<T> extends Configured
long longValue = value.get();
reporter.setStatus("starting " + name + " ::host = " + hostName);
this.stream = getIOStream(name);
T statValue = null;
long tStart = System.currentTimeMillis();
T statValue = doIO(reporter, name, longValue);
try {
statValue = doIO(reporter, name, longValue);
} finally {
if(stream != null) stream.close();
}
long tEnd = System.currentTimeMillis();
long execTime = tEnd - tStart;
collectStats(output, name, execTime, statValue);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
@ -28,10 +29,9 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Date;
import java.util.Random;
import java.util.StringTokenizer;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -39,18 +39,30 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Distributed i/o benchmark.
* <p>
* This test writes into or reads from a specified number of files.
* File size is specified as a parameter to the test.
* Number of bytes to write or read is specified as a parameter to the test.
* Each file is accessed in a separate map task.
* <p>
* The reducer collects the following statistics:
@ -73,24 +85,24 @@ import org.apache.hadoop.util.ToolRunner;
* <li>standard deviation of i/o rate </li>
* </ul>
*/
public class TestDFSIO extends TestCase implements Tool {
public class TestDFSIO implements Tool {
// Constants
private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
private static final int TEST_TYPE_APPEND = 3;
private static final int DEFAULT_BUFFER_SIZE = 1000000;
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
private static final long MEGA = ByteMultiple.MB.value();
private static final int DEFAULT_NR_BYTES = 1;
private static final int DEFAULT_NR_FILES = 4;
private static final String USAGE =
"Usage: " + TestDFSIO.class.getSimpleName() +
" [genericOptions]" +
" -read | -write | -append | -clean [-nrFiles N]" +
" [-fileSize Size[B|KB|MB|GB|TB]]" +
" [-resFile resultFileName] [-bufferSize Bytes]" +
" [-rootDir]";
"Usage: " + TestDFSIO.class.getSimpleName() +
" [genericOptions]" +
" -read [-random | -backward | -skip [-skipSize Size]] |" +
" -write | -append | -clean" +
" [-nrFiles N]" +
" [-size Size[B|KB|MB|GB|TB]]" +
" [-resFile resultFileName] [-bufferSize Bytes]" +
" [-rootDir]";
private Configuration config;
@ -101,6 +113,27 @@ public class TestDFSIO extends TestCase implements Tool {
Configuration.addDefaultResource("mapred-site.xml");
}
private static enum TestType {
TEST_TYPE_READ("read"),
TEST_TYPE_WRITE("write"),
TEST_TYPE_CLEANUP("cleanup"),
TEST_TYPE_APPEND("append"),
TEST_TYPE_READ_RANDOM("random read"),
TEST_TYPE_READ_BACKWARD("backward read"),
TEST_TYPE_READ_SKIP("skip read");
private String type;
private TestType(String t) {
type = t;
}
@Override // String
public String toString() {
return type;
}
}
static enum ByteMultiple {
B(1L),
KB(0x400L),
@ -155,62 +188,100 @@ public class TestDFSIO extends TestCase implements Tool {
private static Path getAppendDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_append");
}
private static Path getRandomReadDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_random_read");
}
private static Path getDataDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_data");
}
/**
* Run the test with default parameters.
*
* @throws Exception
*/
public void testIOs() throws Exception {
TestDFSIO bench = new TestDFSIO();
bench.testIOs(1, 4);
private static MiniDFSCluster cluster;
private static TestDFSIO bench;
@BeforeClass
public static void beforeClass() throws Exception {
bench = new TestDFSIO();
bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
cluster = new MiniDFSCluster.Builder(bench.getConf())
.numDataNodes(2)
.format(true)
.build();
FileSystem fs = cluster.getFileSystem();
bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
}
/**
* Run the test with the specified parameters.
*
* @param fileSize file size
* @param nrFiles number of files
* @throws IOException
*/
public void testIOs(int fileSize, int nrFiles)
throws IOException {
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster(config, 2, true, null);
FileSystem fs = cluster.getFileSystem();
createControlFile(fs, fileSize, nrFiles);
long tStart = System.currentTimeMillis();
writeTest(fs);
long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
tStart = System.currentTimeMillis();
readTest(fs);
execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
tStart = System.currentTimeMillis();
appendTest(fs);
execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
cleanup(fs);
} finally {
if(cluster != null) cluster.shutdown();
}
@AfterClass
public static void afterClass() throws Exception {
if(cluster == null)
return;
FileSystem fs = cluster.getFileSystem();
bench.cleanup(fs);
cluster.shutdown();
}
@Test
public void testWrite() throws Exception {
FileSystem fs = cluster.getFileSystem();
long tStart = System.currentTimeMillis();
bench.writeTest(fs);
long execTime = System.currentTimeMillis() - tStart;
bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime);
}
@Test
public void testRead() throws Exception {
FileSystem fs = cluster.getFileSystem();
long tStart = System.currentTimeMillis();
bench.readTest(fs);
long execTime = System.currentTimeMillis() - tStart;
bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime);
}
@Test
public void testReadRandom() throws Exception {
FileSystem fs = cluster.getFileSystem();
long tStart = System.currentTimeMillis();
bench.getConf().setLong("test.io.skip.size", 0);
bench.randomReadTest(fs);
long execTime = System.currentTimeMillis() - tStart;
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime);
}
@Test
public void testReadBackward() throws Exception {
FileSystem fs = cluster.getFileSystem();
long tStart = System.currentTimeMillis();
bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE);
bench.randomReadTest(fs);
long execTime = System.currentTimeMillis() - tStart;
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime);
}
@Test
public void testReadSkip() throws Exception {
FileSystem fs = cluster.getFileSystem();
long tStart = System.currentTimeMillis();
bench.getConf().setLong("test.io.skip.size", 1);
bench.randomReadTest(fs);
long execTime = System.currentTimeMillis() - tStart;
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
}
@Test
public void testAppend() throws Exception {
FileSystem fs = cluster.getFileSystem();
long tStart = System.currentTimeMillis();
bench.appendTest(fs);
long execTime = System.currentTimeMillis() - tStart;
bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
}
@SuppressWarnings("deprecation")
private void createControlFile(FileSystem fs,
long fileSize, // in bytes
long nrBytes, // in bytes
int nrFiles
) throws IOException {
LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");
Path controlDir = getControlDir(config);
fs.delete(controlDir, true);
@ -223,7 +294,7 @@ public class TestDFSIO extends TestCase implements Tool {
writer = SequenceFile.createWriter(fs, config, controlFile,
Text.class, LongWritable.class,
CompressionType.NONE);
writer.append(new Text(name), new LongWritable(fileSize));
writer.append(new Text(name), new LongWritable(nrBytes));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@ -251,10 +322,35 @@ public class TestDFSIO extends TestCase implements Tool {
* <li>i/o rate squared</li>
* </ul>
*/
private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
IOStatMapper() {
private abstract static class IOStatMapper extends IOMapperBase<Long> {
protected CompressionCodec compressionCodec;
IOStatMapper() {
}
@Override // Mapper
public void configure(JobConf conf) {
super.configure(conf);
// grab compression
String compression = getConf().get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
// try to initialize codec
try {
codec = (compression == null) ? null :
Class.forName(compression).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
if(codec != null) {
compressionCodec = (CompressionCodec)
ReflectionUtils.newInstance(codec, getConf());
}
}
@Override // IOMapperBase
void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
@ -281,36 +377,38 @@ public class TestDFSIO extends TestCase implements Tool {
/**
* Write mapper class.
*/
public static class WriteMapper extends IOStatMapper<Long> {
public static class WriteMapper extends IOStatMapper {
public WriteMapper() {
for(int i=0; i < bufferSize; i++)
buffer[i] = (byte)('0' + i % 50);
}
@Override
@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
// create file
OutputStream out =
fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
if(compressionCodec != null)
out = compressionCodec.createOutputStream(out);
LOG.info("out = " + out.getClass().getName());
return out;
}
@Override // IOMapperBase
public Long doIO(Reporter reporter,
String name,
long totalSize // in bytes
) throws IOException {
// create file
OutputStream out;
out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
try {
// write to the file
long nrRemaining;
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
out.write(buffer, 0, curSize);
reporter.setStatus("writing " + name + "@" +
(totalSize - nrRemaining) + "/" + totalSize
+ " ::host = " + hostName);
}
} finally {
out.close();
OutputStream out = (OutputStream)this.stream;
// write to the file
long nrRemaining;
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
out.write(buffer, 0, curSize);
reporter.setStatus("writing " + name + "@" +
(totalSize - nrRemaining) + "/" + totalSize
+ " ::host = " + hostName);
}
return Long.valueOf(totalSize);
}
@ -324,7 +422,6 @@ public class TestDFSIO extends TestCase implements Tool {
runIOTest(WriteMapper.class, writeDir);
}
@SuppressWarnings("deprecation")
private void runIOTest(
Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
Path outputDir) throws IOException {
@ -346,35 +443,38 @@ public class TestDFSIO extends TestCase implements Tool {
/**
* Append mapper class.
*/
public static class AppendMapper extends IOStatMapper<Long> {
public static class AppendMapper extends IOStatMapper {
public AppendMapper() {
for(int i=0; i < bufferSize; i++)
buffer[i] = (byte)('0' + i % 50);
}
@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
// open file for append
OutputStream out =
fs.append(new Path(getDataDir(getConf()), name), bufferSize);
if(compressionCodec != null)
out = compressionCodec.createOutputStream(out);
LOG.info("out = " + out.getClass().getName());
return out;
}
@Override // IOMapperBase
public Long doIO(Reporter reporter,
String name,
long totalSize // in bytes
) throws IOException {
// create file
OutputStream out;
out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
try {
// write to the file
long nrRemaining;
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
out.write(buffer, 0, curSize);
reporter.setStatus("writing " + name + "@" +
(totalSize - nrRemaining) + "/" + totalSize
+ " ::host = " + hostName);
}
} finally {
out.close();
OutputStream out = (OutputStream)this.stream;
// write to the file
long nrRemaining;
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
out.write(buffer, 0, curSize);
reporter.setStatus("writing " + name + "@" +
(totalSize - nrRemaining) + "/" + totalSize
+ " ::host = " + hostName);
}
return Long.valueOf(totalSize);
}
@ -389,32 +489,35 @@ public class TestDFSIO extends TestCase implements Tool {
/**
* Read mapper class.
*/
public static class ReadMapper extends IOStatMapper<Long> {
public static class ReadMapper extends IOStatMapper {
public ReadMapper() {
}
@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
// open file
InputStream in = fs.open(new Path(getDataDir(getConf()), name));
if(compressionCodec != null)
in = compressionCodec.createInputStream(in);
LOG.info("in = " + in.getClass().getName());
return in;
}
@Override // IOMapperBase
public Long doIO(Reporter reporter,
String name,
long totalSize // in bytes
) throws IOException {
// open file
InputStream in = fs.open(new Path(getDataDir(getConf()), name));
if(compressionCodec != null) in = compressionCodec.createInputStream(in);
InputStream in = (InputStream)this.stream;
long actualSize = 0;
try {
while (actualSize < totalSize) {
int curSize = in.read(buffer, 0, bufferSize);
if(curSize < 0) break;
actualSize += curSize;
reporter.setStatus("reading " + name + "@" +
actualSize + "/" + totalSize
+ " ::host = " + hostName);
}
} finally {
in.close();
while (actualSize < totalSize) {
int curSize = in.read(buffer, 0, bufferSize);
if(curSize < 0) break;
actualSize += curSize;
reporter.setStatus("reading " + name + "@" +
actualSize + "/" + totalSize
+ " ::host = " + hostName);
}
return Long.valueOf(actualSize);
}
@ -426,20 +529,111 @@ public class TestDFSIO extends TestCase implements Tool {
runIOTest(ReadMapper.class, readDir);
}
/**
* Mapper class for random reads.
* The mapper chooses a position in the file and reads bufferSize
* bytes starting at the chosen position.
* It stops after reading the totalSize bytes, specified by -size.
*
* There are three type of reads.
* 1) Random read always chooses a random position to read from: skipSize = 0
* 2) Backward read reads file in reverse order : skipSize < 0
* 3) Skip-read skips skipSize bytes after every read : skipSize > 0
*/
public static class RandomReadMapper extends IOStatMapper {
private Random rnd;
private long fileSize;
private long skipSize;
@Override // Mapper
public void configure(JobConf conf) {
super.configure(conf);
skipSize = conf.getLong("test.io.skip.size", 0);
}
public RandomReadMapper() {
rnd = new Random();
}
@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
Path filePath = new Path(getDataDir(getConf()), name);
this.fileSize = fs.getFileStatus(filePath).getLen();
InputStream in = fs.open(filePath);
if(compressionCodec != null)
in = new FSDataInputStream(compressionCodec.createInputStream(in));
LOG.info("in = " + in.getClass().getName());
LOG.info("skipSize = " + skipSize);
return in;
}
@Override // IOMapperBase
public Long doIO(Reporter reporter,
String name,
long totalSize // in bytes
) throws IOException {
PositionedReadable in = (PositionedReadable)this.stream;
long actualSize = 0;
for(long pos = nextOffset(-1);
actualSize < totalSize; pos = nextOffset(pos)) {
int curSize = in.read(pos, buffer, 0, bufferSize);
if(curSize < 0) break;
actualSize += curSize;
reporter.setStatus("reading " + name + "@" +
actualSize + "/" + totalSize
+ " ::host = " + hostName);
}
return Long.valueOf(actualSize);
}
/**
* Get next offset for reading.
* If current < 0 then choose initial offset according to the read type.
*
* @param current offset
* @return
*/
private long nextOffset(long current) {
if(skipSize == 0)
return rnd.nextInt((int)(fileSize));
if(skipSize > 0)
return (current < 0) ? 0 : (current + bufferSize + skipSize);
// skipSize < 0
return (current < 0) ? Math.max(0, fileSize - bufferSize) :
Math.max(0, current + skipSize);
}
}
private void randomReadTest(FileSystem fs) throws IOException {
Path readDir = getRandomReadDir(config);
fs.delete(readDir, true);
runIOTest(RandomReadMapper.class, readDir);
}
private void sequentialTest(FileSystem fs,
int testType,
TestType testType,
long fileSize, // in bytes
int nrFiles
) throws IOException {
IOStatMapper<Long> ioer = null;
if (testType == TEST_TYPE_READ)
IOStatMapper ioer = null;
switch(testType) {
case TEST_TYPE_READ:
ioer = new ReadMapper();
else if (testType == TEST_TYPE_WRITE)
break;
case TEST_TYPE_WRITE:
ioer = new WriteMapper();
else if (testType == TEST_TYPE_APPEND)
break;
case TEST_TYPE_APPEND:
ioer = new AppendMapper();
else
break;
case TEST_TYPE_READ_RANDOM:
case TEST_TYPE_READ_BACKWARD:
case TEST_TYPE_READ_SKIP:
ioer = new RandomReadMapper();
break;
default:
return;
}
for(int i=0; i < nrFiles; i++)
ioer.doIO(Reporter.NULL,
BASE_FILE_NAME+Integer.toString(i),
@ -462,14 +656,15 @@ public class TestDFSIO extends TestCase implements Tool {
@Override // Tool
public int run(String[] args) throws IOException {
int testType = TEST_TYPE_READ;
TestType testType = null;
int bufferSize = DEFAULT_BUFFER_SIZE;
long fileSize = 1*MEGA;
long nrBytes = 1*MEGA;
int nrFiles = 1;
long skipSize = 0;
String resFileName = DEFAULT_RES_FILE_NAME;
String compressionClass = null;
boolean isSequential = false;
String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
String version = TestDFSIO.class.getSimpleName() + ".1.7";
LOG.info(version);
if (args.length == 0) {
@ -479,21 +674,32 @@ public class TestDFSIO extends TestCase implements Tool {
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].startsWith("-read")) {
testType = TEST_TYPE_READ;
testType = TestType.TEST_TYPE_READ;
} else if (args[i].equals("-write")) {
testType = TEST_TYPE_WRITE;
testType = TestType.TEST_TYPE_WRITE;
} else if (args[i].equals("-append")) {
testType = TEST_TYPE_APPEND;
testType = TestType.TEST_TYPE_APPEND;
} else if (args[i].equals("-random")) {
if(testType != TestType.TEST_TYPE_READ) return -1;
testType = TestType.TEST_TYPE_READ_RANDOM;
} else if (args[i].equals("-backward")) {
if(testType != TestType.TEST_TYPE_READ) return -1;
testType = TestType.TEST_TYPE_READ_BACKWARD;
} else if (args[i].equals("-skip")) {
if(testType != TestType.TEST_TYPE_READ) return -1;
testType = TestType.TEST_TYPE_READ_SKIP;
} else if (args[i].equals("-clean")) {
testType = TEST_TYPE_CLEANUP;
testType = TestType.TEST_TYPE_CLEANUP;
} else if (args[i].startsWith("-seq")) {
isSequential = true;
} else if (args[i].startsWith("-compression")) {
compressionClass = args[++i];
} else if (args[i].equals("-nrFiles")) {
nrFiles = Integer.parseInt(args[++i]);
} else if (args[i].equals("-fileSize")) {
fileSize = parseSize(args[++i]);
} else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
nrBytes = parseSize(args[++i]);
} else if (args[i].equals("-skipSize")) {
skipSize = parseSize(args[++i]);
} else if (args[i].equals("-bufferSize")) {
bufferSize = Integer.parseInt(args[++i]);
} else if (args[i].equals("-resFile")) {
@ -503,10 +709,18 @@ public class TestDFSIO extends TestCase implements Tool {
return -1;
}
}
if(testType == null)
return -1;
if(testType == TestType.TEST_TYPE_READ_BACKWARD)
skipSize = -bufferSize;
else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
skipSize = bufferSize;
LOG.info("nrFiles = " + nrFiles);
LOG.info("fileSize (MB) = " + toMB(fileSize));
LOG.info("nrBytes (MB) = " + toMB(nrBytes));
LOG.info("bufferSize = " + bufferSize);
if(skipSize > 0)
LOG.info("skipSize = " + skipSize);
LOG.info("baseDir = " + getBaseDir(config));
if(compressionClass != null) {
@ -515,29 +729,39 @@ public class TestDFSIO extends TestCase implements Tool {
}
config.setInt("test.io.file.buffer.size", bufferSize);
config.setLong("test.io.skip.size", skipSize);
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
FileSystem fs = FileSystem.get(config);
if (isSequential) {
long tStart = System.currentTimeMillis();
sequentialTest(fs, testType, fileSize, nrFiles);
sequentialTest(fs, testType, nrBytes, nrFiles);
long execTime = System.currentTimeMillis() - tStart;
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
LOG.info(resultLine);
return 0;
}
if (testType == TEST_TYPE_CLEANUP) {
if (testType == TestType.TEST_TYPE_CLEANUP) {
cleanup(fs);
return 0;
}
createControlFile(fs, fileSize, nrFiles);
createControlFile(fs, nrBytes, nrFiles);
long tStart = System.currentTimeMillis();
if (testType == TEST_TYPE_WRITE)
switch(testType) {
case TEST_TYPE_WRITE:
writeTest(fs);
if (testType == TEST_TYPE_READ)
break;
case TEST_TYPE_READ:
readTest(fs);
if (testType == TEST_TYPE_APPEND)
break;
case TEST_TYPE_APPEND:
appendTest(fs);
break;
case TEST_TYPE_READ_RANDOM:
case TEST_TYPE_READ_BACKWARD:
case TEST_TYPE_READ_SKIP:
randomReadTest(fs);
}
long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, testType, execTime, resFileName);
@ -563,9 +787,9 @@ public class TestDFSIO extends TestCase implements Tool {
static long parseSize(String arg) {
String[] args = arg.split("\\D", 2); // get digits
assert args.length <= 2;
long fileSize = Long.parseLong(args[0]);
long nrBytes = Long.parseLong(args[0]);
String bytesMult = arg.substring(args[0].length()); // get byte multiple
return fileSize * ByteMultiple.parseString(bytesMult).value();
return nrBytes * ByteMultiple.parseString(bytesMult).value();
}
static float toMB(long bytes) {
@ -573,17 +797,11 @@ public class TestDFSIO extends TestCase implements Tool {
}
private void analyzeResult( FileSystem fs,
int testType,
TestType testType,
long execTime,
String resFileName
) throws IOException {
Path reduceFile;
if (testType == TEST_TYPE_WRITE)
reduceFile = new Path(getWriteDir(config), "part-00000");
else if (testType == TEST_TYPE_APPEND)
reduceFile = new Path(getAppendDir(config), "part-00000");
else // if (testType == TEST_TYPE_READ)
reduceFile = new Path(getReadDir(config), "part-00000");
Path reduceFile = getReduceFilePath(testType);
long tasks = 0;
long size = 0;
long time = 0;
@ -617,10 +835,7 @@ public class TestDFSIO extends TestCase implements Tool {
double med = rate / 1000 / tasks;
double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
String resultLines[] = {
"----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
(testType == TEST_TYPE_READ) ? "read" :
(testType == TEST_TYPE_APPEND) ? "append" :
"unknown"),
"----- TestDFSIO ----- : " + testType,
" Date & time: " + new Date(System.currentTimeMillis()),
" Number of files: " + tasks,
"Total MBytes processed: " + toMB(size),
@ -642,6 +857,27 @@ public class TestDFSIO extends TestCase implements Tool {
}
}
private Path getReduceFilePath(TestType testType) {
switch(testType) {
case TEST_TYPE_WRITE:
return new Path(getWriteDir(config), "part-00000");
case TEST_TYPE_APPEND:
return new Path(getAppendDir(config), "part-00000");
case TEST_TYPE_READ:
return new Path(getReadDir(config), "part-00000");
case TEST_TYPE_READ_RANDOM:
case TEST_TYPE_READ_BACKWARD:
case TEST_TYPE_READ_SKIP:
return new Path(getRandomReadDir(config), "part-00000");
}
return null;
}
private void analyzeResult(FileSystem fs, TestType testType, long execTime)
throws IOException {
analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
}
private void cleanup(FileSystem fs)
throws IOException {
LOG.info("Cleaning up test files");

View File

@ -41,7 +41,9 @@ abstract class Operation {
this.config = cfg;
this.type = type;
this.rnd = rnd;
this.finder = new PathFinder(cfg, rnd);
// Use a new Random instance so that the sequence of file names produced is
// the same even in case of unsuccessful operations
this.finder = new PathFinder(cfg, new Random(rnd.nextInt()));
}
/**

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.StringUtils;
/**
@ -50,8 +52,7 @@ public class SliveMapper extends MapReduceBase implements
private FileSystem filesystem;
private ConfigExtractor config;
private WeightSelector selector;
private Random rnd;
private int taskId;
/*
* (non-Javadoc)
@ -70,19 +71,19 @@ public class SliveMapper extends MapReduceBase implements
}
try {
config = new ConfigExtractor(conf);
Long rndSeed = config.getRandomSeed();
if (rndSeed != null) {
rnd = new Random(rndSeed);
} else {
rnd = new Random();
}
selector = new WeightSelector(config, rnd);
ConfigExtractor.dumpOptions(config);
} catch (Exception e) {
LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
throw new RuntimeException("Unable to setup slive configuration", e);
}
if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) {
this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID))
.getTaskID().getId();
} else {
// So that branch-1/0.20 can run this same code as well
this.taskId = TaskAttemptID.forName(conf.get("mapred.task.id"))
.getTaskID().getId();
}
}
/**
@ -94,15 +95,6 @@ public class SliveMapper extends MapReduceBase implements
return config;
}
/**
* Gets the operation selector to use for this object
*
* @return WeightSelector
*/
private WeightSelector getSelector() {
return selector;
}
/**
* Logs to the given reporter and logs to the internal logger at info level
*
@ -154,6 +146,10 @@ public class SliveMapper extends MapReduceBase implements
Reporter reporter) throws IOException {
logAndSetStatus(reporter, "Running slive mapper for dummy key " + key
+ " and dummy value " + value);
//Add taskID to randomSeed to deterministically seed rnd.
Random rnd = config.getRandomSeed() != null ?
new Random(this.taskId + config.getRandomSeed()) : new Random();
WeightSelector selector = new WeightSelector(config, rnd);
long startTime = Timer.now();
long opAm = 0;
long sleepOps = 0;
@ -163,7 +159,6 @@ public class SliveMapper extends MapReduceBase implements
if (sleepRange != null) {
sleeper = new SleepOp(getConfig(), rnd);
}
WeightSelector selector = getSelector();
while (Timer.elapsed(startTime) < duration) {
try {
logAndSetStatus(reporter, "Attempting to select operation #"

View File

@ -211,7 +211,7 @@ public class SecondarySort {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: secondarysrot <in> <out>");
System.err.println("Usage: secondarysort <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "secondary sort");

View File

@ -34,6 +34,12 @@ Release 2.0.3-alpha - Unreleased
YARN-28. Fixed TestCompositeService to not depend on test-order and thus
made it pass on JDK7 (Thomas Graves via vinodkv).
YARN-82. Change the default local and log dirs to be based on
hadoop.tmp.dir and yarn.log.dir. (Hemanth Yamijala via sseth)
YARN-53. Added the missing getGroups API to ResourceManager. (Bo Wang via
vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -100,6 +106,9 @@ Release 0.23.4 - UNRELEASED
BUG FIXES
YARN-88. DefaultContainerExecutor can fail to set proper permissions.
(Jason Lowe via sseth)
Release 0.23.3 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -328,7 +328,7 @@
be subdirectories of this.
</description>
<name>yarn.nodemanager.local-dirs</name>
<value>/tmp/nm-local-dir</value>
<value>${hadoop.tmp.dir}/nm-local-dir</value>
</property>
<property>
@ -370,7 +370,7 @@
stderr, stdin, and syslog generated by that container.
</description>
<name>yarn.nodemanager.log-dirs</name>
<value>/tmp/logs</value>
<value>${yarn.log.dir}/userlogs</value>
</property>
<property>

View File

@ -109,6 +109,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
String userName, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerID();
// create container dirs on all disks
@ -123,7 +124,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(appCacheDir, appIdStr);
Path containerDir = new Path(appDir, containerIdStr);
lfs.mkdir(containerDir, null, false);
createDir(containerDir, dirPerm, false);
}
// Create the container log-dirs on all disks
@ -131,7 +132,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
lfs.mkdir(tmpDir, null, false);
createDir(tmpDir, dirPerm, false);
// copy launch script to work dir
Path launchDst =
@ -286,20 +287,20 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
/** Permissions for user dir.
* $loaal.dir/usercache/$user */
private static final short USER_PERM = (short)0750;
* $local.dir/usercache/$user */
static final short USER_PERM = (short)0750;
/** Permissions for user appcache dir.
* $loaal.dir/usercache/$user/appcache */
private static final short APPCACHE_PERM = (short)0710;
* $local.dir/usercache/$user/appcache */
static final short APPCACHE_PERM = (short)0710;
/** Permissions for user filecache dir.
* $loaal.dir/usercache/$user/filecache */
private static final short FILECACHE_PERM = (short)0710;
* $local.dir/usercache/$user/filecache */
static final short FILECACHE_PERM = (short)0710;
/** Permissions for user app dir.
* $loaal.dir/usercache/$user/filecache */
private static final short APPDIR_PERM = (short)0710;
* $local.dir/usercache/$user/appcache/$appId */
static final short APPDIR_PERM = (short)0710;
/** Permissions for user log dir.
* $logdir/$user/$appId */
private static final short LOGDIR_PERM = (short)0710;
static final short LOGDIR_PERM = (short)0710;
private Path getFirstApplicationDir(List<String> localDirs, String user,
String appId) {
@ -324,20 +325,28 @@ public class DefaultContainerExecutor extends ContainerExecutor {
ContainerLocalizer.FILECACHE);
}
private void createDir(Path dirPath, FsPermission perms,
boolean createParent) throws IOException {
lfs.mkdir(dirPath, perms, createParent);
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
lfs.setPermission(dirPath, perms);
}
}
/**
* Initialize the local directories for a particular user.
* <ul>
* <ul>.mkdir
* <li>$local.dir/usercache/$user</li>
* </ul>
*/
private void createUserLocalDirs(List<String> localDirs, String user)
void createUserLocalDirs(List<String> localDirs, String user)
throws IOException {
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent
try {
lfs.mkdir(getUserCacheDir(new Path(localDir), user), userperms, true);
createDir(getUserCacheDir(new Path(localDir), user), userperms, true);
} catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e);
continue;
@ -359,7 +368,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* <li>$local.dir/usercache/$user/filecache</li>
* </ul>
*/
private void createUserCacheDirs(List<String> localDirs, String user)
void createUserCacheDirs(List<String> localDirs, String user)
throws IOException {
LOG.info("Initializing user " + user);
@ -373,7 +382,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
Path localDirPath = new Path(localDir);
final Path appDir = getAppcacheDir(localDirPath, user);
try {
lfs.mkdir(appDir, appCachePerms, true);
createDir(appDir, appCachePerms, true);
appcacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app cache directory : " + appDir, e);
@ -381,7 +390,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// create $local.dir/usercache/$user/filecache
final Path distDir = getFileCacheDir(localDirPath, user);
try {
lfs.mkdir(distDir, fileperms, true);
createDir(distDir, fileperms, true);
distributedCacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create file cache directory : " + distDir, e);
@ -406,7 +415,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* </ul>
* @param localDirs
*/
private void createAppDirs(List<String> localDirs, String user, String appId)
void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException {
boolean initAppDirStatus = false;
FsPermission appperms = new FsPermission(APPDIR_PERM);
@ -414,7 +423,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
// create $local.dir/usercache/$user/appcache/$appId
try {
lfs.mkdir(fullAppDir, appperms, true);
createDir(fullAppDir, appperms, true);
initAppDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
@ -430,7 +439,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
/**
* Create application log directories on all disks.
*/
private void createAppLogDirs(String appId, List<String> logDirs)
void createAppLogDirs(String appId, List<String> logDirs)
throws IOException {
boolean appLogDirStatus = false;
@ -439,7 +448,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// create $log.dir/$appid
Path appLogDir = new Path(rootLogDir, appId);
try {
lfs.mkdir(appLogDir, appLogDirPerms, true);
createDir(appLogDir, appLogDirPerms, true);
} catch (IOException e) {
LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
continue;
@ -455,7 +464,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
/**
* Create application log directories on all disks.
*/
private void createContainerLogDirs(String appId, String containerId,
void createContainerLogDirs(String appId, String containerId,
List<String> logDirs) throws IOException {
boolean containerLogDirStatus = false;
@ -465,7 +474,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
Path appLogDir = new Path(rootLogDir, appId);
Path containerLogDir = new Path(appLogDir, containerId);
try {
lfs.mkdir(containerLogDir, containerLogDirPerms, true);
createDir(containerLogDir, containerLogDirPerms, true);
} catch (IOException e) {
LOG.warn("Unable to create the container-log directory : "
+ appLogDir, e);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -26,8 +27,11 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
@ -38,6 +42,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
import static org.apache.hadoop.fs.CreateFlag.*;
@ -91,11 +96,16 @@ public class TestDefaultContainerExecutor {
}
*/
private static final Path BASE_TMP_PATH = new Path("target",
TestDefaultContainerExecutor.class.getSimpleName());
@AfterClass
public static void deleteTmpFiles() throws IOException {
FileContext lfs = FileContext.getLocalFSFileContext();
lfs.delete(new Path("target",
TestDefaultContainerExecutor.class.getSimpleName()), true);
try {
lfs.delete(BASE_TMP_PATH, true);
} catch (FileNotFoundException e) {
}
}
byte[] createTmpFile(Path dst, Random r, int len)
@ -116,6 +126,71 @@ public class TestDefaultContainerExecutor {
return bytes;
}
@Test
public void testDirPermissions() throws Exception {
deleteTmpFiles();
final String user = "somebody";
final String appId = "app_12345_123";
final FsPermission userCachePerm = new FsPermission(
DefaultContainerExecutor.USER_PERM);
final FsPermission appCachePerm = new FsPermission(
DefaultContainerExecutor.APPCACHE_PERM);
final FsPermission fileCachePerm = new FsPermission(
DefaultContainerExecutor.FILECACHE_PERM);
final FsPermission appDirPerm = new FsPermission(
DefaultContainerExecutor.APPDIR_PERM);
final FsPermission logDirPerm = new FsPermission(
DefaultContainerExecutor.LOGDIR_PERM);
List<String> localDirs = new ArrayList<String>();
localDirs.add(new Path(BASE_TMP_PATH, "localDirA").toString());
localDirs.add(new Path(BASE_TMP_PATH, "localDirB").toString());
List<String> logDirs = new ArrayList<String>();
logDirs.add(new Path(BASE_TMP_PATH, "logDirA").toString());
logDirs.add(new Path(BASE_TMP_PATH, "logDirB").toString());
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext lfs = FileContext.getLocalFSFileContext(conf);
DefaultContainerExecutor executor = new DefaultContainerExecutor(lfs);
executor.init();
try {
executor.createUserLocalDirs(localDirs, user);
executor.createUserCacheDirs(localDirs, user);
executor.createAppDirs(localDirs, user, appId);
for (String dir : localDirs) {
FileStatus stats = lfs.getFileStatus(
new Path(new Path(dir, ContainerLocalizer.USERCACHE), user));
Assert.assertEquals(userCachePerm, stats.getPermission());
}
for (String dir : localDirs) {
Path userCachePath = new Path(
new Path(dir, ContainerLocalizer.USERCACHE), user);
Path appCachePath = new Path(userCachePath,
ContainerLocalizer.APPCACHE);
FileStatus stats = lfs.getFileStatus(appCachePath);
Assert.assertEquals(appCachePerm, stats.getPermission());
stats = lfs.getFileStatus(
new Path(userCachePath, ContainerLocalizer.FILECACHE));
Assert.assertEquals(fileCachePerm, stats.getPermission());
stats = lfs.getFileStatus(new Path(appCachePath, appId));
Assert.assertEquals(appDirPerm, stats.getPermission());
}
executor.createAppLogDirs(appId, logDirs);
for (String dir : logDirs) {
FileStatus stats = lfs.getFileStatus(new Path(dir, appId));
Assert.assertEquals(logDirPerm, stats.getPermission());
}
} finally {
deleteTmpFiles();
}
}
// @Test
// public void testInit() throws IOException, InterruptedException {
// Configuration conf = new Configuration();

View File

@ -271,5 +271,10 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.api;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.Refresh
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
public interface RMAdminProtocol {
public interface RMAdminProtocol extends GetUserMappingsProtocol {
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnRemoteException;

View File

@ -19,14 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.client;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
@ -154,5 +156,18 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol {
}
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
GetGroupsForUserRequestProto requestProto =
GetGroupsForUserRequestProto.newBuilder().setUser(user).build();
try {
GetGroupsForUserResponseProto responseProto =
proxy.getGroupsForUser(null, requestProto);
return (String[]) responseProto.getGroupsList().toArray(
new String[responseProto.getGroupsCount()]);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.*;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
@ -139,4 +139,22 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
}
}
@Override
public GetGroupsForUserResponseProto getGroupsForUser(
RpcController controller, GetGroupsForUserRequestProto request)
throws ServiceException {
String user = request.getUser();
try {
String[] groups = real.getGroupsForUser(user);
GetGroupsForUserResponseProto.Builder responseBuilder =
GetGroupsForUserResponseProto.newBuilder();
for (String group : groups) {
responseBuilder.addGroups(group);
}
return responseBuilder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.tools;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@ -63,6 +64,7 @@ public class RMAdmin extends Configured implements Tool {
" [-refreshUserToGroupsMappings]" +
" [-refreshAdminAcls]" +
" [-refreshServiceAcl]" +
" [-getGroup [username]]" +
" [-help [cmd]]\n";
String refreshQueues =
@ -81,12 +83,16 @@ public class RMAdmin extends Configured implements Tool {
String refreshAdminAcls =
"-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
String refreshServiceAcl =
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tResoureceManager will reload the authorization policy file.\n";
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tResoureceManager will reload the authorization policy file.\n";
String getGroups =
"-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
if ("refreshQueues".equals(cmd)) {
System.out.println(refreshQueues);
@ -100,6 +106,8 @@ public class RMAdmin extends Configured implements Tool {
System.out.println(refreshAdminAcls);
} else if ("refreshServiceAcl".equals(cmd)) {
System.out.println(refreshServiceAcl);
} else if ("getGroups".equals(cmd)) {
System.out.println(getGroups);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@ -110,6 +118,7 @@ public class RMAdmin extends Configured implements Tool {
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls);
System.out.println(refreshServiceAcl);
System.out.println(getGroups);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@ -133,6 +142,8 @@ public class RMAdmin extends Configured implements Tool {
System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
} else if ("-refreshService".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]");
} else if ("-getGroups".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]");
} else {
System.err.println("Usage: java RMAdmin");
System.err.println(" [-refreshQueues]");
@ -141,6 +152,7 @@ public class RMAdmin extends Configured implements Tool {
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-getGroups [username]]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@ -229,6 +241,27 @@ public class RMAdmin extends Configured implements Tool {
return 0;
}
private int getGroups(String[] usernames) throws IOException {
// Get groups users belongs to
RMAdminProtocol adminProtocol = createAdminProtocol();
if (usernames.length == 0) {
usernames = new String[] { UserGroupInformation.getCurrentUser().getUserName() };
}
for (String username : usernames) {
StringBuilder sb = new StringBuilder();
sb.append(username + " :");
for (String group : adminProtocol.getGroupsForUser(username)) {
sb.append(" ");
sb.append(group);
}
System.out.println(sb);
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 1) {
@ -251,7 +284,7 @@ public class RMAdmin extends Configured implements Tool {
return exitCode;
}
}
exitCode = 0;
try {
if ("-refreshQueues".equals(cmd)) {
@ -266,6 +299,9 @@ public class RMAdmin extends Configured implements Tool {
exitCode = refreshAdminAcls();
} else if ("-refreshServiceAcl".equals(cmd)) {
exitCode = refreshServiceAcls();
} else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);
} else if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);

View File

@ -30,4 +30,5 @@ service RMAdminProtocolService {
rpc refreshUserToGroupsMappings(RefreshUserToGroupsMappingsRequestProto) returns (RefreshUserToGroupsMappingsResponseProto);
rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns (RefreshAdminAclsResponseProto);
rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
}

View File

@ -52,3 +52,10 @@ message RefreshServiceAclsRequestProto {
message RefreshServiceAclsResponseProto {
}
message GetGroupsForUserRequestProto {
required string user = 1;
}
message GetGroupsForUserResponseProto {
repeated string groups = 1;
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.tools;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.GetGroupsBase;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
public class GetGroupsForTesting extends GetGroupsBase {
public GetGroupsForTesting(Configuration conf) {
super(conf);
}
public GetGroupsForTesting(Configuration conf, PrintStream out) {
super(conf, out);
}
@Override
protected InetSocketAddress getProtocolAddress(Configuration conf)
throws IOException {
return conf.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
@Override
public void setConf(Configuration conf) {
conf = new YarnConfiguration(conf);
super.setConf(conf);
}
@Override
protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
Configuration conf = getConf();
final InetSocketAddress addr = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
final YarnRPC rpc = YarnRPC.create(conf);
RMAdminProtocol adminProtocol = (RMAdminProtocol) rpc.getProxy(
RMAdminProtocol.class, addr, getConf());
return adminProtocol;
}
public static void main(String[] argv) throws Exception {
int res = ToolRunner.run(new GetGroupsForTesting(new YarnConfiguration()), argv);
System.exit(res);
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.tools;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.GetGroupsTestBase;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public class TestGetGroups extends GetGroupsTestBase {
private static final Log LOG = LogFactory.getLog(TestGetGroups.class);
private static ResourceManager resourceManager;
private static Configuration conf;
@BeforeClass
public static void setUpResourceManager() throws IOException, InterruptedException {
conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store) {
@Override
protected void doSecureLogin() throws IOException {
};
};
resourceManager.init(conf);
new Thread() {
public void run() {
resourceManager.start();
};
}.start();
int waitCount = 0;
while (resourceManager.getServiceState() == STATE.INITED
&& waitCount++ < 10) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1000);
}
if (resourceManager.getServiceState() != STATE.STARTED) {
throw new IOException(
"ResourceManager failed to start. Final state is "
+ resourceManager.getServiceState());
}
LOG.info("ResourceManager RMAdmin address: " +
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS));
}
@SuppressWarnings("static-access")
@Before
public void setUpConf() {
super.conf = this.conf;
}
@AfterClass
public static void tearDownResourceManager() throws InterruptedException {
if (resourceManager != null) {
LOG.info("Stopping ResourceManager...");
resourceManager.stop();
}
}
@Override
protected Tool getTool(PrintStream o) {
return new GetGroupsForTesting(conf, o);
}
}