Merging r1549626 through r1549948 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1549949 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-12-10 19:50:02 +00:00
commit f3cef1c673
43 changed files with 1304 additions and 242 deletions

View File

@ -175,6 +175,11 @@ public class LineReader implements Closeable {
} }
} }
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
return in.read(buffer);
}
/** /**
* Read a line terminated by one of CR, LF, or CRLF. * Read a line terminated by one of CR, LF, or CRLF.
*/ */
@ -208,7 +213,7 @@ public class LineReader implements Closeable {
if (prevCharCR) { if (prevCharCR) {
++bytesConsumed; //account for CR from previous read ++bytesConsumed; //account for CR from previous read
} }
bufferLength = in.read(buffer); bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) { if (bufferLength <= 0) {
break; // EOF break; // EOF
} }
@ -296,7 +301,7 @@ public class LineReader implements Closeable {
int startPosn = bufferPosn; // Start from previous end position int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) { if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0; startPosn = bufferPosn = 0;
bufferLength = in.read(buffer); bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) { if (bufferLength <= 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount); str.append(recordDelimiterBytes, 0, ambiguousByteCount);
break; // EOF break; // EOF

View File

@ -239,6 +239,9 @@ Trunk (Unreleased)
HDFS-5554. Flatten INodeFile hierarchy: Replace INodeFileWithSnapshot with HDFS-5554. Flatten INodeFile hierarchy: Replace INodeFileWithSnapshot with
FileWithSnapshotFeature. (jing9 via szetszwo) FileWithSnapshotFeature. (jing9 via szetszwo)
HDFS-5629. Support HTTPS in JournalNode and SecondaryNameNode.
(Haohui Mai via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@ -584,9 +587,6 @@ Release 2.4.0 - UNRELEASED
HDFS-5633. Improve OfflineImageViewer to use less memory. (jing9) HDFS-5633. Improve OfflineImageViewer to use less memory. (jing9)
HDFS-4983. Numeric usernames do not work with WebHDFS FS. (Yongjun Zhang via
jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -690,6 +690,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5568. Support includeSnapshots option with Fsck command. (Vinayakumar B via umamahesh) HDFS-5568. Support includeSnapshots option with Fsck command. (Vinayakumar B via umamahesh)
HDFS-4983. Numeric usernames do not work with WebHDFS FS. (Yongjun Zhang via
jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -130,6 +130,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0; public static final int DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address"; public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090"; public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
public static final String DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY = "dfs.namenode.secondary.https-address";
public static final String DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:50091";
public static final String DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period"; public static final String DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period";
public static final long DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60; public static final long DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60;
public static final String DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period"; public static final String DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";
@ -504,6 +506,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address"; public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480; public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
public static final String DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT; public static final String DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
public static final String DFS_JOURNALNODE_HTTPS_ADDRESS_KEY = "dfs.journalnode.https-address";
public static final int DFS_JOURNALNODE_HTTPS_PORT_DEFAULT = 8481;
public static final String DFS_JOURNALNODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTPS_PORT_DEFAULT;
public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file"; public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
public static final String DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal"; public static final String DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
@ -89,6 +90,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -1590,4 +1592,67 @@ public class DFSUtil {
} }
return ttl*1000; return ttl*1000;
} }
/**
* Load HTTPS-related configuration.
*/
public static Configuration loadSslConfiguration(Configuration conf) {
Configuration sslConf = new Configuration(false);
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
boolean requireClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
sslConf.setBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, requireClientAuth);
return sslConf;
}
/**
* Return a HttpServer.Builder that the journalnode / namenode / secondary
* namenode can use to initialize their HTTP / HTTPS server.
*
*/
public static HttpServer.Builder httpServerTemplateForNNAndJN(
Configuration conf, final InetSocketAddress httpAddr,
final InetSocketAddress httpsAddr, String name, String spnegoUserNameKey,
String spnegoKeytabFileKey) throws IOException {
HttpConfig.Policy policy = getHttpPolicy(conf);
HttpServer.Builder builder = new HttpServer.Builder().setName(name)
.setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
.setUsernameConfKey(spnegoUserNameKey)
.setKeytabConfKey(getSpnegoKeytabKey(conf, spnegoKeytabFileKey));
// initialize the webserver for uploading/downloading files.
LOG.info("Starting web server as: "
+ SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey),
httpAddr.getHostName()));
if (policy.isHttpEnabled()) {
if (httpAddr.getPort() == 0) {
builder.setFindPort(true);
}
URI uri = URI.create("http://" + NetUtils.getHostPortString(httpAddr));
builder.addEndpoint(uri);
LOG.info("Starting Web-server for " + name + " at: " + uri);
}
if (policy.isHttpsEnabled() && httpsAddr != null) {
Configuration sslConf = loadSslConfiguration(conf);
loadSslConfToHttpServerBuilder(builder, sslConf);
if (httpsAddr.getPort() == 0) {
builder.setFindPort(true);
}
URI uri = URI.create("https://" + NetUtils.getHostPortString(httpsAddr));
builder.addEndpoint(uri);
LOG.info("Starting Web-server for " + name + " at: " + uri);
}
return builder;
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.client;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -84,7 +85,8 @@ public class IPCLoggerChannel implements AsyncLogger {
private final String journalId; private final String journalId;
private final NamespaceInfo nsInfo; private final NamespaceInfo nsInfo;
private int httpPort = -1;
private URL httpServerURL;
private final IPCLoggerChannelMetrics metrics; private final IPCLoggerChannelMetrics metrics;
@ -241,13 +243,12 @@ public class IPCLoggerChannel implements AsyncLogger {
public URL buildURLToFetchLogs(long segmentTxId) { public URL buildURLToFetchLogs(long segmentTxId) {
Preconditions.checkArgument(segmentTxId > 0, Preconditions.checkArgument(segmentTxId > 0,
"Invalid segment: %s", segmentTxId); "Invalid segment: %s", segmentTxId);
Preconditions.checkState(httpPort != -1, Preconditions.checkState(hasHttpServerEndPoint(), "No HTTP/HTTPS endpoint");
"HTTP port not set yet");
try { try {
String path = GetJournalEditServlet.buildPath( String path = GetJournalEditServlet.buildPath(
journalId, segmentTxId, nsInfo); journalId, segmentTxId, nsInfo);
return new URL("http", addr.getHostName(), httpPort, path.toString()); return new URL(httpServerURL, path);
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
// should never get here. // should never get here.
throw new RuntimeException(e); throw new RuntimeException(e);
@ -313,7 +314,7 @@ public class IPCLoggerChannel implements AsyncLogger {
public GetJournalStateResponseProto call() throws IOException { public GetJournalStateResponseProto call() throws IOException {
GetJournalStateResponseProto ret = GetJournalStateResponseProto ret =
getProxy().getJournalState(journalId); getProxy().getJournalState(journalId);
httpPort = ret.getHttpPort(); constructHttpServerURI(ret);
return ret; return ret;
} }
}); });
@ -528,7 +529,7 @@ public class IPCLoggerChannel implements AsyncLogger {
journalId, fromTxnId, forReading, inProgressOk); journalId, fromTxnId, forReading, inProgressOk);
// Update the http port, since we need this to build URLs to any of the // Update the http port, since we need this to build URLs to any of the
// returned logs. // returned logs.
httpPort = ret.getHttpPort(); constructHttpServerURI(ret);
return PBHelper.convert(ret.getManifest()); return PBHelper.convert(ret.getManifest());
} }
}); });
@ -540,10 +541,12 @@ public class IPCLoggerChannel implements AsyncLogger {
return executor.submit(new Callable<PrepareRecoveryResponseProto>() { return executor.submit(new Callable<PrepareRecoveryResponseProto>() {
@Override @Override
public PrepareRecoveryResponseProto call() throws IOException { public PrepareRecoveryResponseProto call() throws IOException {
if (httpPort < 0) { if (!hasHttpServerEndPoint()) {
// If the HTTP port hasn't been set yet, force an RPC call so we know // force an RPC call so we know what the HTTP port should be if it
// what the HTTP port should be. // haven't done so.
httpPort = getProxy().getJournalState(journalId).getHttpPort(); GetJournalStateResponseProto ret = getProxy().getJournalState(
journalId);
constructHttpServerURI(ret);
} }
return getProxy().prepareRecovery(createReqInfo(), segmentTxId); return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
} }
@ -594,4 +597,43 @@ public class IPCLoggerChannel implements AsyncLogger {
Math.max(lastCommitNanos - lastAckNanos, 0), Math.max(lastCommitNanos - lastAckNanos, 0),
TimeUnit.NANOSECONDS); TimeUnit.NANOSECONDS);
} }
private void constructHttpServerURI(GetEditLogManifestResponseProto ret) {
if (ret.hasFromURL()) {
URI uri = URI.create(ret.getFromURL());
httpServerURL = getHttpServerURI(uri.getScheme(), uri.getPort());
} else {
httpServerURL = getHttpServerURI("http", ret.getHttpPort());;
}
}
private void constructHttpServerURI(GetJournalStateResponseProto ret) {
if (ret.hasFromURL()) {
URI uri = URI.create(ret.getFromURL());
httpServerURL = getHttpServerURI(uri.getScheme(), uri.getPort());
} else {
httpServerURL = getHttpServerURI("http", ret.getHttpPort());;
}
}
/**
* Construct the http server based on the response.
*
* The fromURL field in the response specifies the endpoint of the http
* server. However, the address might not be accurate since the server can
* bind to multiple interfaces. Here the client plugs in the address specified
* in the configuration and generates the URI.
*/
private URL getHttpServerURI(String scheme, int port) {
try {
return new URL(scheme, addr.getHostName(), port, "");
} catch (MalformedURLException e) {
// Unreachable
throw new RuntimeException(e);
}
}
private boolean hasHttpServerEndPoint() {
return httpServerURL != null;
}
} }

