YARN-3039. Implemented the app-level timeline aggregator discovery service. Contributed by Junping Du.

This commit is contained in:
Zhijie Shen 2015-03-17 20:23:49 -07:00 committed by Sangjin Lee
parent f0e752c14b
commit 9b56364080
51 changed files with 1860 additions and 52 deletions

View File

@ -127,6 +127,25 @@ public abstract class AllocateResponse {
response.setAMRMToken(amRMToken);
return response;
}
@Public
@Unstable
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<Container> increasedContainers,
List<Container> decreasedContainers,
String aggregatorAddr) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
nmTokens, increasedContainers, decreasedContainers);
response.setAMRMToken(amRMToken);
response.setAggregatorAddr(aggregatorAddr);
return response;
}
/**
* If the <code>ResourceManager</code> needs the
@ -328,4 +347,18 @@ public abstract class AllocateResponse {
@Private
@Unstable
public abstract void setApplicationPriority(Priority priority);
/**
* The address of aggregator that belong to this app
*
* @return The address of aggregator that belong to this attempt
*/
@Public
@Unstable
public abstract String getAggregatorAddr();
@Private
@Unstable
public abstract void setAggregatorAddr(String aggregatorAddr);
}

View File

@ -813,6 +813,11 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "container-manager.thread-count";
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
/** Number of threads container manager uses.*/
public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT =
NM_PREFIX + "aggregator-service.thread-count";
public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5;
/** Number of threads used in cleanup.*/
public static final String NM_DELETE_THREAD_COUNT =
NM_PREFIX + "delete.thread-count";
@ -840,6 +845,13 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_LOCALIZER_PORT;
/** Address where the aggregator service IPC is.*/
public static final String NM_AGGREGATOR_SERVICE_ADDRESS =
NM_PREFIX + "aggregator-service.address";
public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048;
public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS =
"0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT;
/** Interval in between cache cleanups.*/
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
NM_PREFIX + "localizer.cache.cleanup.interval-ms";

View File

@ -89,6 +89,7 @@ message AllocateResponseProto {
repeated ContainerProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13;
optional string aggregator_addr = 14;
}
enum SchedulerResourceTypes {

View File

@ -41,6 +41,9 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@ -108,6 +111,7 @@ import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* An ApplicationMaster for executing shell commands on a set of launched
@ -220,6 +224,13 @@ public class ApplicationMaster {
private String appMasterTrackingUrl = "";
private boolean newTimelineService = false;
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
// App Master configuration
// No. of containers to run shell command on
@ -320,6 +331,19 @@ public class ApplicationMaster {
}
appMaster.run();
result = appMaster.finish();
threadPool.shutdown();
while (!threadPool.isTerminated()) { // wait for all posting thread to finish
try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // send interrupt to hurry them along
}
} catch (InterruptedException e) {
LOG.warn("Timeline client service stop interrupted!");
break;
}
}
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
@ -631,13 +655,15 @@ public class ApplicationMaster {
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
startTimelineClient(conf);
// need to bind timelineClient
amRMClient.registerTimelineClient(timelineClient);
if(timelineClient != null) {
if (newTimelineService) {
publishApplicationAttemptEventOnNewTimelineService(timelineClient,
@ -719,7 +745,12 @@ public class ApplicationMaster {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
// Creating the Timeline Client
timelineClient = TimelineClient.createTimelineClient();
if (newTimelineService) {
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
} else {
timelineClient = TimelineClient.createTimelineClient();
}
timelineClient.init(conf);
timelineClient.start();
} else {
@ -809,7 +840,7 @@ public class ApplicationMaster {
if(timelineClient != null) {
timelineClient.stop();
}
return success;
}
@ -1361,6 +1392,18 @@ public class ApplicationMaster {
}
private static void publishContainerStartEventOnNewTimelineService(
final TimelineClient timelineClient, final Container container,
final String domainId, final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi);
}
};
threadPool.execute(publishWrapper);
}
private static void publishContainerStartEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, Container container, String domainId,
UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
@ -1392,10 +1435,22 @@ public class ApplicationMaster {
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
private static void publishContainerEndEventOnNewTimelineService(
final TimelineClient timelineClient, ContainerStatus container,
String domainId, UserGroupInformation ugi) {
final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi);
}
};
threadPool.execute(publishWrapper);
}
private static void publishContainerEndEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
@ -1426,6 +1481,20 @@ public class ApplicationMaster {
}
private static void publishApplicationAttemptEventOnNewTimelineService(
final TimelineClient timelineClient, final String appAttemptId,
final DSEvent appEvent, final String domainId,
final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
appAttemptId, appEvent, domainId, ugi);
}
};
threadPool.execute(publishWrapper);
}
private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =

View File

@ -51,6 +51,8 @@ import com.google.common.collect.ImmutableList;
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
private TimelineClient timelineClient;
/**
* Create a new instance of AMRMClient.
@ -459,6 +461,22 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return nmTokenCache;
}
/**
* Register TimelineClient to AMRMClient.
* @param timelineClient
*/
public void registerTimelineClient(TimelineClient timelineClient) {
this.timelineClient = timelineClient;
}
/**
* Get registered timeline client.
* @return
*/
public TimelineClient getRegisteredTimeineClient() {
return this.timelineClient;
}
/**
* Wait for <code>check</code> to return true for each 1000 ms.
* See also {@link #waitFor(com.google.common.base.Supplier, int)}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
@ -292,6 +293,22 @@ extends AbstractService {
* @return Current number of nodes in the cluster
*/
public abstract int getClusterNodeCount();
/**
* Register TimelineClient to AMRMClient.
* @param timelineClient
*/
public void registerTimelineClient(TimelineClient timelineClient) {
client.registerTimelineClient(timelineClient);
}
/**
* Get registered timeline client.
* @return
*/
public TimelineClient getRegisteredTimeineClient() {
return client.getRegisteredTimeineClient();
}
/**
* Update application's blacklist with addition or removal resources.

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -66,6 +67,8 @@ extends AMRMClientAsync<T> {
private volatile boolean keepRunning;
private volatile float progress;
private volatile String aggregatorAddr;
private volatile Throwable savedException;
/**
@ -351,7 +354,17 @@ extends AMRMClientAsync<T> {
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
String aggregatorAddress = response.getAggregatorAddr();
TimelineClient timelineClient = client.getRegisteredTimeineClient();
if (timelineClient != null && aggregatorAddress != null
&& !aggregatorAddress.isEmpty()) {
if (aggregatorAddr == null ||
!aggregatorAddr.equals(aggregatorAddress)) {
aggregatorAddr = aggregatorAddress;
timelineClient.setTimelineServiceAddress(aggregatorAddress);
}
}
progress = handler.getProgress();
} catch (Throwable ex) {
handler.onError(ex);

View File

@ -384,6 +384,23 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
this.amrmToken = amRMToken;
}
@Override
public String getAggregatorAddr() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getAggregatorAddr();
}
@Override
public void setAggregatorAddr(String aggregatorAddr) {
maybeInitBuilder();
if (aggregatorAddr == null) {
builder.clearAggregatorAddr();
return;
}
builder.setAggregatorAddr(aggregatorAddr);
}
@Override
public synchronized Priority getApplicationPriority() {

View File

@ -58,7 +58,6 @@ public abstract class TimelineClient extends AbstractService implements
* @return a timeline client
*/
protected ApplicationId contextAppId;
protected String timelineServiceAddress;
@Public
public static TimelineClient createTimelineClient() {
@ -242,7 +241,6 @@ public abstract class TimelineClient extends AbstractService implements
* @param address
* the timeline service address
*/
public void setTimelineServiceAddress(String address) {
timelineServiceAddress = address;
}
public abstract void setTimelineServiceAddress(String address);
}

