Merge remote-tracking branch 'apache-commit/trunk' into HDDS-48

This commit is contained in:
Arpit Agarwal 2018-05-31 11:45:01 -07:00
commit 79b298111f
33 changed files with 441 additions and 214 deletions

View File

@ -36,7 +36,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;

View File

@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.test.UnitTestcaseTimeLimit;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.junit.Test;

View File

@ -79,6 +79,7 @@ message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
optional ContainerActionsProto containerActions = 4;
}
/*
@ -122,20 +123,33 @@ enum StorageTypeProto {
PROVIDED = 5;
}
/**
A set of container reports, max count is generally set to
8192 since that keeps the size of the reports under 1 MB.
*/
message ContainerReportsProto {
repeated ContainerInfo reports = 2;
repeated ContainerInfo reports = 1;
}
message ContainerActionsProto {
repeated ContainerAction containerActions = 1;
}
message ContainerAction {
enum Action {
CLOSE = 1;
}
enum Reason {
CONTAINER_FULL = 1;
}
required ContainerInfo container = 1;
required Action action = 2;
optional Reason reason = 3;
}
/**
A container report contains the following information.
*/
message ContainerInfo {
optional string finalhash = 1;
required int64 containerID = 1;
optional int64 size = 2;
optional int64 used = 3;
optional int64 keyCount = 4;
@ -144,7 +158,7 @@ message ContainerInfo {
optional int64 writeCount = 6;
optional int64 readBytes = 7;
optional int64 writeBytes = 8;
required int64 containerID = 9;
optional string finalhash = 9;
optional hadoop.hdds.LifeCycleState state = 10;
}

View File

@ -146,6 +146,8 @@ public class WebHdfsFileSystem extends FileSystem
public static final String EZ_HEADER = "X-Hadoop-Accept-EZ";
public static final String FEFINFO_HEADER = "X-Hadoop-feInfo";
public static final String SPECIAL_FILENAME_CHARACTERS_REGEX = ".*[;+%].*";
/**
* Default connection factory may be overridden in tests to use smaller
* timeout values
@ -606,8 +608,10 @@ public class WebHdfsFileSystem extends FileSystem
if (fspath != null) {
URI fspathUri = fspath.toUri();
String fspathUriDecoded = fspathUri.getPath();
boolean pathAlreadyEncoded = false;
try {
fspathUriDecoded = URLDecoder.decode(fspathUri.getPath(), "UTF-8");
pathAlreadyEncoded = true;
} catch (IllegalArgumentException ex) {
LOG.trace("Cannot decode URL encoded file", ex);
}
@ -617,7 +621,12 @@ public class WebHdfsFileSystem extends FileSystem
StringBuilder fsPathEncodedItems = new StringBuilder();
for (String fsPathItem : fspathItems) {
fsPathEncodedItems.append("/");
fsPathEncodedItems.append(URLEncoder.encode(fsPathItem, "UTF-8"));
if (fsPathItem.matches(SPECIAL_FILENAME_CHARACTERS_REGEX) ||
pathAlreadyEncoded) {
fsPathEncodedItems.append(URLEncoder.encode(fsPathItem, "UTF-8"));
} else {
fsPathEncodedItems.append(fsPathItem);
}
}
encodedFSPath = new Path(fspathUri.getScheme(),
fspathUri.getAuthority(), fsPathEncodedItems.substring(1));

View File

@ -110,11 +110,15 @@ public class TestDFSAdmin {
private static final PrintStream OLD_OUT = System.out;
private static final PrintStream OLD_ERR = System.err;
private String tempResource = null;
private static final int NUM_DATANODES = 2;
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 3);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
GenericTestUtils.getRandomizedTempPath());
restartCluster();
admin = new DFSAdmin(conf);
@ -157,7 +161,8 @@ public class TestDFSAdmin {
if (cluster != null) {
cluster.shutdown();
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
datanode = cluster.getDataNodes().get(0);
namenode = cluster.getNameNode();
@ -904,40 +909,34 @@ public class TestDFSAdmin {
@Test(timeout = 300000L)
public void testCheckNumOfBlocksInReportCommand() throws Exception {
Configuration config = new Configuration();
config.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
config.set(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, "3s");
DistributedFileSystem dfs = cluster.getFileSystem();
Path path = new Path("/tmp.txt");
int numOfDatanodes = 1;
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(config)
.numDataNodes(numOfDatanodes).build();
try {
miniDFSCluster.waitActive();
DistributedFileSystem dfs = miniDFSCluster.getFileSystem();
Path path= new Path("/tmp.txt");
DatanodeInfo[] dn = dfs.getDataNodeStats();
assertEquals(dn.length, numOfDatanodes);
//Block count should be 0, as no files are created
assertEquals(dn[0].getNumBlocks(), 0);
//Create a file with 2 blocks
DFSTestUtil.createFile(dfs, path, 1024, (short) 1, 0);
int expectedBlockCount = 2;
//Wait for One Heartbeat
Thread.sleep(3 * 1000);
dn = dfs.getDataNodeStats();
assertEquals(dn.length, numOfDatanodes);
//Block count should be 2, as file is created with block count 2
assertEquals(dn[0].getNumBlocks(), expectedBlockCount);
} finally {
cluster.shutdown();
DatanodeInfo[] dn = dfs.getDataNodeStats();
assertEquals(dn.length, NUM_DATANODES);
// Block count should be 0, as no files are created
int actualBlockCount = 0;
for (DatanodeInfo d : dn) {
actualBlockCount += d.getNumBlocks();
}
assertEquals(0, actualBlockCount);
// Create a file with 2 blocks
DFSTestUtil.createFile(dfs, path, 1024, (short) 1, 0);
int expectedBlockCount = 2;
// Wait for One Heartbeat
Thread.sleep(3 * 1000);
dn = dfs.getDataNodeStats();
assertEquals(dn.length, NUM_DATANODES);
// Block count should be 2, as file is created with block count 2
actualBlockCount = 0;
for (DatanodeInfo d : dn) {
actualBlockCount += d.getNumBlocks();
}
assertEquals(expectedBlockCount, actualBlockCount);
}
@Test

View File

@ -414,4 +414,59 @@ public class TestWebHdfsUrl {
}
}
private static final String BACKWARD_COMPATIBLE_SPECIAL_CHARACTER_FILENAME =
"specialFile ?\"\\()[]_-=&,{}#'`~!@$^*|<>.";
@Test
public void testWebHdfsBackwardCompatibleSpecialCharacterFile()
throws Exception {
assertFalse(BACKWARD_COMPATIBLE_SPECIAL_CHARACTER_FILENAME
.matches(WebHdfsFileSystem.SPECIAL_FILENAME_CHARACTERS_REGEX));
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("test-user");
ugi.setAuthenticationMethod(KERBEROS);
UserGroupInformation.setLoginUser(ugi);
final Configuration conf = WebHdfsTestUtil.createConf();
final Path dir = new Path("/testWebHdfsSpecialCharacterFile");
final short numDatanodes = 1;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.build();
try {
cluster.waitActive();
final FileSystem fs = WebHdfsTestUtil
.getWebHdfsFileSystem(conf, WebHdfs.SCHEME);
//create a file
final long length = 1L << 10;
final Path file1 = new Path(dir,
BACKWARD_COMPATIBLE_SPECIAL_CHARACTER_FILENAME);
DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
//get file status and check that it was written properly.
final FileStatus s1 = fs.getFileStatus(file1);
assertEquals("Write failed for file " + file1, length, s1.getLen());
boolean found = false;
RemoteIterator<LocatedFileStatus> statusRemoteIterator =
fs.listFiles(dir, false);
while (statusRemoteIterator.hasNext()) {
LocatedFileStatus locatedFileStatus = statusRemoteIterator.next();
if (locatedFileStatus.isFile() &&
BACKWARD_COMPATIBLE_SPECIAL_CHARACTER_FILENAME
.equals(locatedFileStatus.getPath().getName())) {
found = true;
}
}
assertFalse("Could not find file with special character", !found);
} finally {
cluster.shutdown();
}
}
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import java.io.IOException;
import java.net.InetAddress;

View File

@ -25,7 +25,7 @@ import java.net.URLDecoder;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;

View File

@ -27,7 +27,7 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.EnumSet;
import java.util.Collection;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -134,8 +134,8 @@ public class TaskPage extends AppView {
.append(getAttemptId(taskId, ta)).append("\",\"")
.append(progress).append("\",\"")
.append(ta.getState().toString()).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(
StringEscapeUtils.escapeHtml(ta.getStatus()))).append("\",\"")
.append(StringEscapeUtils.escapeEcmaScript(
StringEscapeUtils.escapeHtml4(ta.getStatus()))).append("\",\"")
.append(nodeHttpAddr == null ? "N/A" :
"<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>"
@ -151,8 +151,8 @@ public class TaskPage extends AppView {
.append(ta.getStartTime()).append("\",\"")
.append(ta.getFinishTime()).append("\",\"")
.append(ta.getElapsedTime()).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
diag)));
.append(StringEscapeUtils.escapeEcmaScript(
StringEscapeUtils.escapeHtml4(diag)));
if (enableUIActions) {
attemptsTableData.append("\",\"");
if (EnumSet.of(

View File

@ -24,7 +24,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
@ -103,8 +103,8 @@ public class TasksBlock extends HtmlBlock {
.append(join(pct, '%')).append("'> ").append("<div class='")
.append(C_PROGRESSBAR_VALUE).append("' style='")
.append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
.append(StringEscapeUtils.escapeJavaScript(
StringEscapeUtils.escapeHtml(info.getStatus()))).append("\",\"")
.append(StringEscapeUtils.escapeEcmaScript(
StringEscapeUtils.escapeHtml4(info.getStatus()))).append("\",\"")
.append(info.getState()).append("\",\"")
.append(info.getStartTime()).append("\",\"")

View File

@ -22,7 +22,7 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mapreduce.checkpoint;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.RandomStringUtils;
/**
* Simple naming service that generates a random checkpoint name.

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobStatus;

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

View File

@ -31,7 +31,7 @@ import java.util.HashSet;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.Collections;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2.hs.webapp;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
@ -83,11 +83,11 @@ public class HsJobsBlock extends HtmlBlock {
.append(dateFormat.format(new Date(job.getFinishTime()))).append("\",\"")
.append("<a href='").append(url("job", job.getId())).append("'>")
.append(job.getId()).append("</a>\",\"")
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
.append(StringEscapeUtils.escapeEcmaScript(StringEscapeUtils.escapeHtml4(
job.getName()))).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
.append(StringEscapeUtils.escapeEcmaScript(StringEscapeUtils.escapeHtml4(
job.getUserName()))).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
.append(StringEscapeUtils.escapeEcmaScript(StringEscapeUtils.escapeHtml4(
job.getQueueName()))).append("\",\"")
.append(job.getState()).append("\",\"")
.append(String.valueOf(job.getMapsTotal())).append("\",\"")

View File

@ -29,7 +29,7 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -147,8 +147,8 @@ public class HsTaskPage extends HsView {
attemptsTableData.append("[\"")
.append(getAttemptId(taskId, ta)).append("\",\"")
.append(ta.getState()).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(
StringEscapeUtils.escapeHtml(ta.getStatus()))).append("\",\"")
.append(StringEscapeUtils.escapeEcmaScript(
StringEscapeUtils.escapeHtml4(ta.getStatus()))).append("\",\"")
.append("<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>")
.append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"")
@ -171,8 +171,8 @@ public class HsTaskPage extends HsView {
.append(elapsedReduceTime).append("\",\"");
}
attemptsTableData.append(attemptElapsed).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(
StringEscapeUtils.escapeHtml(ta.getNote())))
.append(StringEscapeUtils.escapeEcmaScript(
StringEscapeUtils.escapeHtml4(ta.getNote())))
.append("\"],\n");
}
//Remove the last comma and close off the array of arrays

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobID;

View File

@ -28,7 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC;

View File

@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
@ -167,7 +167,7 @@ public class NotRunningJob implements MRClientProtocol {
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws IOException {
//not invoked by anybody
throw new NotImplementedException();
throw new NotImplementedException("Code is not implemented");
}
@Override
@ -222,26 +222,26 @@ public class NotRunningJob implements MRClientProtocol {
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws IOException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
throw new NotImplementedException("Code is not implemented");
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws IOException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
throw new NotImplementedException("Code is not implemented");
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws IOException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
throw new NotImplementedException("Code is not implemented");
}
@Override
public InetSocketAddress getConnectAddress() {
/* Should not be invoked by anyone. Normally used to set token service */
throw new NotImplementedException();
throw new NotImplementedException("Code is not implemented");
}
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.mapred;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX;
import java.io.IOException;

View File

@ -29,7 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.lib.db;
import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

View File

@ -142,11 +142,6 @@
<artifactId>commons-cli</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>

View File

@ -290,6 +290,26 @@ public abstract class BaseServiceRecordProcessor
domain));
}
/**
* Return the DNS name constructed from the component name.
*
* @return the DNS naem.
* @throws PathNotFoundException
* @throws TextParseException
*/
protected Name getComponentName()
throws PathNotFoundException, TextParseException {
String service = RegistryPathUtils.lastPathEntry(
RegistryPathUtils.parentOf(RegistryPathUtils.parentOf(getPath())));
String component = getRecord().get("yarn:component").toLowerCase();
String user = RegistryPathUtils.getUsername(getPath());
return Name.fromString(MessageFormat.format("{0}.{1}.{2}.{3}",
component,
service,
user,
domain));
}
}
/**

View File

@ -242,7 +242,8 @@ public class ContainerServiceRecordProcessor extends
}
try {
this.setTarget(InetAddress.getByName(ip));
this.setNames(new Name[] {getContainerName(), getContainerIDName()});
this.setNames(new Name[] {getContainerName(), getContainerIDName(),
getComponentName()});
} catch (Exception e) {
throw new IllegalStateException(e);
}

View File

@ -115,32 +115,47 @@ public class TestRegistryDNS extends Assert {
+ "}\n";
static final String CONTAINER_RECORD = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"COMP-NAME\",\n"
+ " \"description\" : \"httpd-1\",\n"
+ " \"external\" : [ ],\n"
+ " \"internal\" : [ ],\n"
+ " \"yarn:id\" : \"container_e50_1451931954322_0016_01_000002\",\n"
+ " \"yarn:persistence\" : \"container\",\n"
+ " \"yarn:ip\" : \"172.17.0.19\",\n"
+ " \"yarn:hostname\" : \"0a134d6329ba\"\n"
+ " \"yarn:hostname\" : \"host1\",\n"
+ " \"yarn:component\" : \"httpd\"\n"
+ "}\n";
static final String CONTAINER_RECORD2 = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"httpd-2\",\n"
+ " \"external\" : [ ],\n"
+ " \"internal\" : [ ],\n"
+ " \"yarn:id\" : \"container_e50_1451931954322_0016_01_000003\",\n"
+ " \"yarn:persistence\" : \"container\",\n"
+ " \"yarn:ip\" : \"172.17.0.20\",\n"
+ " \"yarn:hostname\" : \"host2\",\n"
+ " \"yarn:component\" : \"httpd\"\n"
+ "}\n";
private static final String CONTAINER_RECORD_NO_IP = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"COMP-NAME\",\n"
+ " \"description\" : \"httpd-1\",\n"
+ " \"external\" : [ ],\n"
+ " \"internal\" : [ ],\n"
+ " \"yarn:id\" : \"container_e50_1451931954322_0016_01_000002\",\n"
+ " \"yarn:persistence\" : \"container\"\n"
+ " \"yarn:persistence\" : \"container\",\n"
+ " \"yarn:component\" : \"httpd\"\n"
+ "}\n";
private static final String CONTAINER_RECORD_YARN_PERSISTANCE_ABSENT = "{\n"
+ " \"type\" : \"JSONServiceRecord\",\n"
+ " \"description\" : \"COMP-NAME\",\n"
+ " \"description\" : \"httpd-1\",\n"
+ " \"external\" : [ ],\n"
+ " \"internal\" : [ ],\n"
+ " \"yarn:id\" : \"container_e50_1451931954322_0016_01_000003\",\n"
+ " \"yarn:ip\" : \"172.17.0.19\",\n"
+ " \"yarn:hostname\" : \"0a134d6329bb\"\n"
+ " \"yarn:hostname\" : \"0a134d6329bb\",\n"
+ " \"yarn:component\" : \"httpd\""
+ "}\n";
@Before
@ -229,7 +244,7 @@ public class TestRegistryDNS extends Assert {
assertEquals("wrong result", "172.17.0.19",
((ARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("comp-name.test1.root.dev.test.", 1);
recs = assertDNSQuery("httpd-1.test1.root.dev.test.", 1);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
}
@ -268,7 +283,7 @@ public class TestRegistryDNS extends Assert {
((ARecord) recs[0]).getAddress().getHostAddress());
assertEquals("wrong ttl", 30L, recs[0].getTTL());
recs = assertDNSQuery("comp-name.test1.root.dev.test.", 1);
recs = assertDNSQuery("httpd-1.test1.root.dev.test.", 1);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
assertEquals("wrong ttl", 30L, recs[0].getTTL());
@ -286,7 +301,7 @@ public class TestRegistryDNS extends Assert {
// start assessing whether correct records are available
Record[] recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
assertEquals("wrong result",
"comp-name.test1.root.dev.test.",
"httpd-1.test1.root.dev.test.",
((PTRRecord) recs[0]).getTarget().toString());
}
@ -312,7 +327,7 @@ public class TestRegistryDNS extends Assert {
// start assessing whether correct records are available
Record[] recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
assertEquals("wrong result",
"comp-name.test1.root.dev.test.",
"httpd-1.test1.root.dev.test.",
((PTRRecord) recs[0]).getTarget().toString());
}
@ -490,7 +505,7 @@ public class TestRegistryDNS extends Assert {
assertEquals("wrong result", "172.17.0.19",
((AAAARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("comp-name.test1.root.dev.test.", Type.AAAA, 1);
recs = assertDNSQuery("httpd-1.test1.root.dev.test.", Type.AAAA, 1);
assertTrue("not an ARecord", recs[0] instanceof AAAARecord);
}
@ -565,13 +580,13 @@ public class TestRegistryDNS extends Assert {
assertEquals("wrong result", "172.17.0.19",
((ARecord) recs[0]).getAddress().getHostAddress());
recs = assertDNSQuery("comp-name.test1.root.dev.test.", 1);
recs = assertDNSQuery("httpd-1.test1.root.dev.test.", 1);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
// lookup dyanmic reverse records
recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
assertEquals("wrong result",
"comp-name.test1.root.dev.test.",
"httpd-1.test1.root.dev.test.",
((PTRRecord) recs[0]).getTarget().toString());
// now lookup static reverse records
@ -649,6 +664,27 @@ public class TestRegistryDNS extends Assert {
assertDNSQueryNotNull("mail.yahoo.com.", Type.CNAME);
}
@Test
public void testMultiARecord() throws Exception {
ServiceRecord record = getMarshal().fromBytes("somepath",
CONTAINER_RECORD.getBytes());
ServiceRecord record2 = getMarshal().fromBytes("somepath",
CONTAINER_RECORD2.getBytes());
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "ctr-e50-1451931954322-0016-01-000002",
record);
getRegistryDNS().register(
"/registry/users/root/services/org-apache-slider/test1/components/"
+ "ctr-e50-1451931954322-0016-01-000003",
record2);
// start assessing whether correct records are available
Record[] recs =
assertDNSQuery("httpd.test1.root.dev.test.", 2);
assertTrue("not an ARecord", recs[0] instanceof ARecord);
assertTrue("not an ARecord", recs[1] instanceof ARecord);
}
public RegistryDNS getRegistryDNS() {
return registryDNS;
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -238,110 +239,118 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
"Only GUARANTEED execution type is supported.");
}
PlacementConstraint constraint =
newSchedulingRequest.getPlacementConstraint();
// We only accept SingleConstraint
PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr();
if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
throwExceptionWithMetaInfo(
"Only accepts " + PlacementConstraint.SingleConstraint.class.getName()
+ " as constraint-expression. Rejecting the new added "
+ "constraint-expression.class=" + ac.getClass().getName());
}
PlacementConstraint.SingleConstraint singleConstraint =
(PlacementConstraint.SingleConstraint) ac;
// Make sure it is an anti-affinity request (actually this implementation
// should be able to support both affinity / anti-affinity without much
// effort. Considering potential test effort required. Limit to
// anti-affinity to intra-app and scope is node.
if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
throwExceptionWithMetaInfo(
"Only support scope=" + PlacementConstraints.NODE
+ "now. PlacementConstraint=" + singleConstraint);
}
if (singleConstraint.getMinCardinality() != 0
|| singleConstraint.getMaxCardinality() != 0) {
throwExceptionWithMetaInfo(
"Only support anti-affinity, which is: minCardinality=0, "
+ "maxCardinality=1");
}
Set<PlacementConstraint.TargetExpression> targetExpressionSet =
singleConstraint.getTargetExpressions();
if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
throwExceptionWithMetaInfo(
"TargetExpression should not be null or empty");
}
// Set node partition
// Node partition
String nodePartition = null;
// Target allocation tags
Set<String> targetAllocationTags = null;
for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) {
// Handle node partition
if (targetExpression.getTargetType().equals(
PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
// For node attribute target, we only support Partition now. And once
// YARN-3409 is merged, we will support node attribute.
if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
throwExceptionWithMetaInfo("When TargetType="
+ PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
+ " only " + NODE_PARTITION + " is accepted as TargetKey.");
}
PlacementConstraint constraint =
newSchedulingRequest.getPlacementConstraint();
if (nodePartition != null) {
// This means we have duplicated node partition entry inside placement
// constraint, which might be set by mistake.
throwExceptionWithMetaInfo(
"Only one node partition targetExpression is allowed");
}
if (constraint != null) {
// We only accept SingleConstraint
PlacementConstraint.AbstractConstraint ac = constraint
.getConstraintExpr();
if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
throwExceptionWithMetaInfo("Only accepts "
+ PlacementConstraint.SingleConstraint.class.getName()
+ " as constraint-expression. Rejecting the new added "
+ "constraint-expression.class=" + ac.getClass().getName());
}
Set<String> values = targetExpression.getTargetValues();
if (values == null || values.isEmpty()) {
nodePartition = RMNodeLabelsManager.NO_LABEL;
continue;
}
PlacementConstraint.SingleConstraint singleConstraint =
(PlacementConstraint.SingleConstraint) ac;
if (values.size() > 1) {
throwExceptionWithMetaInfo("Inside one targetExpression, we only "
+ "support affinity to at most one node partition now");
}
// Make sure it is an anti-affinity request (actually this implementation
// should be able to support both affinity / anti-affinity without much
// effort. Considering potential test effort required. Limit to
// anti-affinity to intra-app and scope is node.
if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
throwExceptionWithMetaInfo(
"Only support scope=" + PlacementConstraints.NODE
+ "now. PlacementConstraint=" + singleConstraint);
}
nodePartition = values.iterator().next();
} else if (targetExpression.getTargetType().equals(
PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
// Handle allocation tags
if (targetAllocationTags != null) {
// This means we have duplicated AllocationTag expressions entries
// inside placement constraint, which might be set by mistake.
throwExceptionWithMetaInfo(
"Only one AllocationTag targetExpression is allowed");
}
if (singleConstraint.getMinCardinality() != 0
|| singleConstraint.getMaxCardinality() != 0) {
throwExceptionWithMetaInfo(
"Only support anti-affinity, which is: minCardinality=0, "
+ "maxCardinality=1");
}
if (targetExpression.getTargetValues() == null || targetExpression
.getTargetValues().isEmpty()) {
throwExceptionWithMetaInfo("Failed to find allocation tags from "
+ "TargetExpressions or couldn't find self-app target.");
}
Set<PlacementConstraint.TargetExpression> targetExpressionSet =
singleConstraint.getTargetExpressions();
if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
throwExceptionWithMetaInfo(
"TargetExpression should not be null or empty");
}
targetAllocationTags = new HashSet<>(
targetExpression.getTargetValues());
for (PlacementConstraint.TargetExpression targetExpression :
targetExpressionSet) {
// Handle node partition
if (targetExpression.getTargetType().equals(
PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
// For node attribute target, we only support Partition now. And once
// YARN-3409 is merged, we will support node attribute.
if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
throwExceptionWithMetaInfo("When TargetType="
+ PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
+ " only " + NODE_PARTITION + " is accepted as TargetKey.");
}
if (nodePartition != null) {
// This means we have duplicated node partition entry
// inside placement constraint, which might be set by mistake.
throwExceptionWithMetaInfo(
"Only one node partition targetExpression is allowed");
}
Set<String> values = targetExpression.getTargetValues();
if (values == null || values.isEmpty()) {
nodePartition = RMNodeLabelsManager.NO_LABEL;
continue;
}
if (values.size() > 1) {
throwExceptionWithMetaInfo("Inside one targetExpression, we only "
+ "support affinity to at most one node partition now");
}
nodePartition = values.iterator().next();
} else if (targetExpression.getTargetType().equals(
PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
// Handle allocation tags
if (targetAllocationTags != null) {
// This means we have duplicated AllocationTag expressions entries
// inside placement constraint, which might be set by mistake.
throwExceptionWithMetaInfo(
"Only one AllocationTag targetExpression is allowed");
}
if (targetExpression.getTargetValues() == null ||
targetExpression.getTargetValues().isEmpty()) {
throwExceptionWithMetaInfo("Failed to find allocation tags from "
+ "TargetExpressions or couldn't find self-app target.");
}
targetAllocationTags = new HashSet<>(
targetExpression.getTargetValues());
}
}
if (targetAllocationTags == null) {
// That means we don't have ALLOCATION_TAG specified
throwExceptionWithMetaInfo(
"Couldn't find target expression with type == ALLOCATION_TAG,"
+ " it is required to include one and only one target"
+ " expression with type == ALLOCATION_TAG");
}
}
// If this scheduling request doesn't contain a placement constraint,
// we set allocation tags an empty set.
if (targetAllocationTags == null) {
// That means we don't have ALLOCATION_TAG specified
throwExceptionWithMetaInfo(
"Couldn't find target expression with type == ALLOCATION_TAG, it is "
+ "required to include one and only one target expression with "
+ "type == ALLOCATION_TAG");
targetAllocationTags = ImmutableSet.of();
}
if (nodePartition == null) {

View File

@ -18,8 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -39,6 +46,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*;
public class TestSchedulingRequestContainerAllocation {
private final int GB = 1024;
@ -393,4 +402,79 @@ public class TestSchedulingRequestContainerAllocation {
Assert.assertTrue(caughtException);
rm1.close();
}
@Test
public void testSchedulingRequestWithNullConstraint() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
new Configuration());
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
// 4 NMs.
MockNM[] nms = new MockNM[4];
RMNode[] rmNodes = new RMNode[4];
for (int i = 0; i < 4; i++) {
nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
}
// app1 -> c
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
PlacementConstraint constraint = PlacementConstraints
.targetNotIn("node", allocationTag("t1"))
.build();
SchedulingRequest sc = SchedulingRequest
.newInstance(0, Priority.newInstance(1),
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
ImmutableSet.of("t1"),
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
constraint);
AllocateRequest request = AllocateRequest.newBuilder()
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request);
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(2, schedApp.getLiveContainers().size());
// Send another request with null placement constraint,
// ensure there is no NPE while handling this request.
sc = SchedulingRequest
.newInstance(1, Priority.newInstance(1),
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
ImmutableSet.of("t2"),
ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
null);
AllocateRequest request1 = AllocateRequest.newBuilder()
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request1);
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
Assert.assertEquals(4, schedApp.getLiveContainers().size());
rm1.close();
}
}

View File

@ -65,6 +65,18 @@ Note that YARN service framework assigns `COMPONENT_INSTANCE_NAME` for each cont
assigned `0` since it is the first and only instance for the `hbasemaster` component. In case of `regionserver` component, it can have multiple containers
and so be named as such: `regionserver-0`, `regionserver-1`, `regionserver-2` ... etc
Each YARN service component also has Multi-A Records for container fault tolerance or load balancing via RegistryDNS. The naming format is defined as:
```
${COMPONENT_NAME}.${SERVICE_NAME}.${USER}.${DOMAIN}
```
For example, a component named www for application app launched by Chuck with 3 containers will have DNS records that look like:
```
www.app.chuck.example.com IN A 123.123.123.1
www.app.chuck.example.com IN A 123.123.123.1
www.app.chuck.example.com IN A 123.123.123.1
```
`Disclaimer`: The DNS implementation is still experimental. It should not be used as a fully-functional DNS.
@ -140,4 +152,4 @@ You can edit the `/etc/resolv.conf` to make your system use the registry DNS suc
```
nameserver 192.168.154.3
```
Alternatively, if you have a corporate DNS in your organization, you can configure zone forwarding so that the Registry DNS resolves hostnames for the domain used by the cluster.
Alternatively, if you have a corporate DNS in your organization, you can configure zone forwarding so that the Registry DNS resolves hostnames for the domain used by the cluster.

41
pom.xml
View File

@ -375,6 +375,23 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
</goals>
<phase>pre-site</phase>
</execution>
<execution>
<id>enforce-property</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireProperty>
<property>hadoop.version</property>
<message>You must set a hadoop.version to be the same as ${project.version}</message>
<regex>${project.version}</regex>
<regexMessage>The hadoop.version property should be set and should be ${project.version}.</regexMessage>
</requireProperty>
</rules>
<fail>true</fail>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
@ -428,30 +445,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<artifactId>dependency-check-maven</artifactId>
<version>${dependency-check-maven.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>${maven-enforcer-plugin.version}</version>
<executions>
<execution>
<id>enforce-property</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireProperty>
<property>hadoop.version</property>
<message>You must set a hadoop.version to be the same as ${project.version}</message>
<regex>${project.version}</regex>
<regexMessage>The hadoop.version property should be set and should be ${project.version}.</regexMessage>
</requireProperty>
</rules>
<fail>true</fail>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>