YARN-3967. Fetch the application report from the AHS if the RM does not

know about it. Contributed by Mit Desai

(cherry picked from commit fbd6063269)
(cherry picked from commit 98ed4bca3b)
This commit is contained in:
Xuan 2015-07-24 10:15:54 -07:00
parent 76339bfb9f
commit c65b6b760f
3 changed files with 187 additions and 12 deletions

View File

@ -11,6 +11,9 @@ Release 2.7.2 - UNRELEASED
YARN-3170. YARN architecture document needs updating. (Brahma Reddy Battula
via ozawa)
YARN-3967. Fetch the application report from the AHS if the RM does not know about it.
(Mit Desai via xgong)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,11 +24,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.AHSProxy;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -41,38 +45,73 @@ public class AppReportFetcher {
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
private final Configuration conf;
private final ApplicationClientProtocol applicationsManager;
private final ApplicationHistoryProtocol historyManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean isAHSEnabled;
/**
* Create a new Connection to the RM to fetch Application reports.
* Create a new Connection to the RM/Application History Server
* to fetch Application reports.
* @param conf the conf to use to know where the RM is.
*/
public AppReportFetcher(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
isAHSEnabled = true;
}
this.conf = conf;
try {
applicationsManager = ClientRMProxy.createRMProxy(conf,
ApplicationClientProtocol.class);
if (isAHSEnabled) {
historyManager = getAHSProxy(conf);
} else {
this.historyManager = null;
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
/**
* Just call directly into the applicationsManager given instead of creating
* a remote connection to it. This is mostly for when the Proxy is running
* as part of the RM already.
* Create a direct connection to RM instead of a remote connection when
* the proxy is running as part of the RM. Also create a remote connection to
* Application History Server if it is enabled.
* @param conf the configuration to use
* @param applicationsManager what to use to get the RM reports.
*/
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
isAHSEnabled = true;
}
this.conf = conf;
this.applicationsManager = applicationsManager;
if (isAHSEnabled) {
try {
historyManager = getAHSProxy(conf);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
} else {
this.historyManager = null;
}
}
protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
throws IOException {
return AHSProxy.createAHSProxy(configuration,
ApplicationHistoryProtocol.class,
configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
}
/**
* Get a report for the specified app.
* @param appId the id of the application to get.
* @return the ApplicationReport for that app.
* Get an application report for the specified application id from the RM and
* fall back to the Application History Server if not found in RM.
* @param appId id of the application to get.
* @return the ApplicationReport for the appId.
* @throws YarnException on any error.
* @throws IOException
*/
@ -81,9 +120,22 @@ public class AppReportFetcher {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId);
GetApplicationReportResponse response = applicationsManager
.getApplicationReport(request);
GetApplicationReportResponse response;
try {
response = applicationsManager.getApplicationReport(request);
} catch (YarnException e) {
if (!isAHSEnabled) {
// Just throw it as usual if historyService is not enabled.
throw e;
}
// Even if history-service is enabled, treat all exceptions still the same
// except the following
if (!(e.getClass() == ApplicationNotFoundException.class)) {
throw e;
}
response = historyManager.getApplicationReport(request);
}
return response.getApplicationReport();
}
@ -91,5 +143,8 @@ public class AppReportFetcher {
if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager);
}
if (this.historyManager != null) {
RPC.stopProxy(this.historyManager);
}
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestAppReportFetcher {
static ApplicationHistoryProtocol historyManager;
static Configuration conf = new Configuration();
private static ApplicationClientProtocol appManager;
private static AppReportFetcher fetcher;
private final String appNotFoundExceptionMsg = "APP NOT FOUND";
@After
public void cleanUp() {
historyManager = null;
appManager = null;
fetcher = null;
}
public void testHelper(boolean isAHSEnabled)
throws YarnException, IOException {
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
isAHSEnabled);
appManager = Mockito.mock(ApplicationClientProtocol.class);
Mockito.when(appManager
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
.thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
fetcher = new AppReportFetcherForTest(conf, appManager);
ApplicationId appId = ApplicationId.newInstance(0,0);
fetcher.getApplicationReport(appId);
}
@Test
public void testFetchReportAHSEnabled() throws YarnException, IOException {
testHelper(true);
Mockito.verify(historyManager, Mockito.times(1))
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
Mockito.verify(appManager, Mockito.times(1))
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
}
@Test
public void testFetchReportAHSDisabled() throws YarnException, IOException {
try {
testHelper(false);
} catch (ApplicationNotFoundException e) {
Assert.assertTrue(e.getMessage() == appNotFoundExceptionMsg);
/* RM will not know of the app and Application History Service is disabled
* So we will not try to get the report from AHS and RM will throw
* ApplicationNotFoundException
*/
}
Mockito.verify(appManager, Mockito.times(1))
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
if (historyManager != null) {
Assert.fail("HistoryManager should be null as AHS is disabled");
}
}
static class AppReportFetcherForTest extends AppReportFetcher {
public AppReportFetcherForTest(Configuration conf,
ApplicationClientProtocol acp) {
super(conf, acp);
}
@Override
protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
throws IOException
{
GetApplicationReportResponse resp = Mockito.
mock(GetApplicationReportResponse.class);
historyManager = Mockito.mock(ApplicationHistoryProtocol.class);
try {
Mockito.when(historyManager.getApplicationReport(Mockito
.any(GetApplicationReportRequest.class))).thenReturn(resp);
} catch (YarnException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return historyManager;
}
}
}