View File

@ -64,7 +64,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
private JournalNodeHttpServer httpServer; private JournalNodeHttpServer httpServer;
private Map<String, Journal> journalsById = Maps.newHashMap(); private Map<String, Journal> journalsById = Maps.newHashMap();
private ObjectName journalNodeInfoBeanName; private ObjectName journalNodeInfoBeanName;
private String httpServerURI;
private File localDir; private File localDir;
static { static {
@ -140,6 +140,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
httpServer = new JournalNodeHttpServer(conf, this); httpServer = new JournalNodeHttpServer(conf, this);
httpServer.start(); httpServer.start();
httpServerURI = httpServer.getServerURI().toString();
rpcServer = new JournalNodeRpcServer(conf, this); rpcServer = new JournalNodeRpcServer(conf, this);
rpcServer.start(); rpcServer.start();
} }
@ -155,11 +157,14 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
return rpcServer.getAddress(); return rpcServer.getAddress();
} }
@Deprecated
public InetSocketAddress getBoundHttpAddress() { public InetSocketAddress getBoundHttpAddress() {
return httpServer.getAddress(); return httpServer.getAddress();
} }
public String getHttpServerURI() {
return httpServerURI;
}
/** /**
* Stop the daemon with the given status code * Stop the daemon with the given status code

View File

@ -17,19 +17,12 @@
*/ */
package org.apache.hadoop.hdfs.qjournal.server; package org.apache.hadoop.hdfs.qjournal.server;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -37,22 +30,15 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
/** /**
* Encapsulates the HTTP server started by the Journal Service. * Encapsulates the HTTP server started by the Journal Service.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class JournalNodeHttpServer { public class JournalNodeHttpServer {
public static final Log LOG = LogFactory.getLog(
JournalNodeHttpServer.class);
public static final String JN_ATTRIBUTE_KEY = "localjournal"; public static final String JN_ATTRIBUTE_KEY = "localjournal";
private HttpServer httpServer; private HttpServer httpServer;
private int infoPort;
private JournalNode localJournalNode; private JournalNode localJournalNode;
private final Configuration conf; private final Configuration conf;
@ -63,40 +49,24 @@ public class JournalNodeHttpServer {
} }
void start() throws IOException { void start() throws IOException {
final InetSocketAddress bindAddr = getAddress(conf); final InetSocketAddress httpAddr = getAddress(conf);
// initialize the webserver for uploading/downloading files. final String httpsAddrString = conf.get(
LOG.info("Starting web server as: "+ SecurityUtil.getServerPrincipal(conf DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY,
.get(DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY), DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_DEFAULT);
bindAddr.getHostName())); InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
int tmpInfoPort = bindAddr.getPort(); HttpServer.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
URI httpEndpoint; httpAddr, httpsAddr, "journal",
try { DFSConfigKeys.DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY,
httpEndpoint = new URI("http://" + NetUtils.getHostPortString(bindAddr)); DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY);
} catch (URISyntaxException e) {
throw new IOException(e);
}
httpServer = new HttpServer.Builder().setName("journal") httpServer = builder.build();
.addEndpoint(httpEndpoint)
.setFindPort(tmpInfoPort == 0).setConf(conf).setACL(
new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
.setUsernameConfKey(
DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY)
.setKeytabConfKey(DFSUtil.getSpnegoKeytabKey(conf,
DFS_JOURNALNODE_KEYTAB_FILE_KEY)).build();
httpServer.setAttribute(JN_ATTRIBUTE_KEY, localJournalNode); httpServer.setAttribute(JN_ATTRIBUTE_KEY, localJournalNode);
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf); httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
httpServer.addInternalServlet("getJournal", "/getJournal", httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true); GetJournalEditServlet.class, true);
httpServer.start(); httpServer.start();
// The web-server port can be ephemeral... ensure we have the correct info
infoPort = httpServer.getConnectorAddress(0).getPort();
LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
} }
void stop() throws IOException { void stop() throws IOException {
@ -112,12 +82,25 @@ public class JournalNodeHttpServer {
/** /**
* Return the actual address bound to by the running server. * Return the actual address bound to by the running server.
*/ */
@Deprecated
public InetSocketAddress getAddress() { public InetSocketAddress getAddress() {
InetSocketAddress addr = httpServer.getConnectorAddress(0); InetSocketAddress addr = httpServer.getConnectorAddress(0);
assert addr.getPort() != 0; assert addr.getPort() != 0;
return addr; return addr;
} }
/**
* Return the URI that locates the HTTP server.
*/
URI getServerURI() {
// getHttpClientScheme() only returns https for HTTPS_ONLY policy. This
// matches the behavior that the first connector is a HTTPS connector only
// for HTTPS_ONLY policy.
InetSocketAddress addr = httpServer.getConnectorAddress(0);
return URI.create(DFSUtil.getHttpClientScheme(conf) + "://"
+ NetUtils.getHostPortString(addr));
}
private static InetSocketAddress getAddress(Configuration conf) { private static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT); DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT);

View File

@ -115,6 +115,7 @@ class JournalNodeRpcServer implements QJournalProtocol {
return jn.getOrCreateJournal(journalId).isFormatted(); return jn.getOrCreateJournal(journalId).isFormatted();
} }
@SuppressWarnings("deprecation")
@Override @Override
public GetJournalStateResponseProto getJournalState(String journalId) public GetJournalStateResponseProto getJournalState(String journalId)
throws IOException { throws IOException {
@ -122,6 +123,7 @@ class JournalNodeRpcServer implements QJournalProtocol {
return GetJournalStateResponseProto.newBuilder() return GetJournalStateResponseProto.newBuilder()
.setLastPromisedEpoch(epoch) .setLastPromisedEpoch(epoch)
.setHttpPort(jn.getBoundHttpAddress().getPort()) .setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build(); .build();
} }
@ -173,6 +175,7 @@ class JournalNodeRpcServer implements QJournalProtocol {
.purgeLogsOlderThan(reqInfo, minTxIdToKeep); .purgeLogsOlderThan(reqInfo, minTxIdToKeep);
} }
@SuppressWarnings("deprecation")
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean forReading, boolean inProgressOk) long sinceTxId, boolean forReading, boolean inProgressOk)
@ -184,6 +187,7 @@ class JournalNodeRpcServer implements QJournalProtocol {
return GetEditLogManifestResponseProto.newBuilder() return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest)) .setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort()) .setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build(); .build();
} }

View File

@ -383,12 +383,7 @@ public class DataNode extends Configured
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get( InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0)); DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
Configuration sslConf = new Configuration(false); Configuration sslConf = DFSUtil.loadSslConfiguration(conf);
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
sslConf.setBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf.getBoolean(
DFS_CLIENT_HTTPS_NEED_AUTH_KEY, DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf); DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
int port = secInfoSocAddr.getPort(); int port = secInfoSocAddr.getPort();

View File

@ -17,13 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -45,7 +41,6 @@ import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
/** /**
* Encapsulates the HTTP server started by the NameNode. * Encapsulates the HTTP server started by the NameNode.
@ -102,51 +97,16 @@ public class NameNodeHttpServer {
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
final String infoHost = bindAddress.getHostName(); final String infoHost = bindAddress.getHostName();
HttpServer.Builder builder = new HttpServer.Builder() final InetSocketAddress httpAddr = bindAddress;
.setName("hdfs")
.setConf(conf)
.setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
.setUsernameConfKey(
DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY)
.setKeytabConfKey(
DFSUtil.getSpnegoKeytabKey(conf,
DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
if (policy.isHttpEnabled()) {
int port = bindAddress.getPort();
if (port == 0) {
builder.setFindPort(true);
}
builder.addEndpoint(URI.create("http://" + infoHost + ":" + port));
}
if (policy.isHttpsEnabled()) {
final String httpsAddrString = conf.get( final String httpsAddrString = conf.get(
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT); DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
InetSocketAddress addr = NetUtils.createSocketAddr(httpsAddrString); InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
Configuration sslConf = new Configuration(false); HttpServer.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
httpAddr, httpsAddr, "hdfs",
sslConf.addResource(conf.get( DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
sslConf.setBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf.getBoolean(
DFS_CLIENT_HTTPS_NEED_AUTH_KEY, DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
if (addr.getPort() == 0) {
builder.setFindPort(true);
}
builder.addEndpoint(URI.create("https://"
+ NetUtils.getHostPortString(addr)));
}
httpServer = builder.build(); httpServer = builder.build();

View File

@ -17,19 +17,12 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
@ -71,6 +64,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StorageP
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -79,7 +73,6 @@ import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -121,8 +114,7 @@ public class SecondaryNameNode implements Runnable {
private InetSocketAddress nameNodeAddr; private InetSocketAddress nameNodeAddr;
private volatile boolean shouldRun; private volatile boolean shouldRun;
private HttpServer infoServer; private HttpServer infoServer;
private int infoPort; private URL imageListenURL;
private String infoBindAddress;
private Collection<URI> checkpointDirs; private Collection<URI> checkpointDirs;
private List<URI> checkpointEditsDirs; private List<URI> checkpointEditsDirs;
@ -210,8 +202,8 @@ public class SecondaryNameNode implements Runnable {
public static InetSocketAddress getHttpAddress(Configuration conf) { public static InetSocketAddress getHttpAddress(Configuration conf) {
return NetUtils.createSocketAddr(conf.get( return NetUtils.createSocketAddr(conf.get(
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT)); DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT));
} }
/** /**
@ -221,16 +213,18 @@ public class SecondaryNameNode implements Runnable {
private void initialize(final Configuration conf, private void initialize(final Configuration conf,
CommandLineOpts commandLineOpts) throws IOException { CommandLineOpts commandLineOpts) throws IOException {
final InetSocketAddress infoSocAddr = getHttpAddress(conf); final InetSocketAddress infoSocAddr = getHttpAddress(conf);
infoBindAddress = infoSocAddr.getHostName(); final String infoBindAddress = infoSocAddr.getHostName();
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
SecurityUtil.login(conf, DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY, SecurityUtil.login(conf,
DFS_SECONDARY_NAMENODE_USER_NAME_KEY, infoBindAddress); DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY, infoBindAddress);
} }
// initiate Java VM metrics // initiate Java VM metrics
DefaultMetricsSystem.initialize("SecondaryNameNode"); DefaultMetricsSystem.initialize("SecondaryNameNode");
JvmMetrics.create("SecondaryNameNode", JvmMetrics.create("SecondaryNameNode",
conf.get(DFS_METRICS_SESSION_ID_KEY), DefaultMetricsSystem.instance()); conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
DefaultMetricsSystem.instance());
// Create connection to the namenode. // Create connection to the namenode.
shouldRun = true; shouldRun = true;
@ -256,19 +250,19 @@ public class SecondaryNameNode implements Runnable {
// Initialize other scheduling parameters from the configuration // Initialize other scheduling parameters from the configuration
checkpointConf = new CheckpointConf(conf); checkpointConf = new CheckpointConf(conf);
// initialize the webserver for uploading files. final InetSocketAddress httpAddr = infoSocAddr;
int tmpInfoPort = infoSocAddr.getPort();
URI httpEndpoint = URI.create("http://" + NetUtils.getHostPortString(infoSocAddr));
infoServer = new HttpServer.Builder().setName("secondary") final String httpsAddrString = conf.get(
.addEndpoint(httpEndpoint) DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
.setFindPort(tmpInfoPort == 0).setConf(conf).setACL( DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT);
new AccessControlList(conf.get(DFS_ADMIN, " "))) InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
.setUsernameConfKey( HttpServer.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY) httpAddr, httpsAddr, "secondary",
.setKeytabConfKey(DFSUtil.getSpnegoKeytabKey(conf, DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY)).build(); DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
infoServer = builder.build();
infoServer.setAttribute("secondary.name.node", this); infoServer.setAttribute("secondary.name.node", this);
infoServer.setAttribute("name.system.image", checkpointImage); infoServer.setAttribute("name.system.image", checkpointImage);
@ -278,14 +272,25 @@ public class SecondaryNameNode implements Runnable {
infoServer.start(); infoServer.start();
LOG.info("Web server init done"); LOG.info("Web server init done");
imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
+ NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
// The web-server port can be ephemeral... ensure we have the correct info HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
infoPort = infoServer.getConnectorAddress(0).getPort(); int connIdx = 0;
if (policy.isHttpEnabled()) {
InetSocketAddress httpAddress = infoServer.getConnectorAddress(connIdx++);
conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
NetUtils.getHostPortString(httpAddress));
}
conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" + infoPort); if (policy.isHttpsEnabled()) {
LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" + infoPort); InetSocketAddress httpsAddress = infoServer.getConnectorAddress(connIdx);
LOG.info("Checkpoint Period :" + checkpointConf.getPeriod() + " secs " + conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
"(" + checkpointConf.getPeriod() / 60 + " min)"); NetUtils.getHostPortString(httpsAddress));
}
LOG.info("Checkpoint Period :" + checkpointConf.getPeriod() + " secs "
+ "(" + checkpointConf.getPeriod() / 60 + " min)");
LOG.info("Log Size Trigger :" + checkpointConf.getTxnCount() + " txns"); LOG.info("Log Size Trigger :" + checkpointConf.getTxnCount() + " txns");
} }
@ -487,15 +492,7 @@ public class SecondaryNameNode implements Runnable {
* for image transfers * for image transfers
*/ */
private URL getImageListenAddress() { private URL getImageListenAddress() {
StringBuilder sb = new StringBuilder() return imageListenURL;
.append(DFSUtil.getHttpClientScheme(conf)).append("://")
.append(infoBindAddress).append(":").append(infoPort);
try {
return new URL(sb.toString());
} catch (MalformedURLException e) {
// Unreachable
throw new RuntimeException(e);
}
} }
/** /**

View File

@ -142,7 +142,9 @@ message GetJournalStateRequestProto {
message GetJournalStateResponseProto { message GetJournalStateResponseProto {
required uint64 lastPromisedEpoch = 1; required uint64 lastPromisedEpoch = 1;
// Deprecated by fromURL
required uint32 httpPort = 2; required uint32 httpPort = 2;
optional string fromURL = 3;
} }
/** /**
@ -182,7 +184,9 @@ message GetEditLogManifestRequestProto {
message GetEditLogManifestResponseProto { message GetEditLogManifestResponseProto {
required RemoteEditLogManifestProto manifest = 1; required RemoteEditLogManifestProto manifest = 1;
// Deprecated by fromURL
required uint32 httpPort = 2; required uint32 httpPort = 2;
optional string fromURL = 3;
// TODO: we should add nsinfo somewhere // TODO: we should add nsinfo somewhere
// to verify that it matches up with our expectation // to verify that it matches up with our expectation

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.net.NetUtils;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -66,11 +67,21 @@ public class MiniJournalCluster {
} }
} }
private static final class JNInfo {
private JournalNode node;
private InetSocketAddress ipcAddr;
private String httpServerURI;
private JNInfo(JournalNode node) {
this.node = node;
this.ipcAddr = node.getBoundIpcAddress();
this.httpServerURI = node.getHttpServerURI();
}
}
private static final Log LOG = LogFactory.getLog(MiniJournalCluster.class); private static final Log LOG = LogFactory.getLog(MiniJournalCluster.class);
private File baseDir; private File baseDir;
private JournalNode nodes[]; private JNInfo nodes[];
private InetSocketAddress ipcAddrs[];
private InetSocketAddress httpAddrs[];
private MiniJournalCluster(Builder b) throws IOException { private MiniJournalCluster(Builder b) throws IOException {
LOG.info("Starting MiniJournalCluster with " + LOG.info("Starting MiniJournalCluster with " +
@ -82,21 +93,18 @@ public class MiniJournalCluster {
this.baseDir = new File(MiniDFSCluster.getBaseDirectory()); this.baseDir = new File(MiniDFSCluster.getBaseDirectory());
} }
nodes = new JournalNode[b.numJournalNodes]; nodes = new JNInfo[b.numJournalNodes];
ipcAddrs = new InetSocketAddress[b.numJournalNodes];
httpAddrs = new InetSocketAddress[b.numJournalNodes];
for (int i = 0; i < b.numJournalNodes; i++) { for (int i = 0; i < b.numJournalNodes; i++) {
if (b.format) { if (b.format) {
File dir = getStorageDir(i); File dir = getStorageDir(i);
LOG.debug("Fully deleting JN directory " + dir); LOG.debug("Fully deleting JN directory " + dir);
FileUtil.fullyDelete(dir); FileUtil.fullyDelete(dir);
} }
nodes[i] = new JournalNode(); JournalNode jn = new JournalNode();
nodes[i].setConf(createConfForNode(b, i)); jn.setConf(createConfForNode(b, i));
nodes[i].start(); jn.start();
nodes[i] = new JNInfo(jn);
ipcAddrs[i] = nodes[i].getBoundIpcAddress();
httpAddrs[i] = nodes[i].getBoundHttpAddress();
} }
} }
@ -106,8 +114,8 @@ public class MiniJournalCluster {
*/ */
public URI getQuorumJournalURI(String jid) { public URI getQuorumJournalURI(String jid) {
List<String> addrs = Lists.newArrayList(); List<String> addrs = Lists.newArrayList();
for (InetSocketAddress addr : ipcAddrs) { for (JNInfo info : nodes) {
addrs.add("127.0.0.1:" + addr.getPort()); addrs.add("127.0.0.1:" + info.ipcAddr.getPort());
} }
String addrsVal = Joiner.on(";").join(addrs); String addrsVal = Joiner.on(";").join(addrs);
LOG.debug("Setting logger addresses to: " + addrsVal); LOG.debug("Setting logger addresses to: " + addrsVal);
@ -122,8 +130,8 @@ public class MiniJournalCluster {
* Start the JournalNodes in the cluster. * Start the JournalNodes in the cluster.
*/ */
public void start() throws IOException { public void start() throws IOException {
for (JournalNode jn : nodes) { for (JNInfo info : nodes) {
jn.start(); info.node.start();
} }
} }
@ -133,12 +141,12 @@ public class MiniJournalCluster {
*/ */
public void shutdown() throws IOException { public void shutdown() throws IOException {
boolean failed = false; boolean failed = false;
for (JournalNode jn : nodes) { for (JNInfo info : nodes) {
try { try {
jn.stopAndJoin(0); info.node.stopAndJoin(0);
} catch (Exception e) { } catch (Exception e) {
failed = true; failed = true;
LOG.warn("Unable to stop journal node " + jn, e); LOG.warn("Unable to stop journal node " + info.node, e);
} }
} }
if (failed) { if (failed) {
@ -150,8 +158,8 @@ public class MiniJournalCluster {
Configuration conf = new Configuration(b.conf); Configuration conf = new Configuration(b.conf);
File logDir = getStorageDir(idx); File logDir = getStorageDir(idx);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString()); conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
return conf; return conf;
} }
@ -164,23 +172,33 @@ public class MiniJournalCluster {
} }
public JournalNode getJournalNode(int i) { public JournalNode getJournalNode(int i) {
return nodes[i]; return nodes[i].node;
} }
public void restartJournalNode(int i) throws InterruptedException, IOException { public void restartJournalNode(int i) throws InterruptedException, IOException {
Configuration conf = new Configuration(nodes[i].getConf()); JNInfo info = nodes[i];
if (nodes[i].isStarted()) { JournalNode jn = info.node;
nodes[i].stopAndJoin(0); Configuration conf = new Configuration(jn.getConf());
if (jn.isStarted()) {
jn.stopAndJoin(0);
} }
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "127.0.0.1:" + conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
ipcAddrs[i].getPort()); NetUtils.getHostPortString(info.ipcAddr));
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "127.0.0.1:" +
httpAddrs[i].getPort());
nodes[i] = new JournalNode(); final String uri = info.httpServerURI;
nodes[i].setConf(conf); if (uri.startsWith("http://")) {
nodes[i].start(); conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
uri.substring(("http://".length())));
} else if (info.httpServerURI.startsWith("https://")) {
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY,
uri.substring(("https://".length())));
}
JournalNode newJN = new JournalNode();
newJN.setConf(conf);
newJN.start();
info.node = newJN;
} }
public int getQuorumSize() { public int getQuorumSize() {

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL; import java.net.URL;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -163,10 +162,7 @@ public class TestJournalNode {
@Test(timeout=100000) @Test(timeout=100000)
public void testHttpServer() throws Exception { public void testHttpServer() throws Exception {
InetSocketAddress addr = jn.getBoundHttpAddress(); String urlRoot = jn.getHttpServerURI();
assertTrue(addr.getPort() > 0);
String urlRoot = "http://localhost:" + addr.getPort();
// Check default servlets. // Check default servlets.
String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx")); String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));

View File

@ -178,6 +178,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza) MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza)
MAPREDUCE-5052. Job History UI and web services confusing job start time and
job submit time (Chen He via jeagles)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@ -231,6 +234,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5632. TestRMContainerAllocator#testUpdatedNodes fails (jeagles) MAPREDUCE-5632. TestRMContainerAllocator#testUpdatedNodes fails (jeagles)
MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
(jlowe)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -520,6 +520,11 @@ public class JobHistoryEventHandler extends AbstractService
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime()); mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName()); mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
} }
//initialize the launchTime in the JobIndexInfo of MetaInfo
if(event.getHistoryEvent().getEventType() == EventType.JOB_INITED ){
JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime());
}
// If this is JobFinishedEvent, close the writer and setup the job-index // If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {

View File

@ -155,6 +155,7 @@ public class MockJobs extends MockApps {
public static JobReport newJobReport(JobId id) { public static JobReport newJobReport(JobId id) {
JobReport report = Records.newRecord(JobReport.class); JobReport report = Records.newRecord(JobReport.class);
report.setJobId(id); report.setJobId(id);
report.setSubmitTime(System.currentTimeMillis()-DT);
report report
.setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT)); .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
report.setFinishTime(System.currentTimeMillis() report.setFinishTime(System.currentTimeMillis()

View File

@ -51,6 +51,7 @@ public class FileNameIndexUtils {
private static final int NUM_REDUCES_INDEX = 6; private static final int NUM_REDUCES_INDEX = 6;
private static final int JOB_STATUS_INDEX = 7; private static final int JOB_STATUS_INDEX = 7;
private static final int QUEUE_NAME_INDEX = 8; private static final int QUEUE_NAME_INDEX = 8;
private static final int JOB_START_TIME_INDEX = 9;
/** /**
* Constructs the job history file name from the JobIndexInfo. * Constructs the job history file name from the JobIndexInfo.
@ -64,7 +65,7 @@ public class FileNameIndexUtils {
sb.append(escapeDelimiters(TypeConverter.fromYarn(indexInfo.getJobId()).toString())); sb.append(escapeDelimiters(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
sb.append(DELIMITER); sb.append(DELIMITER);
//StartTime //SubmitTime
sb.append(indexInfo.getSubmitTime()); sb.append(indexInfo.getSubmitTime());
sb.append(DELIMITER); sb.append(DELIMITER);
@ -94,6 +95,10 @@ public class FileNameIndexUtils {
//QueueName //QueueName
sb.append(indexInfo.getQueueName()); sb.append(indexInfo.getQueueName());
sb.append(DELIMITER);
//JobStartTime
sb.append(indexInfo.getJobStartTime());
sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION); sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
return encodeJobHistoryFileName(sb.toString()); return encodeJobHistoryFileName(sb.toString());
@ -161,6 +166,14 @@ public class FileNameIndexUtils {
indexInfo.setQueueName( indexInfo.setQueueName(
decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX])); decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
try{
indexInfo.setJobStartTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
} catch (NumberFormatException e){
LOG.warn("Unable to parse launch time from job history file "
+ jhFileName + " : " + e);
}
} catch (IndexOutOfBoundsException e) { } catch (IndexOutOfBoundsException e) {
LOG.warn("Parsing job history file with partial data encoded into name: " LOG.warn("Parsing job history file with partial data encoded into name: "
+ jhFileName); + jhFileName);

View File

@ -34,6 +34,7 @@ public class JobIndexInfo {
private int numMaps; private int numMaps;
private int numReduces; private int numReduces;
private String jobStatus; private String jobStatus;
private long jobStartTime;
public JobIndexInfo() { public JobIndexInfo() {
} }
@ -48,6 +49,7 @@ public class JobIndexInfo {
this.numMaps = numMaps; this.numMaps = numMaps;
this.numReduces = numReduces; this.numReduces = numReduces;
this.jobStatus = jobStatus; this.jobStatus = jobStatus;
this.jobStartTime = -1;
} }
public long getSubmitTime() { public long getSubmitTime() {
@ -104,6 +106,12 @@ public class JobIndexInfo {
public void setJobStatus(String jobStatus) { public void setJobStatus(String jobStatus) {
this.jobStatus = jobStatus; this.jobStatus = jobStatus;
} }
public long getJobStartTime() {
return jobStartTime;
}
public void setJobStartTime(long lTime) {
this.jobStartTime = lTime;
}
@Override @Override
public String toString() { public String toString() {

View File

@ -48,6 +48,7 @@ public class TestFileNameIndexUtils {
+ FileNameIndexUtils.DELIMITER + "%s" + FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s" + FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s" + FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION; + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
private static final String JOB_ID = "job_1317928501754_0001"; private static final String JOB_ID = "job_1317928501754_0001";
@ -67,6 +68,7 @@ public class TestFileNameIndexUtils {
private static final String NUM_REDUCES = "1"; private static final String NUM_REDUCES = "1";
private static final String JOB_STATUS = "SUCCEEDED"; private static final String JOB_STATUS = "SUCCEEDED";
private static final String QUEUE_NAME = "default"; private static final String QUEUE_NAME = "default";
private static final String JOB_START_TIME = "1317928742060";
@Test @Test
public void testEncodingDecodingEquivalence() throws IOException { public void testEncodingDecodingEquivalence() throws IOException {
@ -82,6 +84,7 @@ public class TestFileNameIndexUtils {
info.setNumReduces(Integer.parseInt(NUM_REDUCES)); info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS); info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME); info.setQueueName(QUEUE_NAME);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info); String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
JobIndexInfo parsedInfo = FileNameIndexUtils.getIndexInfo(jobHistoryFile); JobIndexInfo parsedInfo = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
@ -104,6 +107,8 @@ public class TestFileNameIndexUtils {
info.getJobStatus(), parsedInfo.getJobStatus()); info.getJobStatus(), parsedInfo.getJobStatus());
Assert.assertEquals("Queue name different after encoding and decoding", Assert.assertEquals("Queue name different after encoding and decoding",
info.getQueueName(), parsedInfo.getQueueName()); info.getQueueName(), parsedInfo.getQueueName());
Assert.assertEquals("Job start time different after encoding and decoding",
info.getJobStartTime(), parsedInfo.getJobStartTime());
} }
@Test @Test
@ -120,6 +125,7 @@ public class TestFileNameIndexUtils {
info.setNumReduces(Integer.parseInt(NUM_REDUCES)); info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS); info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME); info.setQueueName(QUEUE_NAME);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info); String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
Assert.assertTrue("User name not encoded correctly into job history file", Assert.assertTrue("User name not encoded correctly into job history file",
@ -137,7 +143,8 @@ public class TestFileNameIndexUtils {
NUM_MAPS, NUM_MAPS,
NUM_REDUCES, NUM_REDUCES,
JOB_STATUS, JOB_STATUS,
QUEUE_NAME); QUEUE_NAME,
JOB_START_TIME);
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile); JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
Assert.assertEquals("User name doesn't match", Assert.assertEquals("User name doesn't match",
@ -158,6 +165,7 @@ public class TestFileNameIndexUtils {
info.setNumReduces(Integer.parseInt(NUM_REDUCES)); info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS); info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME); info.setQueueName(QUEUE_NAME);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info); String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
Assert.assertTrue("Job name not encoded correctly into job history file", Assert.assertTrue("Job name not encoded correctly into job history file",
@ -175,7 +183,8 @@ public class TestFileNameIndexUtils {
NUM_MAPS, NUM_MAPS,
NUM_REDUCES, NUM_REDUCES,
JOB_STATUS, JOB_STATUS,
QUEUE_NAME); QUEUE_NAME,
JOB_START_TIME );
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile); JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
Assert.assertEquals("Job name doesn't match", Assert.assertEquals("Job name doesn't match",

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream; import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec; import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -52,7 +54,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
private long start; private long start;
private long pos; private long pos;
private long end; private long end;
private LineReader in; private SplitLineReader in;
private FSDataInputStream fileIn; private FSDataInputStream fileIn;
private final Seekable filePosition; private final Seekable filePosition;
int maxLineLength; int maxLineLength;
@ -111,17 +113,18 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
((SplittableCompressionCodec)codec).createInputStream( ((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end, fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK); SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job, recordDelimiter); in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
start = cIn.getAdjustedStart(); start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd(); end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream filePosition = cIn; // take pos from compressed stream
} else { } else {
in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter); in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, recordDelimiter);
filePosition = fileIn; filePosition = fileIn;
} }
} else { } else {
fileIn.seek(start); fileIn.seek(start);
in = new LineReader(fileIn, job, recordDelimiter); in = new SplitLineReader(fileIn, job, recordDelimiter);
filePosition = fileIn; filePosition = fileIn;
} }
// If this is not the first split, we always throw away first record // If this is not the first split, we always throw away first record
@ -141,7 +144,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
public LineRecordReader(InputStream in, long offset, long endOffset, public LineRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength, byte[] recordDelimiter) { int maxLineLength, byte[] recordDelimiter) {
this.maxLineLength = maxLineLength; this.maxLineLength = maxLineLength;
this.in = new LineReader(in, recordDelimiter); this.in = new SplitLineReader(in, recordDelimiter);
this.start = offset; this.start = offset;
this.pos = offset; this.pos = offset;
this.end = endOffset; this.end = endOffset;
@ -159,7 +162,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
throws IOException{ throws IOException{
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input. this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
this.in = new LineReader(in, job, recordDelimiter); this.in = new SplitLineReader(in, job, recordDelimiter);
this.start = offset; this.start = offset;
this.pos = offset; this.pos = offset;
this.end = endOffset; this.end = endOffset;
@ -200,7 +203,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
// We always read one extra line, which lies outside the upper // We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1) // split limit i.e. (end - 1)
while (getFilePosition() <= end) { while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos); key.set(pos);
int newSize = in.readLine(value, maxLineLength, int newSize = in.readLine(value, maxLineLength,

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
/**
* Line reader for compressed splits
*
* Reading records from a compressed split is tricky, as the
* LineRecordReader is using the reported compressed input stream
* position directly to determine when a split has ended. In addition the
* compressed input stream is usually faking the actual byte position, often
* updating it only after the first compressed block after the split is
* accessed.
*
* Depending upon where the last compressed block of the split ends relative
* to the record delimiters it can be easy to accidentally drop the last
* record or duplicate the last record between this split and the next.
*
* Split end scenarios:
*
* 1) Last block of split ends in the middle of a record
* Nothing special that needs to be done here, since the compressed input
* stream will report a position after the split end once the record
* is fully read. The consumer of the next split will discard the
* partial record at the start of the split normally, and no data is lost
* or duplicated between the splits.
*
* 2) Last block of split ends in the middle of a delimiter
* The line reader will continue to consume bytes into the next block to
* locate the end of the delimiter. If a custom delimiter is being used
* then the next record must be read by this split or it will be dropped.
* The consumer of the next split will not recognize the partial
* delimiter at the beginning of its split and will discard it along with
* the next record.
*
* However for the default delimiter processing there is a special case
* because CR, LF, and CRLF are all valid record delimiters. If the
* block ends with a CR then the reader must peek at the next byte to see
* if it is an LF and therefore part of the same record delimiter.
* Peeking at the next byte is an access to the next block and triggers
* the stream to report the end of the split. There are two cases based
* on the next byte:
*
* A) The next byte is LF
* The split needs to end after the current record is returned. The
* consumer of the next split will discard the first record, which
* is degenerate since LF is itself a delimiter, and start consuming
* records after that byte. If the current split tries to read
* another record then the record will be duplicated between splits.
*
* B) The next byte is not LF
* The current record will be returned but the stream will report
* the split has ended due to the peek into the next block. If the
* next record is not read then it will be lost, as the consumer of
* the next split will discard it before processing subsequent
* records. Therefore the next record beyond the reported split end
* must be consumed by this split to avoid data loss.
*
* 3) Last block of split ends at the beginning of a delimiter
* This is equivalent to case 1, as the reader will consume bytes into
* the next block and trigger the end of the split. No further records
* should be read as the consumer of the next split will discard the
* (degenerate) record at the beginning of its split.
*
* 4) Last block of split ends at the end of a delimiter
* Nothing special needs to be done here. The reader will not start
* examining the bytes into the next block until the next record is read,
* so the stream will not report the end of the split just yet. Once the
* next record is read then the next block will be accessed and the
* stream will indicate the end of the split. The consumer of the next
* split will correctly discard the first record of its split, and no
* data is lost or duplicated.
*
* If the default delimiter is used and the block ends at a CR then this
* is treated as case 2 since the reader does not yet know without
* looking at subsequent bytes whether the delimiter has ended.
*
* NOTE: It is assumed that compressed input streams *never* return bytes from
* multiple compressed blocks from a single read. Failure to do so will
* violate the buffering performed by this class, as it will access
* bytes into the next block after the split before returning all of the
* records from the previous block.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CompressedSplitLineReader extends SplitLineReader {
SplitCompressionInputStream scin;
private boolean usingCRLF;
private boolean needAdditionalRecord = false;
private boolean finished = false;
public CompressedSplitLineReader(SplitCompressionInputStream in,
Configuration conf,
byte[] recordDelimiterBytes)
throws IOException {
super(in, conf, recordDelimiterBytes);
scin = in;
usingCRLF = (recordDelimiterBytes == null);
}
@Override
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
int bytesRead = in.read(buffer);
// If the split ended in the middle of a record delimiter then we need
// to read one additional record, as the consumer of the next split will
// not recognize the partial delimiter as a record.
// However if using the default delimiter and the next character is a
// linefeed then next split will treat it as a delimiter all by itself
// and the additional record read should not be performed.
if (inDelimiter && bytesRead > 0) {
if (usingCRLF) {
needAdditionalRecord = (buffer[0] != '\n');
} else {
needAdditionalRecord = true;
}
}
return bytesRead;
}
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
int bytesRead = 0;
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
if (scin.getPos() > scin.getAdjustedEnd()) {
finished = true;
}
bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
}
return bytesRead;
}
@Override
public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord;
}
}

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -55,7 +54,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
private long start; private long start;
private long pos; private long pos;
private long end; private long end;
private LineReader in; private SplitLineReader in;
private FSDataInputStream fileIn; private FSDataInputStream fileIn;
private Seekable filePosition; private Seekable filePosition;
private int maxLineLength; private int maxLineLength;
@ -94,33 +93,19 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
((SplittableCompressionCodec)codec).createInputStream( ((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end, fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK); SplittableCompressionCodec.READ_MODE.BYBLOCK);
if (null == this.recordDelimiterBytes){ in = new CompressedSplitLineReader(cIn, job,
in = new LineReader(cIn, job); this.recordDelimiterBytes);
} else {
in = new LineReader(cIn, job, this.recordDelimiterBytes);
}
start = cIn.getAdjustedStart(); start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd(); end = cIn.getAdjustedEnd();
filePosition = cIn; filePosition = cIn;
} else { } else {
if (null == this.recordDelimiterBytes) { in = new SplitLineReader(codec.createInputStream(fileIn,
in = new LineReader(codec.createInputStream(fileIn, decompressor),
job);
} else {
in = new LineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes); decompressor), job, this.recordDelimiterBytes);
}
filePosition = fileIn; filePosition = fileIn;
} }
} else { } else {
fileIn.seek(start); fileIn.seek(start);
if (null == this.recordDelimiterBytes){ in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
in = new LineReader(fileIn, job);
} else {
in = new LineReader(fileIn, job, this.recordDelimiterBytes);
}
filePosition = fileIn; filePosition = fileIn;
} }
// If this is not the first split, we always throw away first record // If this is not the first split, we always throw away first record
@ -160,7 +145,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
int newSize = 0; int newSize = 0;
// We always read one extra line, which lies outside the upper // We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1) // split limit i.e. (end - 1)
while (getFilePosition() <= end) { while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
newSize = in.readLine(value, maxLineLength, newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength)); Math.max(maxBytesToConsume(pos), maxLineLength));
pos += newSize; pos += newSize;

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SplitLineReader extends org.apache.hadoop.util.LineReader {
public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
super(in, recordDelimiterBytes);
}
public SplitLineReader(InputStream in, Configuration conf,
byte[] recordDelimiterBytes) throws IOException {
super(in, conf, recordDelimiterBytes);
}
public boolean needAdditionalRecordAfterSplit() {
return false;
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;
public class TestLineRecordReader {
private void testSplitRecords(String testFileName, long firstSplitLength)
throws IOException {
URL testFileUrl = getClass().getClassLoader().getResource(testFileName);
assertNotNull("Cannot find " + testFileName, testFileUrl);
File testFile = new File(testFileUrl.getFile());
long testFileSize = testFile.length();
Path testFilePath = new Path(testFile.getAbsolutePath());
Configuration conf = new Configuration();
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
assertTrue("unexpected test data at " + testFile,
testFileSize > firstSplitLength);
// read the data without splitting to count the records
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
(String[])null);
LineRecordReader reader = new LineRecordReader(conf, split);
LongWritable key = new LongWritable();
Text value = new Text();
int numRecordsNoSplits = 0;
while (reader.next(key, value)) {
++numRecordsNoSplits;
}
reader.close();
// count the records in the first split
split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
reader = new LineRecordReader(conf, split);
int numRecordsFirstSplit = 0;
while (reader.next(key, value)) {
++numRecordsFirstSplit;
}
reader.close();
// count the records in the second split
split = new FileSplit(testFilePath, firstSplitLength,
testFileSize - firstSplitLength, (String[])null);
reader = new LineRecordReader(conf, split);
int numRecordsRemainingSplits = 0;
while (reader.next(key, value)) {
++numRecordsRemainingSplits;
}
reader.close();
assertEquals("Unexpected number of records in bzip2 compressed split",
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
}
@Test
public void testBzip2SplitEndsAtCR() throws IOException {
// the test data contains a carriage-return at the end of the first
// split which ends at compressed offset 136498 and the next
// character is not a linefeed
testSplitRecords("blockEndingInCR.txt.bz2", 136498);
}
@Test
public void testBzip2SplitEndsAtCRThenLF() throws IOException {
// the test data contains a carriage-return at the end of the first
// split which ends at compressed offset 136498 and the next
// character is a linefeed
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.input;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.Test;
public class TestLineRecordReader {
private void testSplitRecords(String testFileName, long firstSplitLength)
throws IOException {
URL testFileUrl = getClass().getClassLoader().getResource(testFileName);
assertNotNull("Cannot find " + testFileName, testFileUrl);
File testFile = new File(testFileUrl.getFile());
long testFileSize = testFile.length();
Path testFilePath = new Path(testFile.getAbsolutePath());
Configuration conf = new Configuration();
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
assertTrue("unexpected test data at " + testFile,
testFileSize > firstSplitLength);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
// read the data without splitting to count the records
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
(String[])null);
LineRecordReader reader = new LineRecordReader();
reader.initialize(split, context);
int numRecordsNoSplits = 0;
while (reader.nextKeyValue()) {
++numRecordsNoSplits;
}
reader.close();
// count the records in the first split
split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
reader = new LineRecordReader();
reader.initialize(split, context);
int numRecordsFirstSplit = 0;
while (reader.nextKeyValue()) {
++numRecordsFirstSplit;
}
reader.close();
// count the records in the second split
split = new FileSplit(testFilePath, firstSplitLength,
testFileSize - firstSplitLength, (String[])null);
reader = new LineRecordReader();
reader.initialize(split, context);
int numRecordsRemainingSplits = 0;
while (reader.nextKeyValue()) {
++numRecordsRemainingSplits;
}
reader.close();
assertEquals("Unexpected number of records in bzip2 compressed split",
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
}
@Test
public void testBzip2SplitEndsAtCR() throws IOException {
// the test data contains a carriage-return at the end of the first
// split which ends at compressed offset 136498 and the next
// character is not a linefeed
testSplitRecords("blockEndingInCR.txt.bz2", 136498);
}
@Test
public void testBzip2SplitEndsAtCRThenLF() throws IOException {
// the test data contains a carriage-return at the end of the first
// split which ends at compressed offset 136498 and the next
// character is a linefeed
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
}
}

View File

@ -53,7 +53,8 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
this.jobIndexInfo = jobIndexInfo; this.jobIndexInfo = jobIndexInfo;
this.jobId = jobId; this.jobId = jobId;
jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class); jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
jobReport.setStartTime(jobIndexInfo.getSubmitTime()); jobReport.setSubmitTime(jobIndexInfo.getSubmitTime());
jobReport.setStartTime(jobIndexInfo.getJobStartTime());
jobReport.setFinishTime(jobIndexInfo.getFinishTime()); jobReport.setFinishTime(jobIndexInfo.getFinishTime());
jobReport.setJobState(getState()); jobReport.setJobState(getState());
} }

View File

@ -84,6 +84,7 @@ public class HsJobBlock extends HtmlBlock {
_("Queue:", job.getQueueName()). _("Queue:", job.getQueueName()).
_("State:", job.getState()). _("State:", job.getState()).
_("Uberized:", job.isUber()). _("Uberized:", job.isUber()).
_("Submitted:", new Date(job.getSubmitTime())).
_("Started:", new Date(job.getStartTime())). _("Started:", new Date(job.getStartTime())).
_("Finished:", new Date(job.getFinishTime())). _("Finished:", new Date(job.getFinishTime())).
_("Elapsed:", StringUtils.formatTime( _("Elapsed:", StringUtils.formatTime(

View File

@ -55,6 +55,7 @@ public class HsJobsBlock extends HtmlBlock {
table("#jobs"). table("#jobs").
thead(). thead().
tr(). tr().
th("Submit Time").
th("Start Time"). th("Start Time").
th("Finish Time"). th("Finish Time").
th(".id", "Job ID"). th(".id", "Job ID").
@ -74,6 +75,7 @@ public class HsJobsBlock extends HtmlBlock {
for (Job j : appContext.getAllJobs().values()) { for (Job j : appContext.getAllJobs().values()) {
JobInfo job = new JobInfo(j); JobInfo job = new JobInfo(j);
jobsTableData.append("[\"") jobsTableData.append("[\"")
.append(dateFormat.format(new Date(job.getSubmitTime()))).append("\",\"")
.append(dateFormat.format(new Date(job.getStartTime()))).append("\",\"") .append(dateFormat.format(new Date(job.getStartTime()))).append("\",\"")
.append(dateFormat.format(new Date(job.getFinishTime()))).append("\",\"") .append(dateFormat.format(new Date(job.getFinishTime()))).append("\",\"")
.append("<a href='").append(url("job", job.getId())).append("'>") .append("<a href='").append(url("job", job.getId())).append("'>")
@ -101,6 +103,7 @@ public class HsJobsBlock extends HtmlBlock {
tbody._(). tbody._().
tfoot(). tfoot().
tr(). tr().
th().input("search_init").$type(InputType.text).$name("submit_time").$value("Submit Time")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("Start Time")._()._(). th().input("search_init").$type(InputType.text).$name("start_time").$value("Start Time")._()._().
th().input("search_init").$type(InputType.text).$name("finish_time").$value("Finish Time")._()._(). th().input("search_init").$type(InputType.text).$name("finish_time").$value("Finish Time")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("Job ID")._()._(). th().input("search_init").$type(InputType.text).$name("start_time").$value("Job ID")._()._().

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
public class JobInfo { public class JobInfo {
protected long submitTime;
protected long startTime; protected long startTime;
protected long finishTime; protected long finishTime;
protected String id; protected String id;
@ -85,6 +86,7 @@ public class JobInfo {
this.mapsCompleted = job.getCompletedMaps(); this.mapsCompleted = job.getCompletedMaps();
this.reducesTotal = job.getTotalReduces(); this.reducesTotal = job.getTotalReduces();
this.reducesCompleted = job.getCompletedReduces(); this.reducesCompleted = job.getCompletedReduces();
this.submitTime = report.getSubmitTime();
this.startTime = report.getStartTime(); this.startTime = report.getStartTime();
this.finishTime = report.getFinishTime(); this.finishTime = report.getFinishTime();
this.name = job.getName().toString(); this.name = job.getName().toString();
@ -216,6 +218,10 @@ public class JobInfo {
return this.id; return this.id;
} }
public long getSubmitTime() {
return this.submitTime;
}
public long getStartTime() { public long getStartTime() {
return this.startTime; return this.startTime;
} }

View File

@ -86,6 +86,7 @@ public class MockHistoryJobs extends MockJobs {
report.getFinishTime(), mockJob.getUserName(), mockJob.getName(), id, report.getFinishTime(), mockJob.getUserName(), mockJob.getName(), id,
mockJob.getCompletedMaps(), mockJob.getCompletedReduces(), mockJob.getCompletedMaps(), mockJob.getCompletedReduces(),
String.valueOf(mockJob.getState())); String.valueOf(mockJob.getState()));
info.setJobStartTime(report.getStartTime());
info.setQueueName(mockJob.getQueueName()); info.setQueueName(mockJob.getQueueName());
ret.partial.put(id, new PartialJob(info, id)); ret.partial.put(id, new PartialJob(info, id));

View File

@ -33,7 +33,7 @@ import org.codehaus.jettison.json.JSONObject;
public class VerifyJobsUtils { public class VerifyJobsUtils {
public static void verifyHsJobPartial(JSONObject info, Job job) throws JSONException { public static void verifyHsJobPartial(JSONObject info, Job job) throws JSONException {
assertEquals("incorrect number of elements", 11, info.length()); assertEquals("incorrect number of elements", 12, info.length());
// everyone access fields // everyone access fields
verifyHsJobGeneric(job, info.getString("id"), info.getString("user"), verifyHsJobGeneric(job, info.getString("id"), info.getString("user"),
@ -45,7 +45,7 @@ public class VerifyJobsUtils {
} }
public static void verifyHsJob(JSONObject info, Job job) throws JSONException { public static void verifyHsJob(JSONObject info, Job job) throws JSONException {
assertEquals("incorrect number of elements", 24, info.length()); assertEquals("incorrect number of elements", 25, info.length());
// everyone access fields // everyone access fields
verifyHsJobGeneric(job, info.getString("id"), info.getString("user"), verifyHsJobGeneric(job, info.getString("id"), info.getString("user"),

View File

@ -46,6 +46,9 @@ Release 2.4.0 - UNRELEASED
YARN-1447. Common PB type definitions for container resizing. (Wangda Tan YARN-1447. Common PB type definitions for container resizing. (Wangda Tan
via Sandy Ryza) via Sandy Ryza)
YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan
via Sandy Ryza)
IMPROVEMENTS IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -60,12 +61,24 @@ public abstract class AllocateRequest {
List<ResourceRequest> resourceAsk, List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased, List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest) { ResourceBlacklistRequest resourceBlacklistRequest) {
return newInstance(responseID, appProgress, resourceAsk,
containersToBeReleased, resourceBlacklistRequest, null);
}
@Public
@Stable
public static AllocateRequest newInstance(int responseID, float appProgress,
List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest,
List<ContainerResourceIncreaseRequest> increaseRequests) {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(responseID); allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress); allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk); allocateRequest.setAskList(resourceAsk);
allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setReleaseList(containersToBeReleased);
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
allocateRequest.setIncreaseRequests(increaseRequests);
return allocateRequest; return allocateRequest;
} }
@ -170,4 +183,22 @@ public abstract class AllocateRequest {
@Stable @Stable
public abstract void setResourceBlacklistRequest( public abstract void setResourceBlacklistRequest(
ResourceBlacklistRequest resourceBlacklistRequest); ResourceBlacklistRequest resourceBlacklistRequest);
/**
* Get the <code>ContainerResourceIncreaseRequest</code> being sent by the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public abstract List<ContainerResourceIncreaseRequest> getIncreaseRequests();
/**
* Set the <code>ContainerResourceIncreaseRequest</code> to inform the
* <code>ResourceManager</code> about some container's resources need to be
* increased
*/
@Public
@Stable
public abstract void setIncreaseRequests(
List<ContainerResourceIncreaseRequest> increaseRequests);
} }

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
@ -83,6 +85,23 @@ public abstract class AllocateResponse {
return response; return response;
} }
@Public
@Stable
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens,
List<ContainerResourceIncrease> increasedContainers,
List<ContainerResourceDecrease> decreasedContainers) {
AllocateResponse response = newInstance(responseId, completedContainers,
allocatedContainers, updatedNodes, availResources, command,
numClusterNodes, preempt, nmTokens);
response.setIncreasedContainers(increasedContainers);
response.setDecreasedContainers(decreasedContainers);
return response;
}
/** /**
* If the <code>ResourceManager</code> needs the * If the <code>ResourceManager</code> needs the
* <code>ApplicationMaster</code> to take some action then it will send an * <code>ApplicationMaster</code> to take some action then it will send an
@ -221,4 +240,34 @@ public abstract class AllocateResponse {
@Private @Private
@Unstable @Unstable
public abstract void setNMTokens(List<NMToken> nmTokens); public abstract void setNMTokens(List<NMToken> nmTokens);
/**
* Get the list of newly increased containers by <code>ResourceManager</code>
*/
@Public
@Stable
public abstract List<ContainerResourceIncrease> getIncreasedContainers();
/**
* Set the list of newly increased containers by <code>ResourceManager</code>
*/
@Private
@Unstable
public abstract void setIncreasedContainers(
List<ContainerResourceIncrease> increasedContainers);
/**
* Get the list of newly decreased containers by <code>NodeManager</code>
*/
@Public
@Stable
public abstract List<ContainerResourceDecrease> getDecreasedContainers();
/**
* Set the list of newly decreased containers by <code>NodeManager</code>
*/
@Private
@Unstable
public abstract void setDecreasedContainers(
List<ContainerResourceDecrease> decreasedContainers);
} }

View File

@ -62,6 +62,7 @@ message AllocateRequestProto {
optional ResourceBlacklistRequestProto blacklist_request = 3; optional ResourceBlacklistRequestProto blacklist_request = 3;
optional int32 response_id = 4; optional int32 response_id = 4;
optional float progress = 5; optional float progress = 5;
repeated ContainerResourceIncreaseRequestProto increase_request = 6;
} }
message NMTokenProto { message NMTokenProto {
@ -79,6 +80,8 @@ message AllocateResponseProto {
optional int32 num_cluster_nodes = 7; optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8; optional PreemptionMessageProto preempt = 8;
repeated NMTokenProto nm_tokens = 9; repeated NMTokenProto nm_tokens = 9;
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////

View File

@ -27,12 +27,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreaseRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
@ -49,9 +52,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private List<ResourceRequest> ask = null; private List<ResourceRequest> ask = null;
private List<ContainerId> release = null; private List<ContainerId> release = null;
private List<ContainerResourceIncreaseRequest> increaseRequests = null;
private ResourceBlacklistRequest blacklistRequest = null; private ResourceBlacklistRequest blacklistRequest = null;
public AllocateRequestPBImpl() { public AllocateRequestPBImpl() {
builder = AllocateRequestProto.newBuilder(); builder = AllocateRequestProto.newBuilder();
} }
@ -95,6 +98,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
if (this.release != null) { if (this.release != null) {
addReleasesToProto(); addReleasesToProto();
} }
if (this.increaseRequests != null) {
addIncreaseRequestsToProto();
}
if (this.blacklistRequest != null) { if (this.blacklistRequest != null) {
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
} }
@ -155,6 +161,23 @@ public class AllocateRequestPBImpl extends AllocateRequest {
this.ask.addAll(resourceRequests); this.ask.addAll(resourceRequests);
} }
@Override
public List<ContainerResourceIncreaseRequest> getIncreaseRequests() {
initIncreaseRequests();
return this.increaseRequests;
}
@Override
public void setIncreaseRequests(
List<ContainerResourceIncreaseRequest> increaseRequests) {
if (increaseRequests == null) {
return;
}
initIncreaseRequests();
this.increaseRequests.clear();
this.increaseRequests.addAll(increaseRequests);
}
@Override @Override
public ResourceBlacklistRequest getResourceBlacklistRequest() { public ResourceBlacklistRequest getResourceBlacklistRequest() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -223,6 +246,57 @@ public class AllocateRequestPBImpl extends AllocateRequest {
}; };
builder.addAllAsk(iterable); builder.addAllAsk(iterable);
} }
private void initIncreaseRequests() {
if (this.increaseRequests != null) {
return;
}
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerResourceIncreaseRequestProto> list =
p.getIncreaseRequestList();
this.increaseRequests = new ArrayList<ContainerResourceIncreaseRequest>();
for (ContainerResourceIncreaseRequestProto c : list) {
this.increaseRequests.add(convertFromProtoFormat(c));
}
}
private void addIncreaseRequestsToProto() {
maybeInitBuilder();
builder.clearIncreaseRequest();
if (increaseRequests == null) {
return;
}
Iterable<ContainerResourceIncreaseRequestProto> iterable =
new Iterable<ContainerResourceIncreaseRequestProto>() {
@Override
public Iterator<ContainerResourceIncreaseRequestProto> iterator() {
return new Iterator<ContainerResourceIncreaseRequestProto>() {
Iterator<ContainerResourceIncreaseRequest> iter =
increaseRequests.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerResourceIncreaseRequestProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllIncreaseRequest(iterable);
}
@Override @Override
public List<ContainerId> getReleaseList() { public List<ContainerId> getReleaseList() {
initReleases(); initReleases();
@ -293,6 +367,16 @@ public class AllocateRequestPBImpl extends AllocateRequest {
return ((ResourceRequestPBImpl)t).getProto(); return ((ResourceRequestPBImpl)t).getProto();
} }
private ContainerResourceIncreaseRequestPBImpl convertFromProtoFormat(
ContainerResourceIncreaseRequestProto p) {
return new ContainerResourceIncreaseRequestPBImpl(p);
}
private ContainerResourceIncreaseRequestProto convertToProtoFormat(
ContainerResourceIncreaseRequest t) {
return ((ContainerResourceIncreaseRequestPBImpl) t).getProto();
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p); return new ContainerIdPBImpl(p);
} }
@ -308,6 +392,4 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) { private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) {
return ((ResourceBlacklistRequestPBImpl)t).getProto(); return ((ResourceBlacklistRequestPBImpl)t).getProto();
} }
} }

