YARN-8234. Improve RM system metrics publisher's performance by pushing events to timeline server in batch (#3793)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 00e2405fbd
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
This commit is contained in:
parent
3924efb595
commit
e0e84431f0
|
@ -735,6 +735,20 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final int
|
public static final int
|
||||||
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
|
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
|
||||||
|
|
||||||
|
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
|
||||||
|
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.batch-size";
|
||||||
|
public static final int
|
||||||
|
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
|
||||||
|
1000;
|
||||||
|
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
|
||||||
|
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.interval-seconds";
|
||||||
|
public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
|
||||||
|
60;
|
||||||
|
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
|
||||||
|
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.enable-batch";
|
||||||
|
public static final boolean DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
|
||||||
|
false;
|
||||||
|
|
||||||
//RM delegation token related keys
|
//RM delegation token related keys
|
||||||
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
|
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
|
||||||
RM_PREFIX + "delegation.key.update-interval";
|
RM_PREFIX + "delegation.key.update-interval";
|
||||||
|
|
|
@ -982,6 +982,33 @@
|
||||||
<value>10</value>
|
<value>10</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
This setting enables/disables timeline server v1 publisher to publish timeline events in batch.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The size of timeline server v1 publisher sending events in one request.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.batch-size</name>
|
||||||
|
<value>1000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
When enable batch publishing in timeline server v1, we must avoid that the
|
||||||
|
publisher waits for a batch to be filled up and hold events in buffer for long
|
||||||
|
time. So we add another thread which send event's in the buffer periodically.
|
||||||
|
This config sets the interval of the cyclical sending thread.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.interval-seconds</name>
|
||||||
|
<value>60</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Number of diagnostics/failure messages can be saved in RM for
|
<description>Number of diagnostics/failure messages can be saved in RM for
|
||||||
log aggregation. It also defines the number of diagnostics/failure
|
log aggregation. It also defines the number of diagnostics/failure
|
||||||
|
|
|
@ -18,8 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -32,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
|
@ -59,9 +65,55 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
|
||||||
}
|
}
|
||||||
|
|
||||||
private TimelineClient client;
|
private TimelineClient client;
|
||||||
|
private LinkedBlockingQueue<TimelineEntity> entityQueue;
|
||||||
|
private ExecutorService sendEventThreadPool;
|
||||||
|
private int dispatcherPoolSize;
|
||||||
|
private int dispatcherBatchSize;
|
||||||
|
private int putEventInterval;
|
||||||
|
private boolean isTimeLineServerBatchEnabled;
|
||||||
|
private volatile boolean stopped = false;
|
||||||
|
private PutEventThread putEventThread;
|
||||||
|
private Object sendEntityLock;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
isTimeLineServerBatchEnabled =
|
||||||
|
conf.getBoolean(
|
||||||
|
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
|
||||||
|
if (isTimeLineServerBatchEnabled) {
|
||||||
|
putEventInterval =
|
||||||
|
conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
|
||||||
|
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
|
||||||
|
* 1000;
|
||||||
|
if (putEventInterval <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0");
|
||||||
|
}
|
||||||
|
dispatcherPoolSize = conf.getInt(
|
||||||
|
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
|
||||||
|
YarnConfiguration.
|
||||||
|
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
|
||||||
|
if (dispatcherPoolSize <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0");
|
||||||
|
}
|
||||||
|
dispatcherBatchSize = conf.getInt(
|
||||||
|
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
|
||||||
|
YarnConfiguration.
|
||||||
|
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
|
||||||
|
if (dispatcherBatchSize <= 1) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1");
|
||||||
|
}
|
||||||
|
putEventThread = new PutEventThread();
|
||||||
|
sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
|
||||||
|
entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
|
||||||
|
sendEntityLock = new Object();
|
||||||
|
LOG.info("Timeline service v1 batch publishing enabled");
|
||||||
|
} else {
|
||||||
|
LOG.info("Timeline service v1 batch publishing disabled");
|
||||||
|
}
|
||||||
client = TimelineClient.createTimelineClient();
|
client = TimelineClient.createTimelineClient();
|
||||||
addIfService(client);
|
addIfService(client);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
|
@ -69,6 +121,36 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
|
||||||
new TimelineV1EventHandler());
|
new TimelineV1EventHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
if (isTimeLineServerBatchEnabled) {
|
||||||
|
stopped = false;
|
||||||
|
putEventThread.start();
|
||||||
|
}
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
super.serviceStop();
|
||||||
|
if (isTimeLineServerBatchEnabled) {
|
||||||
|
stopped = true;
|
||||||
|
putEventThread.interrupt();
|
||||||
|
try {
|
||||||
|
putEventThread.join();
|
||||||
|
SendEntity task = new SendEntity();
|
||||||
|
if (!task.buffer.isEmpty()) {
|
||||||
|
LOG.info("Initiating final putEntities, remaining entities left in entityQueue: {}",
|
||||||
|
task.buffer.size());
|
||||||
|
sendEventThreadPool.submit(task);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
sendEventThreadPool.shutdown();
|
||||||
|
if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
|
||||||
|
sendEventThreadPool.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void appCreated(RMApp app, long createdTime) {
|
public void appCreated(RMApp app, long createdTime) {
|
||||||
|
@ -244,7 +326,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void appAttemptFinished(RMAppAttempt appAttempt,
|
public void appAttemptFinished(RMAppAttempt appAttempt,
|
||||||
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
|
RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
|
||||||
TimelineEntity entity =
|
TimelineEntity entity =
|
||||||
createAppAttemptEntity(appAttempt.getAppAttemptId());
|
createAppAttemptEntity(appAttempt.getAppAttemptId());
|
||||||
|
|
||||||
|
@ -261,7 +343,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
|
||||||
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
|
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
|
||||||
app.getFinalApplicationStatus().toString());
|
app.getFinalApplicationStatus().toString());
|
||||||
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
|
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
|
||||||
.createApplicationAttemptState(appAttemtpState).toString());
|
.createApplicationAttemptState(appAttemptState).toString());
|
||||||
if (appAttempt.getMasterContainer() != null) {
|
if (appAttempt.getMasterContainer() != null) {
|
||||||
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
|
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
|
||||||
appAttempt.getMasterContainer().getId().toString());
|
appAttempt.getMasterContainer().getId().toString());
|
||||||
|
@ -361,23 +443,68 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void putEntity(TimelineEntity entity) {
|
private void putEntity(TimelineEntity entity) {
|
||||||
try {
|
if (isTimeLineServerBatchEnabled) {
|
||||||
if (LOG.isDebugEnabled()) {
|
try {
|
||||||
LOG.debug("Publishing the entity " + entity.getEntityId()
|
entityQueue.put(entity);
|
||||||
+ ", JSON-style content: "
|
if (entityQueue.size() > dispatcherBatchSize) {
|
||||||
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
SendEntity task = null;
|
||||||
|
synchronized (sendEntityLock) {
|
||||||
|
if (entityQueue.size() > dispatcherBatchSize) {
|
||||||
|
task = new SendEntity();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (task != null) {
|
||||||
|
sendEventThreadPool.submit(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error when publishing entity batch [ " + entity.getEntityType() + ","
|
||||||
|
+ entity.getEntityId() + " ] ", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Publishing the entity " + entity.getEntityId()
|
||||||
|
+ ", JSON-style content: "
|
||||||
|
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
||||||
|
}
|
||||||
|
client.putEntities(entity);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error when publishing entity [ " + entity.getEntityType() + ","
|
||||||
|
+ entity.getEntityId() + " ] ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class SendEntity implements Runnable {
|
||||||
|
|
||||||
|
private ArrayList<TimelineEntity> buffer;
|
||||||
|
|
||||||
|
SendEntity() {
|
||||||
|
buffer = new ArrayList();
|
||||||
|
entityQueue.drainTo(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Number of timeline entities being sent in batch: {}", buffer.size());
|
||||||
|
}
|
||||||
|
if (buffer.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
client.putEntities(buffer.toArray(new TimelineEntity[0]));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error when publishing entity: ", e);
|
||||||
}
|
}
|
||||||
client.putEntities(entity);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
|
|
||||||
+ entity.getEntityId() + "]", e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TimelineV1PublishEvent extends TimelinePublishEvent {
|
private class TimelineV1PublishEvent extends TimelinePublishEvent {
|
||||||
private TimelineEntity entity;
|
private TimelineEntity entity;
|
||||||
|
|
||||||
public TimelineV1PublishEvent(SystemMetricsEventType type,
|
TimelineV1PublishEvent(SystemMetricsEventType type,
|
||||||
TimelineEntity entity, ApplicationId appId) {
|
TimelineEntity entity, ApplicationId appId) {
|
||||||
super(type, appId);
|
super(type, appId);
|
||||||
this.entity = entity;
|
this.entity = entity;
|
||||||
|
@ -395,4 +522,46 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
|
||||||
putEntity(event.getEntity());
|
putEntity(event.getEntity());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
private class PutEventThread extends Thread {
|
||||||
|
PutEventThread() {
|
||||||
|
super("PutEventThread");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info("System metrics publisher will put events every " +
|
||||||
|
String.valueOf(putEventInterval) + " milliseconds");
|
||||||
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||||
|
if (System.currentTimeMillis() % putEventInterval >= 1000) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn(SystemMetricsPublisher.class.getName()
|
||||||
|
+ " is interrupted. Exiting.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SendEntity task = null;
|
||||||
|
synchronized (sendEntityLock) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Creating SendEntity task in PutEventThread");
|
||||||
|
}
|
||||||
|
task = new SendEntity();
|
||||||
|
}
|
||||||
|
if (task != null) {
|
||||||
|
sendEventThreadPool.submit(task);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// sleep added to avoid multiple SendEntity task within a single interval.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn(SystemMetricsPublisher.class.getName()
|
||||||
|
+ " is interrupted. Exiting.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,9 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
import java.util.Arrays;
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -29,6 +27,14 @@ import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.CallerContext;
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -64,19 +70,33 @@ import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestSystemMetricsPublisher {
|
public class TestSystemMetricsPublisher {
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {{false, 0}, {true, 1}});
|
||||||
|
}
|
||||||
|
|
||||||
private static ApplicationHistoryServer timelineServer;
|
private static ApplicationHistoryServer timelineServer;
|
||||||
private static TimelineServiceV1Publisher metricsPublisher;
|
private static TimelineServiceV1Publisher metricsPublisher;
|
||||||
private static TimelineStore store;
|
private static TimelineStore store;
|
||||||
|
|
||||||
@BeforeClass
|
private boolean rmTimelineServerV1PublisherBatchEnabled;
|
||||||
public static void setup() throws Exception {
|
private int rmTimelineServerV1PublisherInterval;
|
||||||
|
|
||||||
|
public TestSystemMetricsPublisher(boolean rmTimelineServerV1PublisherBatchEnabled,
|
||||||
|
int rmTimelineServerV1PublisherInterval) {
|
||||||
|
this.rmTimelineServerV1PublisherBatchEnabled = rmTimelineServerV1PublisherBatchEnabled;
|
||||||
|
this.rmTimelineServerV1PublisherInterval = rmTimelineServerV1PublisherInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
|
||||||
|
@ -87,6 +107,10 @@ public class TestSystemMetricsPublisher {
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
|
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
|
||||||
2);
|
2);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
|
||||||
|
rmTimelineServerV1PublisherBatchEnabled);
|
||||||
|
conf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
|
||||||
|
rmTimelineServerV1PublisherInterval);
|
||||||
|
|
||||||
timelineServer = new ApplicationHistoryServer();
|
timelineServer = new ApplicationHistoryServer();
|
||||||
timelineServer.init(conf);
|
timelineServer.init(conf);
|
||||||
|
@ -98,8 +122,8 @@ public class TestSystemMetricsPublisher {
|
||||||
metricsPublisher.start();
|
metricsPublisher.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@After
|
||||||
public static void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (metricsPublisher != null) {
|
if (metricsPublisher != null) {
|
||||||
metricsPublisher.stop();
|
metricsPublisher.stop();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue