YARN-8234. Improve RM system metrics publisher's performance by pushing events to timeline server in batch (#3793)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 00e2405fbd)
This commit is contained in:
Ashutosh Gupta 2021-12-23 13:44:51 +05:30 committed by Akira Ajisaka
parent ccaba2561a
commit 1c99810b89
4 changed files with 259 additions and 25 deletions

View File

@ -756,6 +756,20 @@ public class YarnConfiguration extends Configuration {
public static final int
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.batch-size";
public static final int
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
1000;
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.interval-seconds";
public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
60;
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.enable-batch";
public static final boolean DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
false;
//RM delegation token related keys
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";

View File

@ -995,6 +995,33 @@
<value>10</value>
</property>
<property>
<description>
This setting enables/disables timeline server v1 publisher to publish timeline events in batch.
</description>
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch</name>
<value>false</value>
</property>
<property>
<description>
The size of timeline server v1 publisher sending events in one request.
</description>
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.batch-size</name>
<value>1000</value>
</property>
<property>
<description>
When enable batch publishing in timeline server v1, we must avoid that the
publisher waits for a batch to be filled up and hold events in buffer for long
time. So we add another thread which send event's in the buffer periodically.
This config sets the interval of the cyclical sending thread.
</description>
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.interval-seconds</name>
<value>60</value>
</property>
<property>
<description>Number of diagnostics/failure messages can be saved in RM for
log aggregation. It also defines the number of diagnostics/failure

View File

@ -18,8 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
@ -59,9 +65,55 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
}
private TimelineClient client;
private LinkedBlockingQueue<TimelineEntity> entityQueue;
private ExecutorService sendEventThreadPool;
private int dispatcherPoolSize;
private int dispatcherBatchSize;
private int putEventInterval;
private boolean isTimeLineServerBatchEnabled;
private volatile boolean stopped = false;
private PutEventThread putEventThread;
private Object sendEntityLock;
@Override
protected void serviceInit(Configuration conf) throws Exception {
isTimeLineServerBatchEnabled =
conf.getBoolean(
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
if (isTimeLineServerBatchEnabled) {
putEventInterval =
conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
* 1000;
if (putEventInterval <= 0) {
throw new IllegalArgumentException(
"RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0");
}
dispatcherPoolSize = conf.getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
if (dispatcherPoolSize <= 0) {
throw new IllegalArgumentException(
"RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0");
}
dispatcherBatchSize = conf.getInt(
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
YarnConfiguration.
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
if (dispatcherBatchSize <= 1) {
throw new IllegalArgumentException(
"RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1");
}
putEventThread = new PutEventThread();
sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
sendEntityLock = new Object();
LOG.info("Timeline service v1 batch publishing enabled");
} else {
LOG.info("Timeline service v1 batch publishing disabled");
}
client = TimelineClient.createTimelineClient();
addIfService(client);
super.serviceInit(conf);
@ -69,6 +121,36 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
new TimelineV1EventHandler());
}
protected void serviceStart() throws Exception {
if (isTimeLineServerBatchEnabled) {
stopped = false;
putEventThread.start();
}
super.serviceStart();
}
protected void serviceStop() throws Exception {
super.serviceStop();
if (isTimeLineServerBatchEnabled) {
stopped = true;
putEventThread.interrupt();
try {
putEventThread.join();
SendEntity task = new SendEntity();
if (!task.buffer.isEmpty()) {
LOG.info("Initiating final putEntities, remaining entities left in entityQueue: {}",
task.buffer.size());
sendEventThreadPool.submit(task);
}
} finally {
sendEventThreadPool.shutdown();
if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
sendEventThreadPool.shutdownNow();
}
}
}
}
@SuppressWarnings("unchecked")
@Override
public void appCreated(RMApp app, long createdTime) {
@ -257,7 +339,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
@SuppressWarnings("unchecked")
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
@ -274,7 +356,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
.createApplicationAttemptState(appAttemptState).toString());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
@ -374,6 +456,25 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
}
private void putEntity(TimelineEntity entity) {
if (isTimeLineServerBatchEnabled) {
try {
entityQueue.put(entity);
if (entityQueue.size() > dispatcherBatchSize) {
SendEntity task = null;
synchronized (sendEntityLock) {
if (entityQueue.size() > dispatcherBatchSize) {
task = new SendEntity();
}
}
if (task != null) {
sendEventThreadPool.submit(task);
}
}
} catch (Exception e) {
LOG.error("Error when publishing entity batch [ " + entity.getEntityType() + ","
+ entity.getEntityId() + " ] ", e);
}
} else {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId()
@ -386,11 +487,37 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
+ entity.getEntityId() + " ] ", e);
}
}
}
private class SendEntity implements Runnable {
private ArrayList<TimelineEntity> buffer;
SendEntity() {
buffer = new ArrayList();
entityQueue.drainTo(buffer);
}
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Number of timeline entities being sent in batch: {}", buffer.size());
}
if (buffer.isEmpty()) {
return;
}
try {
client.putEntities(buffer.toArray(new TimelineEntity[0]));
} catch (Exception e) {
LOG.error("Error when publishing entity: ", e);
}
}
}
private class TimelineV1PublishEvent extends TimelinePublishEvent {
private TimelineEntity entity;
public TimelineV1PublishEvent(SystemMetricsEventType type,
TimelineV1PublishEvent(SystemMetricsEventType type,
TimelineEntity entity, ApplicationId appId) {
super(type, appId);
this.entity = entity;
@ -408,4 +535,46 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
putEntity(event.getEntity());
}
}
private class PutEventThread extends Thread {
PutEventThread() {
super("PutEventThread");
}
@Override
public void run() {
LOG.info("System metrics publisher will put events every " +
String.valueOf(putEventInterval) + " milliseconds");
while (!stopped && !Thread.currentThread().isInterrupted()) {
if (System.currentTimeMillis() % putEventInterval >= 1000) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.warn(SystemMetricsPublisher.class.getName()
+ " is interrupted. Exiting.");
break;
}
continue;
}
SendEntity task = null;
synchronized (sendEntityLock) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating SendEntity task in PutEventThread");
}
task = new SendEntity();
}
if (task != null) {
sendEventThreadPool.submit(task);
}
try {
// sleep added to avoid multiple SendEntity task within a single interval.
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.warn(SystemMetricsPublisher.class.getName()
+ " is interrupted. Exiting.");
break;
}
}
}
}
}