View File

@ -28,12 +28,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
@ -41,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
@ -63,6 +69,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
private List<Container> allocatedContainers = null; private List<Container> allocatedContainers = null;
private List<NMToken> nmTokens = null; private List<NMToken> nmTokens = null;
private List<ContainerStatus> completedContainersStatuses = null; private List<ContainerStatus> completedContainersStatuses = null;
private List<ContainerResourceIncrease> increasedContainers = null;
private List<ContainerResourceDecrease> decreasedContainers = null;
private List<NodeReport> updatedNodes = null; private List<NodeReport> updatedNodes = null;
private PreemptionMessage preempt; private PreemptionMessage preempt;
@ -108,7 +116,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (this.allocatedContainers != null) { if (this.allocatedContainers != null) {
builder.clearAllocatedContainers(); builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable = Iterable<ContainerProto> iterable =
getProtoIterable(this.allocatedContainers); getContainerProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable); builder.addAllAllocatedContainers(iterable);
} }
if (nmTokens != null) { if (nmTokens != null) {
@ -134,6 +142,18 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (this.preempt != null) { if (this.preempt != null) {
builder.setPreempt(convertToProtoFormat(this.preempt)); builder.setPreempt(convertToProtoFormat(this.preempt));
} }
if (this.increasedContainers != null) {
builder.clearIncreasedContainers();
Iterable<ContainerResourceIncreaseProto> iterable =
getIncreaseProtoIterable(this.increasedContainers);
builder.addAllIncreasedContainers(iterable);
}
if (this.decreasedContainers != null) {
builder.clearDecreasedContainers();
Iterable<ContainerResourceDecreaseProto> iterable =
getChangeProtoIterable(this.decreasedContainers);
builder.addAllDecreasedContainers(iterable);
}
} }
private synchronized void mergeLocalToProto() { private synchronized void mergeLocalToProto() {
@ -306,6 +326,63 @@ public class AllocateResponsePBImpl extends AllocateResponse {
this.preempt = preempt; this.preempt = preempt;
} }
@Override
public synchronized List<ContainerResourceIncrease> getIncreasedContainers() {
initLocalIncreasedContainerList();
return increasedContainers;
}
@Override
public synchronized void setIncreasedContainers(
List<ContainerResourceIncrease> increasedContainers) {
if (increasedContainers == null)
return;
initLocalIncreasedContainerList();
this.increasedContainers.addAll(increasedContainers);
}
@Override
public synchronized List<ContainerResourceDecrease> getDecreasedContainers() {
initLocalDecreasedContainerList();
return decreasedContainers;
}
@Override
public synchronized void setDecreasedContainers(
List<ContainerResourceDecrease> decreasedContainers) {
if (decreasedContainers == null) {
return;
}
initLocalDecreasedContainerList();
this.decreasedContainers.addAll(decreasedContainers);
}
private synchronized void initLocalIncreasedContainerList() {
if (this.increasedContainers != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerResourceIncreaseProto> list = p.getIncreasedContainersList();
increasedContainers = new ArrayList<ContainerResourceIncrease>();
for (ContainerResourceIncreaseProto c : list) {
increasedContainers.add(convertFromProtoFormat(c));
}
}
private synchronized void initLocalDecreasedContainerList() {
if (this.decreasedContainers != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerResourceDecreaseProto> list = p.getDecreasedContainersList();
decreasedContainers = new ArrayList<ContainerResourceDecrease>();
for (ContainerResourceDecreaseProto c : list) {
decreasedContainers.add(convertFromProtoFormat(c));
}
}
// Once this is called. updatedNodes will never be null - until a getProto is // Once this is called. updatedNodes will never be null - until a getProto is
// called. // called.
private synchronized void initLocalNewNodeReportList() { private synchronized void initLocalNewNodeReportList() {
@ -348,7 +425,71 @@ public class AllocateResponsePBImpl extends AllocateResponse {
} }
} }
private synchronized Iterable<ContainerProto> getProtoIterable( private synchronized Iterable<ContainerResourceIncreaseProto>
getIncreaseProtoIterable(
final List<ContainerResourceIncrease> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerResourceIncreaseProto>() {
@Override
public synchronized Iterator<ContainerResourceIncreaseProto> iterator() {
return new Iterator<ContainerResourceIncreaseProto>() {
Iterator<ContainerResourceIncrease> iter = newContainersList
.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerResourceIncreaseProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<ContainerResourceDecreaseProto>
getChangeProtoIterable(
final List<ContainerResourceDecrease> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerResourceDecreaseProto>() {
@Override
public synchronized Iterator<ContainerResourceDecreaseProto> iterator() {
return new Iterator<ContainerResourceDecreaseProto>() {
Iterator<ContainerResourceDecrease> iter = newContainersList
.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerResourceDecreaseProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
private synchronized Iterable<ContainerProto> getContainerProtoIterable(
final List<Container> newContainersList) { final List<Container> newContainersList) {
maybeInitBuilder(); maybeInitBuilder();
return new Iterable<ContainerProto>() { return new Iterable<ContainerProto>() {
@ -467,7 +608,6 @@ public class AllocateResponsePBImpl extends AllocateResponse {
} }
}; };
} }
}; };
} }
@ -487,6 +627,26 @@ public class AllocateResponsePBImpl extends AllocateResponse {
} }
} }
private synchronized ContainerResourceIncrease convertFromProtoFormat(
ContainerResourceIncreaseProto p) {
return new ContainerResourceIncreasePBImpl(p);
}
private synchronized ContainerResourceIncreaseProto convertToProtoFormat(
ContainerResourceIncrease t) {
return ((ContainerResourceIncreasePBImpl) t).getProto();
}
private synchronized ContainerResourceDecrease convertFromProtoFormat(
ContainerResourceDecreaseProto p) {
return new ContainerResourceDecreasePBImpl(p);
}
private synchronized ContainerResourceDecreaseProto convertToProtoFormat(
ContainerResourceDecrease t) {
return ((ContainerResourceDecreasePBImpl) t).getProto();
}
private synchronized NodeReportPBImpl convertFromProtoFormat( private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) { NodeReportProto p) {
return new NodeReportPBImpl(p); return new NodeReportPBImpl(p);
@ -501,7 +661,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
return new ContainerPBImpl(p); return new ContainerPBImpl(p);
} }
private synchronized ContainerProto convertToProtoFormat(Container t) { private synchronized ContainerProto convertToProtoFormat(
Container t) {
return ((ContainerPBImpl)t).getProto(); return ((ContainerPBImpl)t).getProto();
} }

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.junit.Test;
public class TestAllocateRequest {
@Test
public void testAllcoateRequestWithIncrease() {
List<ContainerResourceIncreaseRequest> incRequests =
new ArrayList<ContainerResourceIncreaseRequest>();
for (int i = 0; i < 3; i++) {
incRequests.add(ContainerResourceIncreaseRequest.newInstance(null,
Resource.newInstance(0, i)));
}
AllocateRequest r =
AllocateRequest.newInstance(123, 0f, null, null, null, incRequests);
// serde
AllocateRequestProto p = ((AllocateRequestPBImpl) r).getProto();
r = new AllocateRequestPBImpl(p);
// check value
Assert.assertEquals(123, r.getResponseId());
Assert.assertEquals(incRequests.size(), r.getIncreaseRequests().size());
for (int i = 0; i < incRequests.size(); i++) {
Assert.assertEquals(r.getIncreaseRequests().get(i).getCapability()
.getVirtualCores(), incRequests.get(i).getCapability()
.getVirtualCores());
}
}
@Test
public void testAllcoateRequestWithoutIncrease() {
AllocateRequest r =
AllocateRequest.newInstance(123, 0f, null, null, null, null);
// serde
AllocateRequestProto p = ((AllocateRequestPBImpl) r).getProto();
r = new AllocateRequestPBImpl(p);
// check value
Assert.assertEquals(123, r.getResponseId());
Assert.assertEquals(0, r.getIncreaseRequests().size());
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.junit.Test;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class TestAllocateResponse {
@Test
public void testAllocateResponseWithIncDecContainers() {
List<ContainerResourceIncrease> incContainers =
new ArrayList<ContainerResourceIncrease>();
List<ContainerResourceDecrease> decContainers =
new ArrayList<ContainerResourceDecrease>();
for (int i = 0; i < 3; i++) {
incContainers.add(ContainerResourceIncrease.newInstance(null,
Resource.newInstance(1024, i), null));
}
for (int i = 0; i < 5; i++) {
decContainers.add(ContainerResourceDecrease.newInstance(null,
Resource.newInstance(1024, i)));
}
AllocateResponse r =
AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(),
incContainers, decContainers);
// serde
AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
r = new AllocateResponsePBImpl(p);
// check value
Assert
.assertEquals(incContainers.size(), r.getIncreasedContainers().size());
Assert
.assertEquals(decContainers.size(), r.getDecreasedContainers().size());
for (int i = 0; i < incContainers.size(); i++) {
Assert.assertEquals(i, r.getIncreasedContainers().get(i).getCapability()
.getVirtualCores());
}
for (int i = 0; i < decContainers.size(); i++) {
Assert.assertEquals(i, r.getDecreasedContainers().get(i).getCapability()
.getVirtualCores());
}
}
@Test
public void testAllocateResponseWithoutIncDecContainers() {
AllocateResponse r =
AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, null);
// serde
AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
r = new AllocateResponsePBImpl(p);
// check value
Assert.assertEquals(0, r.getIncreasedContainers().size());
Assert.assertEquals(0, r.getDecreasedContainers().size());
}
}

View File

@ -201,6 +201,7 @@ History Server REST API's.
"jobs" : { "jobs" : {
"job" : [ "job" : [
{ {
"submitTime" : 1326381344449,
"state" : "SUCCEEDED", "state" : "SUCCEEDED",
"user" : "user1", "user" : "user1",
"reducesTotal" : 1, "reducesTotal" : 1,
@ -214,6 +215,7 @@ History Server REST API's.
"finishTime" : 1326381356010 "finishTime" : 1326381356010
}, },
{ {
"submitTime" : 1326381446500
"state" : "SUCCEEDED", "state" : "SUCCEEDED",
"user" : "user1", "user" : "user1",
"reducesTotal" : 1, "reducesTotal" : 1,
@ -255,6 +257,7 @@ History Server REST API's.
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<jobs> <jobs>
<job> <job>
<submitTime>1326381344449</submitTime>
<startTime>1326381344489</startTime> <startTime>1326381344489</startTime>
<finishTime>1326381356010</finishTime> <finishTime>1326381356010</finishTime>
<id>job_1326381300833_1_1</id> <id>job_1326381300833_1_1</id>
@ -268,6 +271,7 @@ History Server REST API's.
<reducesCompleted>1</reducesCompleted> <reducesCompleted>1</reducesCompleted>
</job> </job>
<job> <job>
<submitTime>1326381446500</submitTime>
<startTime>1326381446529</startTime> <startTime>1326381446529</startTime>
<finishTime>1326381582106</finishTime> <finishTime>1326381582106</finishTime>
<id>job_1326381300833_2_2</id> <id>job_1326381300833_2_2</id>
@ -322,6 +326,8 @@ History Server REST API's.
*---------------+--------------+-------------------------------+ *---------------+--------------+-------------------------------+
| diagnostics | string | A diagnostic message | | diagnostics | string | A diagnostic message |
*---------------+--------------+-------------------------------+ *---------------+--------------+-------------------------------+
| submitTime | long | The time the job submitted (in ms since epoch)|
*---------------+--------------+-------------------------------+
| startTime | long | The time the job started (in ms since epoch)| | startTime | long | The time the job started (in ms since epoch)|
*---------------+--------------+-------------------------------+ *---------------+--------------+-------------------------------+
| finishTime | long | The time the job finished (in ms since epoch)| | finishTime | long | The time the job finished (in ms since epoch)|
@ -393,6 +399,7 @@ History Server REST API's.
+---+ +---+
{ {
"job" : { "job" : {
"submitTime": 1326381446500,
"avgReduceTime" : 124961, "avgReduceTime" : 124961,
"failedReduceAttempts" : 0, "failedReduceAttempts" : 0,
"state" : "SUCCEEDED", "state" : "SUCCEEDED",
@ -453,6 +460,7 @@ History Server REST API's.
+---+ +---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<job> <job>
<submitTime>1326381446500</submitTime>
<startTime>1326381446529</startTime> <startTime>1326381446529</startTime>
<finishTime>1326381582106</finishTime> <finishTime>1326381582106</finishTime>
<id>job_1326381300833_2_2</id> <id>job_1326381300833_2_2</id>
@ -2664,4 +2672,3 @@ History Server REST API's.
</taskAttemptCounterGroup> </taskAttemptCounterGroup>
</jobTaskAttemptCounters> </jobTaskAttemptCounters>
+---+ +---+