YARN-1595. Made enabling history service configurable and fixed test failures on branch YARN-321. Contributed by Vinod Kumar Vavilapalli.

svn merge --ignore-ancestry -c 1559001 ../YARN-321


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562218 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-28 20:18:42 +00:00
parent b786be0307
commit 4a3b6b7367
8 changed files with 92 additions and 45 deletions

View File

@ -84,6 +84,9 @@ Branch YARN-321: Generic ApplicationHistoryService
YARN-1597. Fixed Findbugs warnings on branch YARN-321. (Vinod Kumar Vavilapalli YARN-1597. Fixed Findbugs warnings on branch YARN-321. (Vinod Kumar Vavilapalli
via zjshen) via zjshen)
YARN-1595. Made enabling history service configurable and fixed test failures on
branch YARN-321. (Vinod Kumar Vavilapalli via zjshen)
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -263,11 +263,6 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "nodemanagers.heartbeat-interval-ms"; RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000; public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
/** The setting that controls whether RM writes history data. */
public static final String RM_HISTORY_WRITER_ENABLED = RM_PREFIX
+ "history-writer.enabled";
public static final boolean DEFAULT_RM_HISTORY_WRITER_ENABLED = false;
/** Number of worker threads that write the history data. */ /** Number of worker threads that write the history data. */
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size"; RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
@ -953,6 +948,11 @@ public class YarnConfiguration extends Configuration {
public static final String AHS_PREFIX = YARN_PREFIX + "ahs."; public static final String AHS_PREFIX = YARN_PREFIX + "ahs.";
/** The setting that controls whether history-service is enabled or not.. */
public static final String YARN_HISTORY_SERVICE_ENABLED = AHS_PREFIX
+ ".enabled";
public static final boolean DEFAULT_YARN_HISTORY_SERVICE_ENABLED = false;
/** URI for FileSystemApplicationHistoryStore */ /** URI for FileSystemApplicationHistoryStore */
public static final String FS_HISTORY_STORE_URI = AHS_PREFIX + "fs-history-store.uri"; public static final String FS_HISTORY_STORE_URI = AHS_PREFIX + "fs-history-store.uri";

View File

@ -87,6 +87,7 @@ public class YarnClientImpl extends YarnClient {
protected long submitPollIntervalMillis; protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis; private long asyncApiPollIntervalMillis;
protected AHSClient historyClient; protected AHSClient historyClient;
private boolean historyServiceEnabled;
private static final String ROOT = "root"; private static final String ROOT = "root";
@ -107,8 +108,14 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
} }
historyClient = AHSClientImpl.createAHSClient();
historyClient.init(getConfig()); if (conf.getBoolean(YarnConfiguration.YARN_HISTORY_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_YARN_HISTORY_SERVICE_ENABLED)) {
historyServiceEnabled = true;
historyClient = AHSClientImpl.createAHSClient();
historyClient.init(getConfig());
}
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -117,7 +124,9 @@ public class YarnClientImpl extends YarnClient {
try { try {
rmClient = ClientRMProxy.createRMProxy(getConfig(), rmClient = ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class); ApplicationClientProtocol.class);
historyClient.start(); if (historyServiceEnabled) {
historyClient.start();
}
} catch (IOException e) { } catch (IOException e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
} }
@ -129,7 +138,9 @@ public class YarnClientImpl extends YarnClient {
if (this.rmClient != null) { if (this.rmClient != null) {
RPC.stopProxy(this.rmClient); RPC.stopProxy(this.rmClient);
} }
historyClient.stop(); if (historyServiceEnabled) {
historyClient.stop();
}
super.serviceStop(); super.serviceStop();
} }
@ -225,11 +236,18 @@ public class YarnClientImpl extends YarnClient {
request.setApplicationId(appId); request.setApplicationId(appId);
response = rmClient.getApplicationReport(request); response = rmClient.getApplicationReport(request);
} catch (YarnException e) { } catch (YarnException e) {
if (!historyServiceEnabled) {
// Just throw it as usual if historyService is not enabled.
throw e;
}
// Even if history-service is enabled, treat all exceptions still the same
// except the following
if (!(e.getClass() == ApplicationNotFoundException.class)) { if (!(e.getClass() == ApplicationNotFoundException.class)) {
throw e; throw e;
} }
}
if (response == null || response.getApplicationReport() == null) {
return historyClient.getApplicationReport(appId); return historyClient.getApplicationReport(appId);
} }
return response.getApplicationReport(); return response.getApplicationReport();
@ -397,25 +415,37 @@ public class YarnClientImpl extends YarnClient {
@Override @Override
public ApplicationAttemptReport getApplicationAttemptReport( public ApplicationAttemptReport getApplicationAttemptReport(
ApplicationAttemptId appAttemptId) throws YarnException, IOException { ApplicationAttemptId appAttemptId) throws YarnException, IOException {
return historyClient.getApplicationAttemptReport(appAttemptId); if (historyServiceEnabled) {
return historyClient.getApplicationAttemptReport(appAttemptId);
}
throw new YarnException("History service is not enabled.");
} }
@Override @Override
public List<ApplicationAttemptReport> getApplicationAttempts( public List<ApplicationAttemptReport> getApplicationAttempts(
ApplicationId appId) throws YarnException, IOException { ApplicationId appId) throws YarnException, IOException {
return historyClient.getApplicationAttempts(appId); if (historyServiceEnabled) {
return historyClient.getApplicationAttempts(appId);
}
throw new YarnException("History service is not enabled.");
} }
@Override @Override
public ContainerReport getContainerReport(ContainerId containerId) public ContainerReport getContainerReport(ContainerId containerId)
throws YarnException, IOException { throws YarnException, IOException {
return historyClient.getContainerReport(containerId); if (historyServiceEnabled) {
return historyClient.getContainerReport(containerId);
}
throw new YarnException("History service is not enabled.");
} }
@Override @Override
public List<ContainerReport> getContainers( public List<ContainerReport> getContainers(
ApplicationAttemptId applicationAttemptId) throws YarnException, ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException { IOException {
return historyClient.getContainers(applicationAttemptId); if (historyServiceEnabled) {
return historyClient.getContainers(applicationAttemptId);
}
throw new YarnException("History service is not enabled.");
} }
} }

View File

@ -565,9 +565,12 @@
</property> </property>
<property> <property>
<description>Enable RM to write history data. If true, then <description>Indicate to ResourceManager as well as clients whether
yarn.resourcemanager.history-writer.class must be specified</description> history-service is enabled or not. If enabled, ResourceManager starts
<name>yarn.resourcemanager.history-writer.enabled</name> recording historical data that ApplicationHistory service can consume.
Similarly, clients can redirect to the history service when applications
finish if this is enabled.</description>
<name>yarn.ahs.enabled</name>
<value>false</value> <value>false</value>
</property> </property>

View File

@ -201,7 +201,7 @@ public class TestMemoryApplicationHistoryStore extends
writeContainerFinishData(containerId); writeContainerFinishData(containerId);
} }
long usedMemoryAfter = (runtime.totalMemory() - runtime.freeMemory()) / mb; long usedMemoryAfter = (runtime.totalMemory() - runtime.freeMemory()) / mb;
Assert.assertTrue((usedMemoryAfter - usedMemoryBefore) < 100); Assert.assertTrue((usedMemoryAfter - usedMemoryBefore) < 200);
} }
} }

View File

@ -68,6 +68,7 @@ public class RMApplicationHistoryWriter extends CompositeService {
private Dispatcher dispatcher; private Dispatcher dispatcher;
private ApplicationHistoryWriter writer; private ApplicationHistoryWriter writer;
private boolean historyServiceEnabled;
public RMApplicationHistoryWriter() { public RMApplicationHistoryWriter() {
super(RMApplicationHistoryWriter.class.getName()); super(RMApplicationHistoryWriter.class.getName());
@ -76,6 +77,11 @@ public class RMApplicationHistoryWriter extends CompositeService {
@Override @Override
protected synchronized void serviceInit( protected synchronized void serviceInit(
Configuration conf) throws Exception { Configuration conf) throws Exception {
historyServiceEnabled = conf.getBoolean(
YarnConfiguration.YARN_HISTORY_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_YARN_HISTORY_SERVICE_ENABLED);
writer = createApplicationHistoryStore(conf); writer = createApplicationHistoryStore(conf);
addIfService(writer); addIfService(writer);
@ -96,12 +102,9 @@ public class RMApplicationHistoryWriter extends CompositeService {
protected ApplicationHistoryStore createApplicationHistoryStore( protected ApplicationHistoryStore createApplicationHistoryStore(
Configuration conf) { Configuration conf) {
boolean ahsEnabled = conf.getBoolean(
YarnConfiguration.RM_HISTORY_WRITER_ENABLED,
YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_ENABLED);
// If the history writer is not enabled, a dummy store will be used to // If the history writer is not enabled, a dummy store will be used to
// write nothing // write nothing
if (ahsEnabled) { if (historyServiceEnabled) {
try { try {
Class<? extends ApplicationHistoryStore> storeClass = Class<? extends ApplicationHistoryStore> storeClass =
conf.getClass(YarnConfiguration.RM_HISTORY_WRITER_CLASS, conf.getClass(YarnConfiguration.RM_HISTORY_WRITER_CLASS,
@ -225,42 +228,49 @@ public class RMApplicationHistoryWriter extends CompositeService {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void applicationAttemptStarted(RMAppAttempt appAttempt) { public void applicationAttemptStarted(RMAppAttempt appAttempt) {
dispatcher.getEventHandler().handle( if (historyServiceEnabled) {
dispatcher.getEventHandler().handle(
new WritingApplicationAttemptStartEvent(appAttempt.getAppAttemptId(), new WritingApplicationAttemptStartEvent(appAttempt.getAppAttemptId(),
ApplicationAttemptStartData.newInstance( ApplicationAttemptStartData.newInstance(appAttempt.getAppAttemptId(),
appAttempt.getAppAttemptId(), appAttempt.getHost(), appAttempt.getHost(), appAttempt.getRpcPort(), appAttempt
appAttempt.getRpcPort(), appAttempt.getMasterContainer() .getMasterContainer().getId())));
.getId()))); }
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void applicationAttemptFinished(RMAppAttempt appAttempt) { public void applicationAttemptFinished(RMAppAttempt appAttempt) {
dispatcher.getEventHandler().handle( if (historyServiceEnabled) {
dispatcher.getEventHandler().handle(
new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(), new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(),
ApplicationAttemptFinishData.newInstance(appAttempt ApplicationAttemptFinishData.newInstance(
.getAppAttemptId(), appAttempt.getDiagnostics().toString(), appAttempt.getAppAttemptId(), appAttempt.getDiagnostics()
appAttempt.getTrackingUrl(), appAttempt .toString(), appAttempt.getTrackingUrl(), appAttempt
.getFinalApplicationStatus(), appAttempt .getFinalApplicationStatus(), appAttempt
.createApplicationAttemptState()))); .createApplicationAttemptState())));
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void containerStarted(RMContainer container) { public void containerStarted(RMContainer container) {
dispatcher.getEventHandler().handle( if (historyServiceEnabled) {
dispatcher.getEventHandler().handle(
new WritingContainerStartEvent(container.getContainerId(), new WritingContainerStartEvent(container.getContainerId(),
ContainerStartData.newInstance(container.getContainerId(), ContainerStartData.newInstance(container.getContainerId(),
container.getAllocatedResource(), container.getAllocatedNode(), container.getAllocatedResource(), container.getAllocatedNode(),
container.getAllocatedPriority(), container.getStartTime()))); container.getAllocatedPriority(), container.getStartTime())));
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void containerFinished(RMContainer container) { public void containerFinished(RMContainer container) {
dispatcher.getEventHandler().handle( if (historyServiceEnabled) {
dispatcher.getEventHandler().handle(
new WritingContainerFinishEvent(container.getContainerId(), new WritingContainerFinishEvent(container.getContainerId(),
ContainerFinishData.newInstance(container.getContainerId(), ContainerFinishData.newInstance(container.getContainerId(),
container.getFinishTime(), container.getDiagnosticsInfo(), container.getFinishTime(), container.getDiagnosticsInfo(),
container.getLogURL(), container.getContainerExitStatus(), container.getLogURL(), container.getContainerExitStatus(),
container.getContainerState()))); container.getContainerState())));
}
} }
/** /**

View File

@ -77,6 +77,7 @@ public class TestRMApplicationHistoryWriter {
public void setup() { public void setup() {
store = new MemoryApplicationHistoryStore(); store = new MemoryApplicationHistoryStore();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.YARN_HISTORY_SERVICE_ENABLED, true);
writer = new RMApplicationHistoryWriter() { writer = new RMApplicationHistoryWriter() {
@Override @Override

View File

@ -105,7 +105,7 @@ public class TestRMContainerImpl {
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals(RMContainerState.RUNNING, rmContainer.getState());
assertEquals( assertEquals(
"http://host:3465/node/containerlogs/container_1_0001_01_000001/user", "http://host:3465/logs/host:3425/container_1_0001_01_000001/container_1_0001_01_000001/user",
rmContainer.getLogURL()); rmContainer.getLogURL());
// In RUNNING state. Verify RELEASED and associated actions. // In RUNNING state. Verify RELEASED and associated actions.
@ -192,7 +192,7 @@ public class TestRMContainerImpl {
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals(RMContainerState.RUNNING, rmContainer.getState());
assertEquals( assertEquals(
"http://host:3465/node/containerlogs/container_1_0001_01_000001/user", "http://host:3465/logs/host:3425/container_1_0001_01_000001/container_1_0001_01_000001/user",
rmContainer.getLogURL()); rmContainer.getLogURL());
// In RUNNING state. Verify EXPIRE and associated actions. // In RUNNING state. Verify EXPIRE and associated actions.