View File

@ -118,6 +118,15 @@ public class TimelineClientImpl extends TimelineClient {
private float timelineServiceVersion;
private TimelineWriter timelineWriter;
private volatile String timelineServiceAddress;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
private int maxServiceRetries;
private long serviceRetryInterval;
private boolean timelineServiceV2 = false;
@Private
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
@ -264,6 +273,7 @@ public class TimelineClientImpl extends TimelineClient {
public TimelineClientImpl(ApplicationId applicationId) {
super(TimelineClientImpl.class.getName(), applicationId);
this.timelineServiceV2 = true;
}
protected void serviceInit(Configuration conf) throws Exception {
@ -292,22 +302,35 @@ public class TimelineClientImpl extends TimelineClient {
client = new Client(new URLConnectionClientHandler(
new TimelineURLConnectionFactory()), cc);
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
client.addFilter(retryFilter);
if (YarnConfiguration.useHttps(conf)) {
timelineServiceAddress = conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
timelineServiceAddress = conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
// TODO need to cleanup filter retry later.
if (!timelineServiceV2) {
client.addFilter(retryFilter);
}
LOG.info("Timeline service address: " + resURI);
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
LOG.info("Timeline service address: " + timelineServiceAddress);
// old version timeline service need to get address from configuration
// while new version need to auto discovery (with retry).
if (timelineServiceV2) {
maxServiceRetries = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
serviceRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
} else {
if (YarnConfiguration.useHttps(conf)) {
setTimelineServiceAddress(conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
} else {
setTimelineServiceAddress(conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
}
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
LOG.info("Timeline service address: " + getTimelineServiceAddress());
}
super.serviceInit(conf);
}
@ -379,8 +402,7 @@ public class TimelineClientImpl extends TimelineClient {
if (async) {
params.add("async", Boolean.TRUE.toString());
}
putObjects(constructResURI(getConfig(), timelineServiceAddress, true),
"entities", params, entitiesContainer);
putObjects("entities", params, entitiesContainer);
}
@Override
@ -388,6 +410,60 @@ public class TimelineClientImpl extends TimelineClient {
YarnException {
timelineWriter.putDomain(domain);
}
// Used for new timeline service only
@Private
public void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
// timelineServiceAddress could haven't be initialized yet
// or stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
while (needRetry) {
try {
URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
putObjects(uri, path, params, obj);
needRetry = false;
}
catch (Exception e) {
// TODO only handle exception for timelineServiceAddress being updated.
// skip retry for other exceptions.
checkRetryWithSleep(retries, e);
retries--;
}
}
}
/**
* Check if reaching to maximum of retries.
* @param retries
* @param e
*/
private void checkRetryWithSleep(int retries, Exception e) throws
YarnException, IOException {
if (retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
LOG.error(
"TimelineClient has reached to max retry times :" +
this.maxServiceRetries + " for service address: " +
timelineServiceAddress);
if (e instanceof YarnException) {
throw (YarnException)e;
} else if (e instanceof IOException) {
throw (IOException)e;
} else {
throw new YarnException(e);
}
}
}
private void putObjects(
URI base, String path, MultivaluedMap<String, String> params, Object obj)
@ -419,11 +495,21 @@ public class TimelineClientImpl extends TimelineClient {
}
}
@Override
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
}
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
@SuppressWarnings("unchecked")
@Override
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException, YarnException {
PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction =
PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
getDTAction =
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
@Override
@ -432,8 +518,10 @@ public class TimelineClientImpl extends TimelineClient {
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
// TODO we should add retry logic here if timelineServiceAddress is
// not available immediately.
return (Token) authUrl.getDelegationToken(
constructResURI(getConfig(), timelineServiceAddress, false).toURL(),
constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(),
token, renewer, doAsUser);
}
};
@ -530,6 +618,24 @@ public class TimelineClientImpl extends TimelineClient {
return connectionRetry.retryOn(tokenRetryOp);
}
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null
* @param retries
* @return the left retry times
*/
private int pollTimelineServiceAddress(int retries) {
while (timelineServiceAddress == null && retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
timelineServiceAddress = getTimelineServiceAddress();
retries--;
}
return retries;
}
private class TimelineURLConnectionFactory
implements HttpURLConnectionFactory {

View File

@ -213,7 +213,7 @@ public class WebAppUtils {
return getResolvedAddress(address);
}
private static String getResolvedAddress(InetSocketAddress address) {
public static String getResolvedAddress(InetSocketAddress address) {
address = NetUtils.getConnectAddress(address);
StringBuilder sb = new StringBuilder();
InetAddress resolved = address.getAddress();

View File

@ -965,6 +965,12 @@
<name>yarn.nodemanager.container-manager.thread-count</name>
<value>20</value>
</property>
<property>
<description>Number of threads aggregator service uses.</description>
<name>yarn.nodemanager.aggregator-service.thread-count</name>
<value>5</value>
</property>
<property>
<description>Number of threads used in cleanup.</description>
@ -1040,6 +1046,13 @@
<name>yarn.nodemanager.localizer.address</name>
<value>${yarn.nodemanager.hostname}:8040</value>
</property>
<property>
<description>Address where the aggregator service IPC is.</description>
<name>yarn.nodemanager.aggregator-service.address</name>
<value>${yarn.nodemanager.hostname}:8048</value>
</property>
<property>
<description>Interval in between cache cleanups.</description>

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
@ -109,7 +110,7 @@ public class TestContainerLaunchRPC {
resource, System.currentTimeMillis() + 10000, 42, 42,
Priority.newInstance(0), 0);
Token containerToken =
TestRPC.newContainerToken(nodeId, "password".getBytes(),
newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
StartContainerRequest scRequest =
@ -134,6 +135,19 @@ public class TestContainerLaunchRPC {
Assert.fail("timeout exception should have occurred!");
}
public static Token newContainerToken(NodeId nodeId, byte[] password,
ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
InetSocketAddress addr =
NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
Token containerToken =
Token.newInstance(tokenIdentifier.getBytes(),
ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
.buildTokenService(addr).toString());
return containerToken;
}
public class DummyContainerManager implements ContainerManagementProtocol {

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -98,7 +99,7 @@ public class TestContainerResourceIncreaseRPC {
resource, System.currentTimeMillis() + 10000, 42, 42,
Priority.newInstance(0), 0);
Token containerToken =
TestRPC.newContainerToken(nodeId, "password".getBytes(),
newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
// Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<>();
@ -121,6 +122,19 @@ public class TestContainerResourceIncreaseRPC {
Assert.fail("timeout exception should have occurred!");
}
public static Token newContainerToken(NodeId nodeId, byte[] password,
ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
InetSocketAddress addr =
NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
Token containerToken =
Token.newInstance(tokenIdentifier.getBytes(),
ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
.buildTokenService(addr).toString());
return containerToken;
}
public class DummyContainerManager implements ContainerManagementProtocol {
@Override

View File

@ -150,6 +150,7 @@
<include>yarn_server_common_service_protos.proto</include>
<include>ResourceTracker.proto</include>
<include>SCMUploader.proto</include>
<include>aggregatornodemanager_protocol.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
/**
* <p>The protocol between an <code>TimelineAggregatorsCollection</code> and a
* <code>NodeManager</code> to report a new application aggregator get launched.
* </p>
*
*/
@Private
public interface AggregatorNodemanagerProtocol {
/**
*
* <p>
* The <code>TimelineAggregatorsCollection</code> provides a list of mapping
* between application and aggregator's address in
* {@link ReportNewAggregatorsInfoRequest} to a <code>NodeManager</code> to
* <em>register</em> aggregator's info, include: applicationId and REST URI to
* access aggregator. NodeManager will add them into registered aggregators
* and register them into <code>ResourceManager</code> afterwards.
* </p>
*
* @param request the request of registering a new aggregator or a list of aggregators
* @return
* @throws YarnException
* @throws IOException
*/
ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
ReportNewAggregatorsInfoRequest request)
throws YarnException, IOException;
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService;
@Private
@Unstable
@ProtocolInfo(
protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB",
protocolVersion = 1)
public interface AggregatorNodemanagerProtocolPB extends
AggregatorNodemanagerProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.impl.pb.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
import com.google.protobuf.ServiceException;
public class AggregatorNodemanagerProtocolPBClientImpl implements
AggregatorNodemanagerProtocol, Closeable {
// Not a documented config. Only used for tests internally
static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+ "rpc.nm-command-timeout";
/**
* Maximum of 1 minute timeout for a Node to react to the command
*/
static final int DEFAULT_COMMAND_TIMEOUT = 60000;
private AggregatorNodemanagerProtocolPB proxy;
@Private
public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class,
ProtobufRpcEngine.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
proxy =
(AggregatorNodemanagerProtocolPB) RPC.getProxy(
AggregatorNodemanagerProtocolPB.class,
clientVersion, addr, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), expireIntvl);
}
@Override
public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
ReportNewAggregatorsInfoRequest request) throws YarnException, IOException {
ReportNewAggregatorsInfoRequestProto requestProto =
((ReportNewAggregatorsInfoRequestPBImpl) request).getProto();
try {
return new ReportNewAggregatorsInfoResponsePBImpl(
proxy.reportNewAggregatorInfo(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public void close() {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
}
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class AggregatorNodemanagerProtocolPBServiceImpl implements
AggregatorNodemanagerProtocolPB {
private AggregatorNodemanagerProtocol real;
public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) {
this.real = impl;
}
@Override
public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo(
RpcController arg0, ReportNewAggregatorsInfoRequestProto proto)
throws ServiceException {
ReportNewAggregatorsInfoRequestPBImpl request =
new ReportNewAggregatorsInfoRequestPBImpl(proto);
try {
ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request);
return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -41,6 +43,22 @@ public abstract class NodeHeartbeatRequest {
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
return nodeHeartbeatRequest;
}
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
Map<ApplicationId, String> registeredAggregators) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
nodeHeartbeatRequest
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators);
return nodeHeartbeatRequest;
}
public abstract NodeStatus getNodeStatus();
public abstract void setNodeStatus(NodeStatus status);
@ -59,4 +77,8 @@ public abstract class NodeHeartbeatRequest {
public abstract void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps);
// This tells RM registered aggregators' address info on this node
public abstract Map<ApplicationId, String> getRegisteredAggregators();
public abstract void setRegisteredAggregators(Map<ApplicationId, String> appAggregatorsMap);
}

View File

@ -40,6 +40,10 @@ public interface NodeHeartbeatResponse {
List<ContainerId> getContainersToBeRemovedFromNM();
List<ApplicationId> getApplicationsToCleanup();
// This tells NM the aggregators' address info of related Apps
Map<ApplicationId, String> getAppAggregatorsMap();
void setAppAggregatorsMap(Map<ApplicationId, String> appAggregatorsMap);
void setResponseId(int responseId);
void setNodeAction(NodeAction action);

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
import org.apache.hadoop.yarn.util.Records;
@Private
public abstract class ReportNewAggregatorsInfoRequest {
public static ReportNewAggregatorsInfoRequest newInstance(
List<AppAggregatorsMap> appAggregatorsList) {
ReportNewAggregatorsInfoRequest request =
Records.newRecord(ReportNewAggregatorsInfoRequest.class);
request.setAppAggregatorsList(appAggregatorsList);
return request;
}
public static ReportNewAggregatorsInfoRequest newInstance(
ApplicationId id, String aggregatorAddr) {
ReportNewAggregatorsInfoRequest request =
Records.newRecord(ReportNewAggregatorsInfoRequest.class);
request.setAppAggregatorsList(
Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr)));
return request;
}
public abstract List<AppAggregatorsMap> getAppAggregatorsList();
public abstract void setAppAggregatorsList(
List<AppAggregatorsMap> appAggregatorsList);
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.util.Records;
public abstract class ReportNewAggregatorsInfoResponse {
@Private
public static ReportNewAggregatorsInfoResponse newInstance() {
ReportNewAggregatorsInfoResponse response =
Records.newRecord(ReportNewAggregatorsInfoResponse.class);
return response;
}
}

View File

@ -19,16 +19,22 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
@ -52,6 +58,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private Set<NodeLabel> labels = null;
private List<LogAggregationReport> logAggregationReportsForApps = null;
Map<ApplicationId, String> registeredAggregators = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
}
@ -106,6 +114,9 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
}
if (this.registeredAggregators != null) {
addRegisteredAggregatorsToProto();
}
}
private void addLogAggregationStatusForAppsToProto() {
@ -146,6 +157,16 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
LogAggregationReport value) {
return ((LogAggregationReportPBImpl) value).getProto();
}
private void addRegisteredAggregatorsToProto() {
maybeInitBuilder();
builder.clearRegisteredAggregators();
for (Map.Entry<ApplicationId, String> entry : registeredAggregators.entrySet()) {
builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setAppAggregatorAddr(entry.getValue()));
}
}
private void mergeLocalToProto() {
if (viaProto)
@ -227,6 +248,36 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.clearLastKnownNmTokenMasterKey();
this.lastKnownNMTokenMasterKey = masterKey;
}
@Override
public Map<ApplicationId, String> getRegisteredAggregators() {
if (this.registeredAggregators != null) {
return this.registeredAggregators;
}
initRegisteredAggregators();
return registeredAggregators;
}
private void initRegisteredAggregators() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
List<AppAggregatorsMapProto> list = p.getRegisteredAggregatorsList();
this.registeredAggregators = new HashMap<ApplicationId, String> ();
for (AppAggregatorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.registeredAggregators.put(appId, c.getAppAggregatorAddr());
}
}
@Override
public void setRegisteredAggregators(
Map<ApplicationId, String> registeredAggregators) {
if (registeredAggregators == null || registeredAggregators.isEmpty()) {
return;
}
maybeInitBuilder();
this.registeredAggregators = new HashMap<ApplicationId, String>();
this.registeredAggregators.putAll(registeredAggregators);
}
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
return new NodeStatusPBImpl(p);
@ -235,6 +286,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatusProto convertToProtoFormat(NodeStatus t) {
return ((NodeStatusPBImpl)t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
return new MasterKeyPBImpl(p);

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@ -68,6 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
private List<ApplicationId> applicationsToCleanup = null;
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
private Resource resource = null;
Map<ApplicationId, String> appAggregatorsMap = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -125,6 +127,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
if (this.appAggregatorsMap != null) {
addAppAggregatorsMapToProto();
}
}
private void addSystemCredentialsToProto() {
@ -137,6 +142,16 @@ public class NodeHeartbeatResponsePBImpl extends
entry.getValue().duplicate())));
}
}
private void addAppAggregatorsMapToProto() {
maybeInitBuilder();
builder.clearAppAggregatorsMap();
for (Map.Entry<ApplicationId, String> entry : appAggregatorsMap.entrySet()) {
builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
.setAppAggregatorAddr(entry.getValue()));
}
}
private void mergeLocalToProto() {
if (viaProto)
@ -550,6 +565,15 @@ public class NodeHeartbeatResponsePBImpl extends
initSystemCredentials();
return systemCredentials;
}
@Override
public Map<ApplicationId, String> getAppAggregatorsMap() {
if (this.appAggregatorsMap != null) {
return this.appAggregatorsMap;
}
initAppAggregatorsMap();
return appAggregatorsMap;
}
private void initSystemCredentials() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@ -561,6 +585,16 @@ public class NodeHeartbeatResponsePBImpl extends
this.systemCredentials.put(appId, byteBuffer);
}
}
private void initAppAggregatorsMap() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<AppAggregatorsMapProto> list = p.getAppAggregatorsMapList();
this.appAggregatorsMap = new HashMap<ApplicationId, String> ();
for (AppAggregatorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr());
}
}
@Override
public void setSystemCredentialsForApps(
@ -572,6 +606,17 @@ public class NodeHeartbeatResponsePBImpl extends
this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
this.systemCredentials.putAll(systemCredentials);
}
@Override
public void setAppAggregatorsMap(
Map<ApplicationId, String> appAggregatorsMap) {
if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) {
return;
}
maybeInitBuilder();
this.appAggregatorsMap = new HashMap<ApplicationId, String>();
this.appAggregatorsMap.putAll(appAggregatorsMap);
}
@Override
public long getNextHeartBeatInterval() {

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
public class ReportNewAggregatorsInfoRequestPBImpl extends
ReportNewAggregatorsInfoRequest {
ReportNewAggregatorsInfoRequestProto proto =
ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
ReportNewAggregatorsInfoRequestProto.Builder builder = null;
boolean viaProto = false;
private List<AppAggregatorsMap> aggregatorsList = null;
public ReportNewAggregatorsInfoRequestPBImpl() {
builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
}
public ReportNewAggregatorsInfoRequestPBImpl(
ReportNewAggregatorsInfoRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public ReportNewAggregatorsInfoRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (aggregatorsList != null) {
addLocalAggregatorsToProto();
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void addLocalAggregatorsToProto() {
maybeInitBuilder();
builder.clearAppAggregators();
List<AppAggregatorsMapProto> protoList =
new ArrayList<AppAggregatorsMapProto>();
for (AppAggregatorsMap m : this.aggregatorsList) {
protoList.add(convertToProtoFormat(m));
}
builder.addAllAppAggregators(protoList);
}
private void initLocalAggregatorsList() {
ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
List<AppAggregatorsMapProto> aggregatorsList =
p.getAppAggregatorsList();
this.aggregatorsList = new ArrayList<AppAggregatorsMap>();
for (AppAggregatorsMapProto m : aggregatorsList) {
this.aggregatorsList.add(convertFromProtoFormat(m));
}
}
@Override
public List<AppAggregatorsMap> getAppAggregatorsList() {
if (this.aggregatorsList == null) {
initLocalAggregatorsList();
}
return this.aggregatorsList;
}
@Override
public void setAppAggregatorsList(List<AppAggregatorsMap> appAggregatorsList) {
maybeInitBuilder();
if (appAggregatorsList == null) {
builder.clearAppAggregators();
}
this.aggregatorsList = appAggregatorsList;
}
private AppAggregatorsMapPBImpl convertFromProtoFormat(
AppAggregatorsMapProto p) {
return new AppAggregatorsMapPBImpl(p);
}
private AppAggregatorsMapProto convertToProtoFormat(
AppAggregatorsMap m) {
return ((AppAggregatorsMapPBImpl) m).getProto();
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class ReportNewAggregatorsInfoResponsePBImpl extends
ReportNewAggregatorsInfoResponse {
ReportNewAggregatorsInfoResponseProto proto =
ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
ReportNewAggregatorsInfoResponseProto.Builder builder = null;
boolean viaProto = false;
public ReportNewAggregatorsInfoResponsePBImpl() {
builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
}
public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public ReportNewAggregatorsInfoResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,33 @@
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
@Private
public abstract class AppAggregatorsMap {
public static AppAggregatorsMap newInstance(
ApplicationId id, String aggregatorAddr) {
AppAggregatorsMap appAggregatorMap =
Records.newRecord(AppAggregatorsMap.class);
appAggregatorMap.setApplicationId(id);
appAggregatorMap.setAggregatorAddr(aggregatorAddr);
return appAggregatorMap;
}
public abstract ApplicationId getApplicationId();
public abstract void setApplicationId(
ApplicationId id);
public abstract String getAggregatorAddr();
public abstract void setAggregatorAddr(
String addr);
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
AppAggregatorsMapProto proto =
AppAggregatorsMapProto.getDefaultInstance();
AppAggregatorsMapProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId appId = null;
private String aggregatorAddr = null;
public AppAggregatorsMapPBImpl() {
builder = AppAggregatorsMapProto.newBuilder();
}
public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
this.proto = proto;
viaProto = true;
}
public AppAggregatorsMapProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public ApplicationId getApplicationId() {
AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
if (this.appId == null && p.hasAppId()) {
this.appId = convertFromProtoFormat(p.getAppId());
}
return this.appId;
}
@Override
public String getAggregatorAddr() {
AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
if (this.aggregatorAddr == null
&& p.hasAppAggregatorAddr()) {
this.aggregatorAddr = p.getAppAggregatorAddr();
}
return this.aggregatorAddr;
}
@Override
public void setApplicationId(ApplicationId appId) {
maybeInitBuilder();
if (appId == null) {
builder.clearAppId();
}
this.appId = appId;
}
@Override
public void setAggregatorAddr(String aggregatorAddr) {
maybeInitBuilder();
if (aggregatorAddr == null) {
builder.clearAppAggregatorAddr();
}
this.aggregatorAddr = aggregatorAddr;
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AppAggregatorsMapProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.appId != null) {
builder.setAppId(convertToProtoFormat(this.appId));
}
if (this.aggregatorAddr != null) {
builder.setAppAggregatorAddr(this.aggregatorAddr);
}
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "AggregatorNodemanagerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_server_common_service_protos.proto";
service AggregatorNodemanagerProtocolService {
rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
}

View File

@ -84,6 +84,7 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_nm_token_master_key = 3;
optional NodeLabelsProto nodeLabels = 4;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
repeated AppAggregatorsMapProto registered_aggregators = 6;
}
message LogAggregationReportProto {
@ -108,6 +109,7 @@ message NodeHeartbeatResponseProto {
repeated SignalContainerRequestProto containers_to_signal = 13;
optional ResourceProto resource = 14;
optional ContainerQueuingLimitProto container_queuing_limit = 15;
repeated AppAggregatorsMapProto app_aggregators_map = 16;
}
message ContainerQueuingLimitProto {
@ -120,6 +122,25 @@ message SystemCredentialsForAppsProto {
optional bytes credentialsForApp = 2;
}
////////////////////////////////////////////////////////////////////////
////// From aggregator_nodemanager_protocol ////////////////////////////
////////////////////////////////////////////////////////////////////////
message AppAggregatorsMapProto {
optional ApplicationIdProto appId = 1;
optional string appAggregatorAddr = 2;
}
//////////////////////////////////////////////////////
/////// aggregator_nodemanager_protocol //////////////
//////////////////////////////////////////////////////
message ReportNewAggregatorsInfoRequestProto {
repeated AppAggregatorsMapProto app_aggregators = 1;
}
message ReportNewAggregatorsInfoResponseProto {
}
message NMContainerStatusProto {
optional ContainerIdProto container_id = 1;
optional ContainerStateProto container_state = 2;

View File

@ -63,6 +63,10 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -73,6 +77,14 @@ public class TestRPC {
private static final String EXCEPTION_CAUSE = "exception cause";
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public static final String ILLEGAL_NUMBER_MESSAGE =
"aggregators' number in ReportNewAggregatorsInfoRequest is not ONE.";
public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0";
public static final ApplicationId DEFAULT_APP_ID =
ApplicationId.newInstance(0, 0);
@Test
public void testUnknownCall() {
Configuration conf = new Configuration();
@ -100,8 +112,66 @@ public class TestRPC {
+ "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
} catch (Exception e) {
e.printStackTrace();
} finally {
server.stop();
}
}
@Test
public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
Server server = rpc.getServer(AggregatorNodemanagerProtocol.class,
new DummyNMAggregatorService(), addr, conf, null, 1);
server.start();
// Test unrelated protocol wouldn't get response
ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
try {
unknownProxy.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
Assert.fail("Excepted RPC call to fail with unknown method.");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().matches(
"Unknown method getNewApplication called on.*"
+ "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+ "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
} catch (Exception e) {
e.printStackTrace();
}
// Test AggregatorNodemanagerProtocol get proper response
AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy(
AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
// Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get
// normally response.
try {
ReportNewAggregatorsInfoRequest request =
ReportNewAggregatorsInfoRequest.newInstance(
DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR);
proxy.reportNewAggregatorInfo(request);
} catch (YarnException e) {
Assert.fail("RPC call failured is not expected here.");
}
// Verify empty request get YarnException back (by design in
// DummyNMAggregatorService)
try {
proxy.reportNewAggregatorInfo(Records
.newRecord(ReportNewAggregatorsInfoRequest.class));
Assert.fail("Excepted RPC call to fail with YarnException.");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
}
server.stop();
}
@Test
public void testHadoopProtoRPC() throws Exception {
@ -169,10 +239,10 @@ public class TestRPC {
System.out.println("Test Exception is " + e.getMessage());
} catch (Exception ex) {
ex.printStackTrace();
} finally {
server.stop();
}
Assert.assertTrue(exception);
server.stop();
Assert.assertNotNull(statuses.get(0));
Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
}
@ -262,4 +332,32 @@ public class TestRPC {
.buildTokenService(addr).toString());
return containerToken;
}
// A dummy implementation for AggregatorNodemanagerProtocol for test purpose,
// it only can accept one appID, aggregatorAddr pair or throw exceptions
public class DummyNMAggregatorService
implements AggregatorNodemanagerProtocol {
@Override
public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
ReportNewAggregatorsInfoRequest request)
throws YarnException, IOException {
List<AppAggregatorsMap> appAggregators = request.getAppAggregatorsList();
if (appAggregators.size() == 1) {
// check default appID and aggregatorAddr
AppAggregatorsMap appAggregator = appAggregators.get(0);
Assert.assertEquals(appAggregator.getApplicationId(),
DEFAULT_APP_ID);
Assert.assertEquals(appAggregator.getAggregatorAddr(),
DEFAULT_AGGREGATOR_ADDR);
} else {
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
}
ReportNewAggregatorsInfoResponse response =
recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class);
return response;
}
}
}

View File

@ -25,7 +25,9 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -107,11 +109,14 @@ public class TestYarnServerApiClasses {
original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus());
original.setNodeLabels(getValidNodeLabels());
Map<ApplicationId, String> aggregators = getAggregators();
original.setRegisteredAggregators(aggregators);
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
assertEquals(aggregators, copy.getRegisteredAggregators());
// check labels are coming with valid values
Assert.assertTrue(original.getNodeLabels()
.containsAll(copy.getNodeLabels()));
@ -148,6 +153,8 @@ public class TestYarnServerApiClasses {
original.setNextHeartBeatInterval(1000);
original.setNodeAction(NodeAction.NORMAL);
original.setResponseId(100);
Map<ApplicationId, String> aggregators = getAggregators();
original.setAppAggregatorsMap(aggregators);
NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
original.getProto());
@ -157,6 +164,7 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
assertEquals(aggregators, copy.getAppAggregatorsMap());
assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
}
@ -336,6 +344,15 @@ public class TestYarnServerApiClasses {
return nodeLabels;
}
private Map<ApplicationId, String> getAggregators() {
ApplicationId appID = ApplicationId.newInstance(1L, 1);
String aggregatorAddr = "localhost:0";
Map<ApplicationId, String> aggregatorMap =
new HashMap<ApplicationId, String>();
aggregatorMap.put(appID, aggregatorAddr);
return aggregatorMap;
}
private ContainerStatus getContainerStatus(int applicationId,
int containerID, int appAttemptId) {
ContainerStatus status = recordFactory

View File

@ -70,6 +70,19 @@ public interface Context {
ConcurrentMap<ApplicationId, Application> getApplications();
Map<ApplicationId, Credentials> getSystemCredentialsForApps();
/**
* Get the registered aggregators that located on this NM.
* @return registered
*/
Map<ApplicationId, String> getRegisteredAggregators();
/**
* Return the known aggregators which get from RM for all active applications
* running on this NM.
* @return known aggregators.
*/
Map<ApplicationId, String> getKnownAggregators();
ConcurrentMap<ContainerId, Container> getContainers();

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -98,6 +99,7 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
private NMAggregatorService nmAggregatorService;
private NodeStatusUpdater nodeStatusUpdater;
private NodeResourceMonitor nodeResourceMonitor;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
@ -182,6 +184,10 @@ public class NodeManager extends CompositeService
metrics, dirsHandler);
}
}
protected NMAggregatorService createNMAggregatorService(Context context) {
return new NMAggregatorService(context);
}
protected WebServer createWebServer(Context nmContext,
ResourceView resourceView, ApplicationACLsManager aclsManager,
@ -373,6 +379,9 @@ public class NodeManager extends CompositeService
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
DefaultMetricsSystem.initialize("NodeManager");
this.nmAggregatorService = createNMAggregatorService(context);
addService(nmAggregatorService);
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
@ -465,6 +474,12 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
protected Map<ApplicationId, String> registeredAggregators =
new ConcurrentHashMap<ApplicationId, String>();
protected Map<ApplicationId, String> knownAggregators =
new ConcurrentHashMap<ApplicationId, String>();
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@ -645,6 +660,29 @@ public class NodeManager extends CompositeService
public OpportunisticContainerAllocator getContainerAllocator() {
return containerAllocator;
}
@Override
public Map<ApplicationId, String> getRegisteredAggregators() {
return this.registeredAggregators;
}
public void addRegisteredAggregators(
Map<ApplicationId, String> newRegisteredAggregators) {
this.registeredAggregators.putAll(newRegisteredAggregators);
// Update to knownAggregators as well so it can immediately be consumed by
// this NM's TimelineClient.
this.knownAggregators.putAll(newRegisteredAggregators);
}
@Override
public Map<ApplicationId, String> getKnownAggregators() {
return this.knownAggregators;
}
public void addKnownAggregators(
Map<ApplicationId, String> knownAggregators) {
this.knownAggregators.putAll(knownAggregators);
}
}
/**
@ -743,6 +781,11 @@ public class NodeManager extends CompositeService
public Context getNMContext() {
return this.context;
}
// For testing
NMAggregatorService getNMAggregatorService() {
return this.nmAggregatorService;
}
public static void main(String[] args) throws IOException {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

View File

@ -813,7 +813,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat);
nodeLabelsForHeartbeat,
NodeStatusUpdaterImpl.this.context.getRegisteredAggregators());
if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
@ -905,6 +906,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
newResource.toString());
}
}
Map<ApplicationId, String> knownAggregators = response.getAppAggregatorsMap();
((NodeManager.NMContext)context).addKnownAggregators(knownAggregators);
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
public class NMAggregatorService extends CompositeService implements
AggregatorNodemanagerProtocol {
private static final Log LOG = LogFactory.getLog(NMAggregatorService.class);
final Context context;
private Server server;
public NMAggregatorService(Context context) {
super(NMAggregatorService.class.getName());
this.context = context;
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
InetSocketAddress aggregatorServerAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
Configuration serverConf = new Configuration(conf);
// TODO Security settings.
YarnRPC rpc = YarnRPC.create(conf);
server =
rpc.getServer(AggregatorNodemanagerProtocol.class, this,
aggregatorServerAddress, serverConf,
this.context.getNMTokenSecretManager(),
conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT));
server.start();
// start remaining services
super.serviceStart();
LOG.info("NMAggregatorService started at " + aggregatorServerAddress);
}
@Override
public void serviceStop() throws Exception {
if (server != null) {
server.stop();
}
// TODO may cleanup app aggregators running on this NM in future.
super.serviceStop();
}
@Override
public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
ReportNewAggregatorsInfoRequest request) throws IOException {
List<AppAggregatorsMap> newAggregatorsList = request.getAppAggregatorsList();
if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) {
Map<ApplicationId, String> newAggregatorsMap =
new HashMap<ApplicationId, String>();
for (AppAggregatorsMap aggregator : newAggregatorsList) {
newAggregatorsMap.put(aggregator.getApplicationId(), aggregator.getAggregatorAddr());
}
((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap);
}
return ReportNewAggregatorsInfoResponse.newInstance();
}
}

View File

@ -496,6 +496,10 @@ public class ApplicationImpl implements Application {
new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId());
// Remove aggregator info for finished apps.
// TODO check we remove related aggregators info in failure cases (YARN-3038)
app.context.getRegisteredAggregators().remove(app.getAppId());
app.context.getKnownAggregators().remove(app.getAppId());
}
}

View File

@ -617,6 +617,16 @@ public abstract class BaseAMRMProxyTest {
return null;
}
@Override
public Map<ApplicationId, String> getRegisteredAggregators() {
return null;
}
@Override
public Map<ApplicationId, String> getKnownAggregators() {
return null;
}
@Override
public ConcurrentMap<ContainerId, Container> getContainers() {
return null;

View File

@ -311,6 +311,8 @@ public class ApplicationMasterService extends AbstractService implements
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
// Remove aggregator address when app get finished.
rmApp.removeAggregatorAddr();
// checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart.
@ -571,6 +573,10 @@ public class ApplicationMasterService extends AbstractService implements
allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add aggregator address for this application
allocateResponse.setAggregatorAddr(
this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
// add preemption to the allocateResponse message (if any)
allocateResponse

View File

@ -23,8 +23,10 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -513,6 +516,11 @@ public class ResourceTrackerService extends AbstractService implements
return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
message);
}
// Check & update aggregators info from request.
// TODO make sure it won't have race condition issue for AM failed over case
// that the older registration could possible override the newer one.
updateAppAggregatorsMap(request);
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@ -530,6 +538,14 @@ public class ResourceTrackerService extends AbstractService implements
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
// Return aggregators' map that NM needs to know
// TODO we should optimize this to only include aggreator info that NM
// doesn't know yet.
List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
if (keepAliveApps != null) {
setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
}
// 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent =
@ -573,6 +589,55 @@ public class ResourceTrackerService extends AbstractService implements
}
return nodeHeartBeatResponse;
}
private void setAppAggregatorsMapToResponse(
List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
Map<ApplicationId, String> liveAppAggregatorsMap = new
ConcurrentHashMap<ApplicationId, String>();
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
for (ApplicationId appId : liveApps) {
String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr();
if (appAggregatorAddr != null) {
liveAppAggregatorsMap.put(appId, appAggregatorAddr);
} else {
// Log a debug info if aggregator address is not found.
if (LOG.isDebugEnabled()) {
LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
}
}
}
response.setAppAggregatorsMap(liveAppAggregatorsMap);
}
private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
Map<ApplicationId, String> registeredAggregatorsMap =
request.getRegisteredAggregators();
if (registeredAggregatorsMap != null
&& !registeredAggregatorsMap.isEmpty()) {
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
for (Map.Entry<ApplicationId, String> entry:
registeredAggregatorsMap.entrySet()) {
ApplicationId appId = entry.getKey();
String aggregatorAddr = entry.getValue();
if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
RMApp rmApp = rmApps.get(appId);
if (rmApp == null) {
LOG.warn("Cannot update aggregator info because application ID: " +
appId + " is not found in RMContext!");
} else {
String previousAggregatorAddr = rmApp.getAggregatorAddr();
if (previousAggregatorAddr == null ||
previousAggregatorAddr != aggregatorAddr) {
// sending aggregator update event.
RMAppAggregatorUpdateEvent event =
new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
rmContext.getDispatcher().getEventHandler().handle(event);
}
}
}
}
}
}
/**
* Check if node in decommissioning state.

View File

@ -175,6 +175,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the tracking url for the application master.
*/
String getTrackingUrl();
/**
* The aggregator address for the application.
* @return the address for the application's aggregator.
*/
String getAggregatorAddr();
/**
* Set aggregator address for the application
* @param aggregatorAddr the address of aggregator
*/
void setAggregatorAddr(String aggregatorAddr);
/**
* Remove aggregator address when application is finished or killed.
*/
void removeAggregatorAddr();
/**
* The original tracking url for the application master.

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppAggregatorUpdateEvent extends RMAppEvent {
private final String appAggregatorAddr;
public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
super(appId, RMAppEventType.AGGREGATOR_UPDATE);
this.appAggregatorAddr = appAggregatorAddr;
}
public String getAppAggregatorAddr(){
return this.appAggregatorAddr;
}
}

View File

@ -30,6 +30,9 @@ public enum RMAppEventType {
// Source: Scheduler
APP_ACCEPTED,
// TODO add source later
AGGREGATOR_UPDATE,
// Source: RMAppAttempt
ATTEMPT_REGISTERED,

View File

@ -152,6 +152,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long storedFinishTime = 0;
private int firstAttemptIdInStateStore = 1;
private int nextAttemptId = 1;
private String aggregatorAddr;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@ -199,6 +200,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@ -215,6 +218,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@ -233,6 +238,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@ -249,6 +256,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
@ -276,6 +285,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@ -305,6 +316,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@ -316,6 +329,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@ -327,6 +342,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@ -560,6 +577,21 @@ public class RMAppImpl implements RMApp, Recoverable {
public void setQueue(String queue) {
this.queue = queue;
}
@Override
public String getAggregatorAddr() {
return this.aggregatorAddr;
}
@Override
public void setAggregatorAddr(String aggregatorAddr) {
this.aggregatorAddr = aggregatorAddr;
}
@Override
public void removeAggregatorAddr() {
this.aggregatorAddr = null;
}
@Override
public String getName() {
@ -832,6 +864,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
//TODO recover aggregator address.
//this.aggregatorAddr = appState.getAggregatorAddr();
RMAppAttemptImpl preAttempt = null;
for (ApplicationAttemptId attemptId :
@ -902,9 +936,24 @@ public class RMAppImpl implements RMApp, Recoverable {
SingleArcTransition<RMAppImpl, RMAppEvent> {
public void transition(RMAppImpl app, RMAppEvent event) {
};
}
private static final class RMAppAggregatorUpdateTransition
extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
LOG.info("Updating aggregator info for app: " + app.getApplicationId());
RMAppAggregatorUpdateEvent appAggregatorUpdateEvent =
(RMAppAggregatorUpdateEvent) event;
// Update aggregator address
app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
};
}
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;

View File

@ -95,6 +95,18 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getAggregatorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setAggregatorAddr(String aggregatorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void removeAggregatorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationId getApplicationId() {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -301,4 +301,18 @@ public class MockRMApp implements RMApp {
public CallerContext getCallerContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
public String getAggregatorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void removeAggregatorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setAggregatorAddr(String aggregatorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@ -22,12 +22,12 @@
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.0-alpha1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.0-alpha1-SNAPSHOT</version>
<name>Apache Hadoop YARN Timeline Service</name>
<properties>

View File

@ -94,10 +94,9 @@ public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
* @return whether it was added successfully
*/
public boolean addApplication(ApplicationId appId) {
String appIdString = appId.toString();
AppLevelTimelineAggregator aggregator =
new AppLevelTimelineAggregator(appIdString);
return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
new AppLevelTimelineAggregator(appId.toString());
return (aggregatorCollection.putIfAbsent(appId, aggregator)
== aggregator);
}

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.io.IOException;
import java.net.URI;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -30,9 +32,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -62,6 +70,12 @@ public class TimelineAggregatorsCollection extends CompositeService {
// REST server for this aggregator collection
private HttpServer2 timelineRestServer;
private String timelineRestServerBindAddress;
private AggregatorNodemanagerProtocol nmAggregatorService;
private InetSocketAddress nmAggregatorServiceAddress;
static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
@ -73,6 +87,16 @@ public class TimelineAggregatorsCollection extends CompositeService {
super(TimelineAggregatorsCollection.class.getName());
}
@Override
public void serviceInit(Configuration conf) throws Exception {
this.nmAggregatorServiceAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
}
@Override
protected void serviceStart() throws Exception {
startWebApp();
@ -95,9 +119,13 @@ public class TimelineAggregatorsCollection extends CompositeService {
* starting the app level service
* @return the aggregator associated with id after the potential put.
*/
public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
public TimelineAggregator putIfAbsent(ApplicationId appId,
TimelineAggregator aggregator) {
String id = appId.toString();
TimelineAggregator aggregatorInTable;
boolean aggregatorIsNew = false;
synchronized (aggregators) {
TimelineAggregator aggregatorInTable = aggregators.get(id);
aggregatorInTable = aggregators.get(id);
if (aggregatorInTable == null) {
try {
// initialize, start, and add it to the collection so it can be
@ -106,16 +134,30 @@ public class TimelineAggregatorsCollection extends CompositeService {
aggregator.start();
aggregators.put(id, aggregator);
LOG.info("the aggregator for " + id + " was added");
return aggregator;
aggregatorInTable = aggregator;
aggregatorIsNew = true;
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
} else {
String msg = "the aggregator for " + id + " already exists!";
LOG.error(msg);
return aggregatorInTable;
}
}
// Report to NM if a new aggregator is added.
if (aggregatorIsNew) {
try {
reportNewAggregatorToNM(appId);
} catch (Exception e) {
// throw exception here as it cannot be used if failed report to NM
LOG.error("Failed to report a new aggregator for application: " + appId +
" to NM Aggregator Services.");
throw new YarnRuntimeException(e);
}
}
return aggregatorInTable;
}
/**
@ -167,7 +209,10 @@ public class TimelineAggregatorsCollection extends CompositeService {
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
NetUtils.createSocketAddr(bindAddress));
LOG.info("Instantiating the per-node aggregator webapp at " +
timelineRestServerBindAddress);
try {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
@ -200,4 +245,27 @@ public class TimelineAggregatorsCollection extends CompositeService {
throw new YarnRuntimeException(msg, e);
}
}
private void reportNewAggregatorToNM(ApplicationId appId)
throws YarnException, IOException {
this.nmAggregatorService = getNMAggregatorService();
ReportNewAggregatorsInfoRequest request =
ReportNewAggregatorsInfoRequest.newInstance(appId,
this.timelineRestServerBindAddress);
LOG.info("Report a new aggregator for application: " + appId +
" to NM Aggregator Services.");
nmAggregatorService.reportNewAggregatorInfo(request);
}
// protected for test
protected AggregatorNodemanagerProtocol getNMAggregatorService(){
Configuration conf = getConfig();
final YarnRPC rpc = YarnRPC.create(conf);
// TODO Security settings.
return (AggregatorNodemanagerProtocol) rpc.getProxy(
AggregatorNodemanagerProtocol.class,
nmAggregatorServiceAddress, conf);
}
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.Future;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Test;
public class TestTimelineAggregatorsCollection {
@ -45,11 +46,11 @@ public class TestTimelineAggregatorsCollection {
final int NUM_APPS = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < NUM_APPS; i++) {
final String appId = String.valueOf(i);
final ApplicationId appId = ApplicationId.newInstance(0L, i);
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineAggregator aggregator =
new AppLevelTimelineAggregator(appId);
new AppLevelTimelineAggregator(appId.toString());
return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
}
};
@ -79,14 +80,14 @@ public class TestTimelineAggregatorsCollection {
final int NUM_APPS = 5;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < NUM_APPS; i++) {
final String appId = String.valueOf(i);
final ApplicationId appId = ApplicationId.newInstance(0L, i);
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineAggregator aggregator =
new AppLevelTimelineAggregator(appId);
new AppLevelTimelineAggregator(appId.toString());
boolean successPut =
(aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
return successPut && aggregatorCollection.remove(appId);
return successPut && aggregatorCollection.remove(appId.toString());
}
};
tasks.add(task);