YARN-1749. Updated application-history related configs to reflect the latest reality and to be consistently named. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1570948 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ec23bf46e4
commit
e167e585e9
|
@ -224,6 +224,9 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1687. Renamed user-facing records for the timeline-service to be simply
|
||||
named after 'timeline' instead of 'apptimeline'. (Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-1749. Updated application-history related configs to reflect the latest
|
||||
reality and to be consistently named. (Zhijie Shen via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -300,11 +300,6 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final int DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
|
||||
10;
|
||||
|
||||
/** The implementation class of ApplicationHistoryStore, which is to be used
|
||||
* by RMApplicationHistoryWriter. */
|
||||
public static final String RM_HISTORY_WRITER_CLASS = RM_PREFIX
|
||||
+ "history-writer.class";
|
||||
|
||||
//Delegation token related keys
|
||||
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
|
||||
RM_PREFIX + "delegation.key.update-interval";
|
||||
|
@ -993,63 +988,6 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String YARN_APP_CONTAINER_LOG_BACKUPS =
|
||||
YARN_PREFIX + "app.container.log.backups";
|
||||
|
||||
////////////////////////////////
|
||||
// AHS Configs
|
||||
////////////////////////////////
|
||||
|
||||
public static final String AHS_PREFIX = YARN_PREFIX + "ahs.";
|
||||
|
||||
/** The setting that controls whether history-service is enabled or not.. */
|
||||
public static final String YARN_HISTORY_SERVICE_ENABLED = AHS_PREFIX
|
||||
+ "enabled";
|
||||
public static final boolean DEFAULT_YARN_HISTORY_SERVICE_ENABLED = false;
|
||||
|
||||
/** URI for FileSystemApplicationHistoryStore */
|
||||
public static final String FS_HISTORY_STORE_URI = AHS_PREFIX + "fs-history-store.uri";
|
||||
|
||||
/** T-file compression types used to compress history data.*/
|
||||
public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type";
|
||||
public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none";
|
||||
|
||||
/** AHS store class */
|
||||
public static final String AHS_STORE = AHS_PREFIX + "store.class";
|
||||
|
||||
/** host:port address for Application History Server API. */
|
||||
public static final String AHS_ADDRESS = AHS_PREFIX + "address";
|
||||
public static final int DEFAULT_AHS_PORT = 10200;
|
||||
public static final String DEFAULT_AHS_ADDRESS = "0.0.0.0:"
|
||||
+ DEFAULT_AHS_PORT;
|
||||
|
||||
/** The number of threads to handle client API requests. */
|
||||
public static final String AHS_CLIENT_THREAD_COUNT = AHS_PREFIX
|
||||
+ "client.thread-count";
|
||||
public static final int DEFAULT_AHS_CLIENT_THREAD_COUNT = 10;
|
||||
|
||||
|
||||
/** The address of the AHS web application.*/
|
||||
public static final String AHS_WEBAPP_ADDRESS = AHS_PREFIX
|
||||
+ "webapp.address";
|
||||
|
||||
public static final int DEFAULT_AHS_WEBAPP_PORT = 8188;
|
||||
public static final String DEFAULT_AHS_WEBAPP_ADDRESS = "0.0.0.0:"
|
||||
+ DEFAULT_AHS_WEBAPP_PORT;
|
||||
|
||||
/** The https address of the AHS web application.*/
|
||||
public static final String AHS_WEBAPP_HTTPS_ADDRESS = AHS_PREFIX
|
||||
+ "webapp.https.address";
|
||||
|
||||
public static final int DEFAULT_AHS_WEBAPP_HTTPS_PORT = 8190;
|
||||
public static final String DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:"
|
||||
+ DEFAULT_AHS_WEBAPP_HTTPS_PORT;
|
||||
|
||||
/**The kerberos principal to be used for spnego filter for AHS.*/
|
||||
public static final String AHS_WEBAPP_SPNEGO_USER_NAME_KEY =
|
||||
AHS_PREFIX + "webapp.spnego-principal";
|
||||
|
||||
/**The kerberos keytab to be used for spnego filter for AHS.*/
|
||||
public static final String AHS_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
|
||||
AHS_PREFIX + "webapp.spnego-keytab-file";
|
||||
|
||||
////////////////////////////////
|
||||
// Timeline Service Configs
|
||||
////////////////////////////////
|
||||
|
@ -1057,6 +995,78 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String TIMELINE_SERVICE_PREFIX =
|
||||
YARN_PREFIX + "timeline-service.";
|
||||
|
||||
|
||||
// mark app-history related configs @Private as application history is going
|
||||
// to be integrated into the timeline service
|
||||
@Private
|
||||
public static final String APPLICATION_HISTORY_PREFIX =
|
||||
TIMELINE_SERVICE_PREFIX + "generic-application-history.";
|
||||
|
||||
/**
|
||||
* The setting that controls whether application history service is
|
||||
* enabled or not.
|
||||
*/
|
||||
@Private
|
||||
public static final String APPLICATION_HISTORY_ENABLED =
|
||||
APPLICATION_HISTORY_PREFIX + "enabled";
|
||||
@Private
|
||||
public static final boolean DEFAULT_APPLICATION_HISTORY_ENABLED = false;
|
||||
|
||||
/** Application history store class */
|
||||
@Private
|
||||
public static final String APPLICATION_HISTORY_STORE =
|
||||
APPLICATION_HISTORY_PREFIX + "store-class";
|
||||
|
||||
/** URI for FileSystemApplicationHistoryStore */
|
||||
@Private
|
||||
public static final String FS_APPLICATION_HISTORY_STORE_URI =
|
||||
APPLICATION_HISTORY_PREFIX + "fs-history-store.uri";
|
||||
|
||||
/** T-file compression types used to compress history data.*/
|
||||
@Private
|
||||
public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
|
||||
APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type";
|
||||
@Private
|
||||
public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
|
||||
"none";
|
||||
|
||||
/** host:port address for timeline service RPC APIs. */
|
||||
public static final String TIMELINE_SERVICE_ADDRESS =
|
||||
TIMELINE_SERVICE_PREFIX + "address";
|
||||
public static final int DEFAULT_TIMELINE_SERVICE_PORT = 10200;
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_ADDRESS = "0.0.0.0:"
|
||||
+ DEFAULT_TIMELINE_SERVICE_PORT;
|
||||
|
||||
/** The number of threads to handle client RPC API requests. */
|
||||
public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT =
|
||||
TIMELINE_SERVICE_PREFIX + "handler-thread-count";
|
||||
public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10;
|
||||
|
||||
|
||||
/** The address of the timeline service web application.*/
|
||||
public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS =
|
||||
TIMELINE_SERVICE_PREFIX + "webapp.address";
|
||||
|
||||
public static final int DEFAULT_TIMELINE_SERVICE_WEBAPP_PORT = 8188;
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS =
|
||||
"0.0.0.0:" + DEFAULT_TIMELINE_SERVICE_WEBAPP_PORT;
|
||||
|
||||
/** The https address of the timeline service web application.*/
|
||||
public static final String TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS =
|
||||
TIMELINE_SERVICE_PREFIX + "webapp.https.address";
|
||||
|
||||
public static final int DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_PORT = 8190;
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS =
|
||||
"0.0.0.0:" + DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_PORT;
|
||||
|
||||
/**The kerberos principal to be used for spnego filter for timeline service.*/
|
||||
public static final String TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY =
|
||||
TIMELINE_SERVICE_PREFIX + "webapp.spnego-principal";
|
||||
|
||||
/**The kerberos keytab to be used for spnego filter for timeline service.*/
|
||||
public static final String TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
|
||||
TIMELINE_SERVICE_PREFIX + "webapp.spnego-keytab-file";
|
||||
|
||||
/** Timeline service store class */
|
||||
public static final String TIMELINE_SERVICE_STORE =
|
||||
TIMELINE_SERVICE_PREFIX + "store-class";
|
||||
|
|
|
@ -63,9 +63,9 @@ public class AHSClientImpl extends AHSClient {
|
|||
}
|
||||
|
||||
private static InetSocketAddress getAHSAddress(Configuration conf) {
|
||||
return conf.getSocketAddr(YarnConfiguration.AHS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_PORT);
|
||||
return conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -67,13 +67,13 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
if (YarnConfiguration.useHttps(conf)) {
|
||||
resURI = URI
|
||||
.create(JOINER.join("https://", conf.get(
|
||||
YarnConfiguration.AHS_WEBAPP_HTTPS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS),
|
||||
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
|
||||
RESOURCE_URI_STR));
|
||||
} else {
|
||||
resURI = URI.create(JOINER.join("http://", conf.get(
|
||||
YarnConfiguration.AHS_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS), RESOURCE_URI_STR));
|
||||
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), RESOURCE_URI_STR));
|
||||
}
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
|
|
@ -114,8 +114,8 @@ public class YarnClientImpl extends YarnClient {
|
|||
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
|
||||
}
|
||||
|
||||
if (conf.getBoolean(YarnConfiguration.YARN_HISTORY_SERVICE_ENABLED,
|
||||
YarnConfiguration.DEFAULT_YARN_HISTORY_SERVICE_ENABLED)) {
|
||||
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
|
||||
historyServiceEnabled = true;
|
||||
historyClient = AHSClientImpl.createAHSClient();
|
||||
historyClient.init(getConfig());
|
||||
|
|
|
@ -142,11 +142,11 @@ public class WebAppUtils {
|
|||
|
||||
public static String getAHSWebAppURLWithoutScheme(Configuration conf) {
|
||||
if (YarnConfiguration.useHttps(conf)) {
|
||||
return conf.get(YarnConfiguration.AHS_WEBAPP_HTTPS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS);
|
||||
return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
|
||||
} else {
|
||||
return conf.get(YarnConfiguration.AHS_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS);
|
||||
return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -572,30 +572,12 @@
|
|||
<value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Indicate to ResourceManager as well as clients whether
|
||||
history-service is enabled or not. If enabled, ResourceManager starts
|
||||
recording historical data that ApplicationHistory service can consume.
|
||||
Similarly, clients can redirect to the history service when applications
|
||||
finish if this is enabled.</description>
|
||||
<name>yarn.ahs.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Number of worker threads that write the history data.</description>
|
||||
<name>yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size</name>
|
||||
<value>10</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The implementation class of ApplicationHistoryStore, which is
|
||||
to be used by RMApplicationHistoryWriter.
|
||||
</description>
|
||||
<name>yarn.resourcemanager.history-writer.class</name>
|
||||
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The class to use as the configuration provider.
|
||||
If org.apache.hadoop.yarn.LocalConfigurationProvider is used,
|
||||
|
@ -1085,62 +1067,32 @@
|
|||
<value></value>
|
||||
</property>
|
||||
|
||||
<!-- Application History Service's Configuration-->
|
||||
<!-- Timeline Service's Configuration-->
|
||||
|
||||
<property>
|
||||
<description>The hostname of the AHS.</description>
|
||||
<name>yarn.ahs.hostname</name>
|
||||
<description>The hostname of the timeline service web application.</description>
|
||||
<name>yarn.timeline-service.hostname</name>
|
||||
<value>0.0.0.0</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The http address of the AHS web application.</description>
|
||||
<name>yarn.ahs.webapp.address</name>
|
||||
<value>${yarn.ahs.hostname}:8188</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The https adddress of the AHS web application.</description>
|
||||
<name>yarn.ahs.webapp.https.address</name>
|
||||
<value>${yarn.ahs.hostname}:8190</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>URI pointing to the location of the FileSystem path where
|
||||
the history will be persisted. This must be supplied when using
|
||||
org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
|
||||
as the value for yarn.resourcemanager.history-writer.store.class</description>
|
||||
<name>yarn.ahs.fs-history-store.uri</name>
|
||||
<value>${hadoop.log.dir}/yarn/system/ahstore</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>This is default address for the Application History server
|
||||
to start the RPC server.</description>
|
||||
<name>yarn.ahs.address</name>
|
||||
<description>This is default address for the timeline server to start the
|
||||
RPC server.</description>
|
||||
<name>yarn.timeline-service.address</name>
|
||||
<value>0.0.0.0:10200</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>CLient thread count to serve the client requests.</description>
|
||||
<name>yarn.ahs.client.thread-count</name>
|
||||
<value>10</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>T-file compression types used to compress history data.</description>
|
||||
<name>yarn.ahs.fs-history-store.compression-type</name>
|
||||
<value>none</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description> Store class name for history store, defaulting to file
|
||||
system store </description>
|
||||
<name>yarn.ahs.store.class</name>
|
||||
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore</value>
|
||||
<description>The http address of the timeline service web application.</description>
|
||||
<name>yarn.timeline-service.webapp.address</name>
|
||||
<value>${yarn.timeline-service.hostname}:8188</value>
|
||||
</property>
|
||||
|
||||
<!-- Timeline Service's Configuration-->
|
||||
<property>
|
||||
<description>The https adddress of the timeline service web application.</description>
|
||||
<name>yarn.timeline-service.webapp.https.address</name>
|
||||
<value>${yarn.timeline-service.hostname}:8190</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Store class name for timeline store</description>
|
||||
|
@ -1154,6 +1106,44 @@
|
|||
<value>${yarn.log.dir}/timeline</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Handler thread count to serve the client RPC requests.</description>
|
||||
<name>yarn.timeline-service.handler-thread-count</name>
|
||||
<value>10</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Indicate to ResourceManager as well as clients whether
|
||||
history-service is enabled or not. If enabled, ResourceManager starts
|
||||
recording historical data that ApplicationHistory service can consume.
|
||||
Similarly, clients can redirect to the history service when applications
|
||||
finish if this is enabled.</description>
|
||||
<name>yarn.timeline-service.generic-application-history.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>URI pointing to the location of the FileSystem path where
|
||||
the history will be persisted. This must be supplied when using
|
||||
org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
|
||||
as the value for yarn.timeline-service.generic-application-history.store-class</description>
|
||||
<name>yarn.timeline-service.generic-application-history.fs-history-store.uri</name>
|
||||
<value>${hadoop.log.dir}/yarn/system/history</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>T-file compression types used to compress history data.</description>
|
||||
<name>yarn.timeline-service.generic-application-history.fs-history-store.compression-type</name>
|
||||
<value>none</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Store class name for history store, defaulting to file
|
||||
system store </description>
|
||||
<name>yarn.timeline-service.generic-application-history.store-class</name>
|
||||
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore</value>
|
||||
</property>
|
||||
|
||||
<!-- Other configuration -->
|
||||
<property>
|
||||
<description>The interval that the yarn client library uses to poll the
|
||||
|
|
|
@ -76,19 +76,19 @@ public class ApplicationHistoryClientService extends AbstractService {
|
|||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
InetSocketAddress address =
|
||||
conf.getSocketAddr(YarnConfiguration.AHS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AHS_PORT);
|
||||
conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
|
||||
|
||||
server =
|
||||
rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
|
||||
address, conf, null, conf.getInt(
|
||||
YarnConfiguration.AHS_CLIENT_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_AHS_CLIENT_THREAD_COUNT));
|
||||
YarnConfiguration.TIMELINE_SERVICE_HANDLER_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT));
|
||||
|
||||
server.start();
|
||||
this.bindAddress =
|
||||
conf.updateConnectAddr(YarnConfiguration.AHS_ADDRESS,
|
||||
conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
|
||||
server.getListenerAddress());
|
||||
LOG.info("Instantiated ApplicationHistoryClientService at "
|
||||
+ this.bindAddress);
|
||||
|
|
|
@ -79,7 +79,8 @@ public class ApplicationHistoryManagerImpl extends AbstractService implements
|
|||
protected ApplicationHistoryStore createApplicationHistoryStore(
|
||||
Configuration conf) {
|
||||
return ReflectionUtils.newInstance(conf.getClass(
|
||||
YarnConfiguration.AHS_STORE, FileSystemApplicationHistoryStore.class,
|
||||
YarnConfiguration.APPLICATION_HISTORY_STORE,
|
||||
FileSystemApplicationHistoryStore.class,
|
||||
ApplicationHistoryStore.class), conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -158,9 +158,9 @@ public class ApplicationHistoryServer extends CompositeService {
|
|||
ahsClientService, "ws")
|
||||
.with(getConfig())
|
||||
.withHttpSpnegoPrincipalKey(
|
||||
YarnConfiguration.AHS_WEBAPP_SPNEGO_USER_NAME_KEY)
|
||||
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY)
|
||||
.withHttpSpnegoKeytabKey(
|
||||
YarnConfiguration.AHS_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
|
||||
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
|
||||
.at(bindAddress)
|
||||
.start(new AHSWebApp(historyManager, timelineStore));
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -113,7 +113,7 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
Path fsWorkingPath =
|
||||
new Path(conf.get(YarnConfiguration.FS_HISTORY_STORE_URI));
|
||||
new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI));
|
||||
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
||||
try {
|
||||
fs = fsWorkingPath.getFileSystem(conf);
|
||||
|
@ -727,8 +727,8 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|||
fs.setPermission(historyFile, HISTORY_FILE_UMASK);
|
||||
writer =
|
||||
new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
|
||||
YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE,
|
||||
YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE), null,
|
||||
YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
|
||||
YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
|
||||
getConfig());
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ public class TestApplicationHistoryClientService extends
|
|||
public void setup() {
|
||||
historyServer = new ApplicationHistoryServer();
|
||||
Configuration config = new YarnConfiguration();
|
||||
config.setClass(YarnConfiguration.AHS_STORE,
|
||||
config.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
|
||||
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
|
||||
historyServer.init(config);
|
||||
historyServer.start();
|
||||
|
|
|
@ -38,7 +38,7 @@ public class TestApplicationHistoryManagerImpl extends
|
|||
@Before
|
||||
public void setup() throws Exception {
|
||||
Configuration config = new Configuration();
|
||||
config.setClass(YarnConfiguration.AHS_STORE,
|
||||
config.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
|
||||
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
|
||||
applicationHistoryManagerImpl = new ApplicationHistoryManagerImpl();
|
||||
applicationHistoryManagerImpl.init(config);
|
||||
|
|
|
@ -52,7 +52,7 @@ public class TestFileSystemApplicationHistoryStore extends
|
|||
fs.initialize(new URI("/"), conf);
|
||||
fsWorkingPath = new Path("Test");
|
||||
fs.delete(fsWorkingPath, true);
|
||||
conf.set(YarnConfiguration.FS_HISTORY_STORE_URI, fsWorkingPath.toString());
|
||||
conf.set(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI, fsWorkingPath.toString());
|
||||
store = new FileSystemApplicationHistoryStore();
|
||||
store.init(conf);
|
||||
store.start();
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
|
||||
|
@ -81,8 +82,8 @@ public class RMApplicationHistoryWriter extends CompositeService {
|
|||
protected synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
|
||||
historyServiceEnabled =
|
||||
conf.getBoolean(YarnConfiguration.YARN_HISTORY_SERVICE_ENABLED,
|
||||
YarnConfiguration.DEFAULT_YARN_HISTORY_SERVICE_ENABLED);
|
||||
conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED);
|
||||
|
||||
writer = createApplicationHistoryStore(conf);
|
||||
addIfService(writer);
|
||||
|
@ -112,14 +113,15 @@ public class RMApplicationHistoryWriter extends CompositeService {
|
|||
if (historyServiceEnabled) {
|
||||
try {
|
||||
Class<? extends ApplicationHistoryStore> storeClass =
|
||||
conf.getClass(YarnConfiguration.RM_HISTORY_WRITER_CLASS,
|
||||
NullApplicationHistoryStore.class, ApplicationHistoryStore.class);
|
||||
conf.getClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
|
||||
FileSystemApplicationHistoryStore.class,
|
||||
ApplicationHistoryStore.class);
|
||||
return storeClass.newInstance();
|
||||
} catch (Exception e) {
|
||||
String msg =
|
||||
"Could not instantiate ApplicationHistoryWriter: "
|
||||
+ conf.get(YarnConfiguration.RM_HISTORY_WRITER_CLASS,
|
||||
NullApplicationHistoryStore.class.getName());
|
||||
+ conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE,
|
||||
FileSystemApplicationHistoryStore.class.getName());
|
||||
LOG.error(msg, e);
|
||||
throw new YarnRuntimeException(msg, e);
|
||||
}
|
||||
|
@ -214,21 +216,25 @@ public class RMApplicationHistoryWriter extends CompositeService {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void applicationStarted(RMApp app) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new WritingApplicationStartEvent(app.getApplicationId(),
|
||||
ApplicationStartData.newInstance(app.getApplicationId(), app.getName(),
|
||||
app.getApplicationType(), app.getQueue(), app.getUser(),
|
||||
app.getSubmitTime(), app.getStartTime())));
|
||||
if (historyServiceEnabled) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new WritingApplicationStartEvent(app.getApplicationId(),
|
||||
ApplicationStartData.newInstance(app.getApplicationId(), app.getName(),
|
||||
app.getApplicationType(), app.getQueue(), app.getUser(),
|
||||
app.getSubmitTime(), app.getStartTime())));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void applicationFinished(RMApp app, RMAppState finalState) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new WritingApplicationFinishEvent(app.getApplicationId(),
|
||||
ApplicationFinishData.newInstance(app.getApplicationId(),
|
||||
app.getFinishTime(), app.getDiagnostics().toString(),
|
||||
app.getFinalApplicationStatus(),
|
||||
RMServerUtils.createApplicationState(finalState))));
|
||||
if (historyServiceEnabled) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new WritingApplicationFinishEvent(app.getApplicationId(),
|
||||
ApplicationFinishData.newInstance(app.getApplicationId(),
|
||||
app.getFinishTime(), app.getDiagnostics().toString(),
|
||||
app.getFinalApplicationStatus(),
|
||||
RMServerUtils.createApplicationState(finalState))));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -77,7 +77,7 @@ public class TestRMApplicationHistoryWriter {
|
|||
public void setup() {
|
||||
store = new MemoryApplicationHistoryStore();
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(YarnConfiguration.YARN_HISTORY_SERVICE_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
|
||||
writer = new RMApplicationHistoryWriter() {
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue