YARN-987. Added ApplicationHistoryManager responsible for exposing reports to all clients. Contributed by Mayank Bansal.

svn merge --ignore-ancestry -c 1556736 ../YARN-321


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-28 19:53:54 +00:00
parent f8cd06194d
commit 0d2e62ae04
6 changed files with 420 additions and 0 deletions

View File

@ -490,6 +490,9 @@ Branch YARN-321: Generic ApplicationHistoryService
YARN-974. Added more information to RMContainer to be collected and recorded in
Application-History. (Zhijie Shen via vinodkv)
YARN-987. Added ApplicationHistoryManager responsible for exposing reports to
all clients. (Mayank Bansal via vinodkv)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -944,6 +944,9 @@ public class YarnConfiguration extends Configuration {
public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type";
public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none";
/** AHS store class */
public static final String AHS_STORE = AHS_PREFIX + "store.class";
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -1057,6 +1057,13 @@
<name>yarn.ahs.fs-history-store.compression-type</name>
<value>none</value>
</property>
<property>
<description> Store class name for history store, defaulting to file
system store </description>
<name>yarn.ahs.store.class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore</value>
</property>
<!-- Other configuration -->
<property>

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface ApplicationHistoryManager {
/**
* This method returns Application {@link ApplicationReport} for the specified
* {@link ApplicationId}.
*
* @return {@link ApplicationReport} for the ApplicationId.
* @throws {@link IOException}
*/
@Public
@Unstable
ApplicationReport getApplication(ApplicationId appId) throws IOException;
/**
* This method returns all Application {@link ApplicationReport}s
*
* @return map {@link ApplicationId, @link ApplicationReport}s.
* @throws {@link IOException}
*/
@Public
@Unstable
Map<ApplicationId, ApplicationReport> getAllApplications() throws IOException;
/**
* Application can have multiple application attempts
* {@link ApplicationAttemptReport}. This method returns the all
* {@link ApplicationAttemptReport}s for the Application.
*
* @return all {@link ApplicationAttemptReport}s for the Application.
* @throws {@link IOException}
*/
@Public
@Unstable
Map<ApplicationAttemptId, ApplicationAttemptReport> getApplicationAttempts(
ApplicationId appId) throws IOException;
/**
* This method returns {@link ApplicationAttemptReport} for specified
* {@link ApplicationId}.
*
* @param {@link ApplicationAttemptId}
* @return {@link ApplicationAttemptReport} for ApplicationAttemptId
* @throws {@link IOException}
*/
@Public
@Unstable
ApplicationAttemptReport getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws IOException;
/**
* This method returns {@link ContainerReport} for specified
* {@link ContainerId}.
*
* @param {@link ContainerId}
* @return {@link Container} for ContainerId
* @throws {@link IOException}
*/
@Public
@Unstable
ContainerReport getContainer(ContainerId containerId) throws IOException;
/**
* This method returns {@link ContainerReport} for specified
* {@link ApplicationAttemptId}.
*
* @param {@link ApplicationAttemptId}
* @return {@link Container} for ApplicationAttemptId
* @throws {@link IOException}
*/
@Public
@Unstable
ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
throws IOException;
/**
* This method returns Map{@link ContainerId,@link ContainerReport} for
* specified {@link ApplicationAttemptId}.
*
* @param {@link ApplicationAttemptId}
* @return Map{@link ContainerId, @link ContainerReport} for
* ApplicationAttemptId
* @throws {@link IOException}
*/
@Public
@Unstable
Map<ContainerId, ContainerReport> getContainers(
ApplicationAttemptId appAttemptId) throws IOException;
}

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import com.google.common.annotations.VisibleForTesting;
public class ApplicationHistoryManagerImpl extends AbstractService implements
ApplicationHistoryManager {
private static final Log LOG = LogFactory
.getLog(ApplicationHistoryManagerImpl.class);
private static final String UNAVAILABLE = "N/A";
private ApplicationHistoryStore historyStore;
public ApplicationHistoryManagerImpl() {
super(ApplicationHistoryManagerImpl.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
LOG.info("ApplicationHistory Init");
historyStore = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.AHS_STORE, FileSystemApplicationHistoryStore.class,
ApplicationHistoryStore.class), conf);
historyStore.init(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
LOG.info("Starting ApplicationHistory");
historyStore.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping ApplicationHistory");
historyStore.stop();
super.serviceStop();
}
@Override
public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
throws IOException {
return convertToContainerReport(historyStore.getAMContainer(appAttemptId));
}
@Override
public Map<ApplicationId, ApplicationReport> getAllApplications()
throws IOException {
Map<ApplicationId, ApplicationHistoryData> histData = historyStore
.getAllApplications();
HashMap<ApplicationId, ApplicationReport> applicationsReport = new HashMap<ApplicationId, ApplicationReport>();
for (ApplicationId appId : histData.keySet()) {
applicationsReport.put(appId, convertToApplicationReport(histData
.get(appId)));
}
return applicationsReport;
}
@Override
public ApplicationReport getApplication(ApplicationId appId)
throws IOException {
return convertToApplicationReport(historyStore.getApplication(appId));
}
private ApplicationReport convertToApplicationReport(
ApplicationHistoryData appHistory) throws IOException {
ApplicationAttemptId currentApplicationAttemptId = null;
String trackingUrl = UNAVAILABLE;
String host = UNAVAILABLE;
int rpcPort = -1;
ApplicationAttemptHistoryData lastAttempt = getLastAttempt(appHistory
.getApplicationId());
if (lastAttempt != null) {
currentApplicationAttemptId = lastAttempt.getApplicationAttemptId();
trackingUrl = lastAttempt.getTrackingURL();
host = lastAttempt.getHost();
rpcPort = lastAttempt.getRPCPort();
}
return ApplicationReport.newInstance(appHistory.getApplicationId(),
currentApplicationAttemptId, appHistory.getUser(), appHistory
.getQueue(), appHistory.getApplicationName(), host, rpcPort, null,
appHistory.getYarnApplicationState(), appHistory.getDiagnosticsInfo(),
trackingUrl, appHistory.getStartTime(), appHistory.getFinishTime(),
appHistory.getFinalApplicationStatus(), null, "", 100, appHistory
.getApplicationType(), null);
}
private ApplicationAttemptHistoryData getLastAttempt(ApplicationId appId)
throws IOException {
Map<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts = historyStore
.getApplicationAttempts(appId);
ApplicationAttemptId prevMaxAttemptId = null;
for (ApplicationAttemptId attemptId : attempts.keySet()) {
if (prevMaxAttemptId == null) {
prevMaxAttemptId = attemptId;
} else {
if (prevMaxAttemptId.getAttemptId() < attemptId.getAttemptId()) {
prevMaxAttemptId = attemptId;
}
}
}
return attempts.get(prevMaxAttemptId);
}
private ApplicationAttemptReport convertToApplicationAttemptReport(
ApplicationAttemptHistoryData appAttemptHistory) {
return ApplicationAttemptReport.newInstance(appAttemptHistory
.getApplicationAttemptId(), appAttemptHistory.getHost(),
appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(),
appAttemptHistory.getDiagnosticsInfo(), null, appAttemptHistory
.getMasterContainerId());
}
@Override
public ApplicationAttemptReport getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws IOException {
return convertToApplicationAttemptReport(historyStore
.getApplicationAttempt(appAttemptId));
}
@Override
public Map<ApplicationAttemptId, ApplicationAttemptReport> getApplicationAttempts(
ApplicationId appId) throws IOException {
Map<ApplicationAttemptId, ApplicationAttemptHistoryData> histData = historyStore
.getApplicationAttempts(appId);
HashMap<ApplicationAttemptId, ApplicationAttemptReport> applicationAttemptsReport = new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
for (ApplicationAttemptId appAttemptId : histData.keySet()) {
applicationAttemptsReport.put(appAttemptId,
convertToApplicationAttemptReport(histData.get(appAttemptId)));
}
return applicationAttemptsReport;
}
@Override
public ContainerReport getContainer(ContainerId containerId)
throws IOException {
return convertToContainerReport(historyStore.getContainer(containerId));
}
private ContainerReport convertToContainerReport(
ContainerHistoryData containerHistory) {
return ContainerReport.newInstance(containerHistory.getContainerId(),
containerHistory.getAllocatedResource(), containerHistory
.getAssignedNode(), containerHistory.getPriority(),
containerHistory.getStartTime(), containerHistory.getFinishTime(),
containerHistory.getDiagnosticsInfo(), containerHistory.getLogURL(),
containerHistory.getContainerExitStatus(), containerHistory
.getContainerState());
}
@Override
public Map<ContainerId, ContainerReport> getContainers(
ApplicationAttemptId appAttemptId) throws IOException {
Map<ContainerId, ContainerHistoryData> histData = historyStore
.getContainers(appAttemptId);
HashMap<ContainerId, ContainerReport> containersReport = new HashMap<ContainerId, ContainerReport>();
for (ContainerId container : histData.keySet()) {
containersReport.put(container, convertToContainerReport(histData
.get(container)));
}
return containersReport;
}
@Private
@VisibleForTesting
public ApplicationHistoryStore getHistoryStore() {
return this.historyStore;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestApplicationHistoryManagerImpl extends
ApplicationHistoryStoreTestUtils {
ApplicationHistoryManagerImpl applicationHistoryManagerImpl = null;
@Before
public void setup() throws Exception {
Configuration config = new Configuration();
config.setClass(YarnConfiguration.AHS_STORE,
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
applicationHistoryManagerImpl = new ApplicationHistoryManagerImpl();
applicationHistoryManagerImpl.init(config);
applicationHistoryManagerImpl.start();
store = applicationHistoryManagerImpl.getHistoryStore();
}
@After
public void tearDown() throws Exception {
applicationHistoryManagerImpl.stop();
}
@Test
public void testApplicationReport() throws IOException, YarnException {
ApplicationId appId = null;
appId = ApplicationId.newInstance(0, 1);
writeApplicationStartData(appId);
writeApplicationFinishData(appId);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
1);
writeApplicationAttemptStartData(appAttemptId);
writeApplicationAttemptFinishData(appAttemptId);
ApplicationReport appReport = applicationHistoryManagerImpl
.getApplication(appId);
Assert.assertNotNull(appReport);
Assert.assertEquals(appId, appReport.getApplicationId());
Assert.assertEquals(appAttemptId, appReport
.getCurrentApplicationAttemptId());
Assert.assertEquals(appAttemptId.toString(), appReport.getHost());
Assert.assertEquals("test type", appReport.getApplicationType().toString());
Assert.assertEquals("test queue", appReport.getQueue().toString());
}
}