YARN-6446. Revisit ATSv2 integration to ensure all required information is published. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2017-05-04 09:15:01 -07:00
parent 08cdf231a0
commit a8a273b064
6 changed files with 45 additions and 6 deletions

View File

@ -28,6 +28,7 @@ import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.timelineservice.ServiceTimelinePublisher;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import java.io.IOException;
@ -57,4 +58,11 @@ public interface ProviderService extends Service {
*/
boolean processContainerStatus(ContainerId containerId,
ContainerStatus status);
/**
* Set service publisher.
* @param serviceTimelinePublisher service publisher.
*/
void setServiceTimelinePublisher(
ServiceTimelinePublisher serviceTimelinePublisher);
}

View File

@ -38,6 +38,7 @@ import org.apache.slider.providers.ProviderService;
import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.timelineservice.ServiceTimelinePublisher;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,6 +58,7 @@ public class DockerProviderService extends AbstractService
private static final String QUICK_LINKS = "quicklinks";
protected StateAccessForProviders amState;
protected YarnRegistryViewForProviders yarnRegistry;
private ServiceTimelinePublisher serviceTimelinePublisher;
protected DockerProviderService() {
super("DockerProviderService");
@ -126,6 +128,9 @@ public class DockerProviderService extends AbstractService
PublishedConfiguration pubconf = new PublishedConfiguration(QUICK_LINKS,
application.getQuicklinks().entrySet());
amState.getPublishedSliderConfigurations().put(QUICK_LINKS, pubconf);
if (serviceTimelinePublisher != null) {
serviceTimelinePublisher.serviceAttemptUpdated(application);
}
}
public boolean processContainerStatus(ContainerId containerId,
@ -155,4 +160,9 @@ public class DockerProviderService extends AbstractService
}
return false;
}
@Override
public void setServiceTimelinePublisher(ServiceTimelinePublisher publisher) {
this.serviceTimelinePublisher = publisher;
}
}

View File

@ -661,11 +661,13 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
asyncRMClient.registerTimelineV2Client(timelineClient);
timelineClient.init(getConfig());
timelineClient.start();
log.info("Timeline client started.");
log.info("Timeline v2 client started.");
serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient);
serviceTimelinePublisher.init(getConfig());
serviceTimelinePublisher.start();
providerService.setServiceTimelinePublisher(serviceTimelinePublisher);
appState.setServiceTimelinePublisher(serviceTimelinePublisher);
log.info("ServiceTimelinePublisher started.");
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.slider.api.resource.Application;
@ -109,6 +108,13 @@ public class ServiceTimelinePublisher extends CompositeService {
publishComponents(application.getComponents());
}
public void serviceAttemptUpdated(Application application) {
TimelineEntity entity = createServiceAttemptEntity(application.getId());
entity.addInfo(SliderTimelineMetricsConstants.QUICK_LINKS,
application.getQuicklinks());
putEntity(entity);
}
public void serviceAttemptUnregistered(AppState appState,
ActionStopSlider stopAction) {
long currentTimeMillis = System.currentTimeMillis();

View File

@ -37,6 +37,8 @@ public final class SliderTimelineMetricsConstants {
public static final String LAUNCH_TIME = "LAUNCH_TIME";
public static final String QUICK_LINKS = "QUICK_LINKS";
public static final String LAUNCH_COMMAND = "LAUNCH_COMMAND";
public static final String TOTAL_CONTAINERS = "NUMBER_OF_CONTAINERS";

View File

@ -23,10 +23,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.ApplicationState;
@ -77,6 +76,8 @@ public class TestServiceTimelinePublisher {
@Before
public void setUp() throws Exception {
config = new Configuration();
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
timelineClient =
new DummyTimelineClient(ApplicationId.fromString(SERVICEID));
serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient);
@ -88,8 +89,12 @@ public class TestServiceTimelinePublisher {
@After
public void tearDown() throws Exception {
serviceTimelinePublisher.stop();
timelineClient.stop();
if (serviceTimelinePublisher != null) {
serviceTimelinePublisher.stop();
}
if (timelineClient != null) {
timelineClient.stop();
}
}
@Test
@ -264,6 +269,12 @@ public class TestServiceTimelinePublisher {
@Override
public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
putEntities(entities);
}
@Override
public void putEntities(TimelineEntity... entities)
throws IOException, YarnException {
for (TimelineEntity timelineEntity : entities) {
TimelineEntity entity =
lastPublishedEntities.get(timelineEntity.getIdentifier());