View File

@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -30,6 +27,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -65,19 +70,34 @@ import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSystemMetricsPublisher {
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{false, 0}, {true, 1}});
}
private static ApplicationHistoryServer timelineServer;
private static TimelineServiceV1Publisher metricsPublisher;
private static TimelineStore store;
@BeforeClass
public static void setup() throws Exception {
private boolean rmTimelineServerV1PublisherBatchEnabled;
private int rmTimelineServerV1PublisherInterval;
public TestSystemMetricsPublisher(boolean rmTimelineServerV1PublisherBatchEnabled,
int rmTimelineServerV1PublisherInterval) {
this.rmTimelineServerV1PublisherBatchEnabled = rmTimelineServerV1PublisherBatchEnabled;
this.rmTimelineServerV1PublisherInterval = rmTimelineServerV1PublisherInterval;
}
@Before
public void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
@ -88,6 +108,10 @@ public class TestSystemMetricsPublisher {
conf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
2);
conf.setBoolean(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
rmTimelineServerV1PublisherBatchEnabled);
conf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
rmTimelineServerV1PublisherInterval);
timelineServer = new ApplicationHistoryServer();
timelineServer.init(conf);
@ -99,8 +123,8 @@ public class TestSystemMetricsPublisher {
metricsPublisher.start();
}
@AfterClass
public static void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
if (metricsPublisher != null) {
metricsPublisher.stop();
}