diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 90660b622cc..df482c18598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -756,6 +756,20 @@ public class YarnConfiguration extends Configuration {
public static final int
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
+ public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
+ RM_PREFIX + "system-metrics-publisher.timeline-server-v1.batch-size";
+ public static final int
+ DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
+ 1000;
+ public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
+ RM_PREFIX + "system-metrics-publisher.timeline-server-v1.interval-seconds";
+ public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
+ 60;
+ public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
+ RM_PREFIX + "system-metrics-publisher.timeline-server-v1.enable-batch";
+ public static final boolean DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
+ false;
+
//RM delegation token related keys
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 01de27f5f96..ff3a8179132 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -995,6 +995,33 @@
10
+
+
+ This setting enables/disables timeline server v1 publisher to publish timeline events in batch.
+
+ yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch
+ false
+
+
+
+
+ The size of timeline server v1 publisher sending events in one request.
+
+ yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.batch-size
+ 1000
+
+
+
+
+ When enable batch publishing in timeline server v1, we must avoid that the
+ publisher waits for a batch to be filled up and hold events in buffer for long
+ time. So we add another thread which send event's in the buffer periodically.
+ This config sets the interval of the cyclical sending thread.
+
+ yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.interval-seconds
+ 60
+
+
Number of diagnostics/failure messages can be saved in RM for
log aggregation. It also defines the number of diagnostics/failure
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
index 5a5fa11cc64..1d89f1e5222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -18,8 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
@@ -59,9 +65,55 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
}
private TimelineClient client;
+ private LinkedBlockingQueue entityQueue;
+ private ExecutorService sendEventThreadPool;
+ private int dispatcherPoolSize;
+ private int dispatcherBatchSize;
+ private int putEventInterval;
+ private boolean isTimeLineServerBatchEnabled;
+ private volatile boolean stopped = false;
+ private PutEventThread putEventThread;
+ private Object sendEntityLock;
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ isTimeLineServerBatchEnabled =
+ conf.getBoolean(
+ YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
+ YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
+ if (isTimeLineServerBatchEnabled) {
+ putEventInterval =
+ conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+ YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
+ * 1000;
+ if (putEventInterval <= 0) {
+ throw new IllegalArgumentException(
+ "RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0");
+ }
+ dispatcherPoolSize = conf.getInt(
+ YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+ YarnConfiguration.
+ DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+ if (dispatcherPoolSize <= 0) {
+ throw new IllegalArgumentException(
+ "RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0");
+ }
+ dispatcherBatchSize = conf.getInt(
+ YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
+ YarnConfiguration.
+ DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
+ if (dispatcherBatchSize <= 1) {
+ throw new IllegalArgumentException(
+ "RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1");
+ }
+ putEventThread = new PutEventThread();
+ sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
+ entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
+ sendEntityLock = new Object();
+ LOG.info("Timeline service v1 batch publishing enabled");
+ } else {
+ LOG.info("Timeline service v1 batch publishing disabled");
+ }
client = TimelineClient.createTimelineClient();
addIfService(client);
super.serviceInit(conf);
@@ -69,6 +121,36 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
new TimelineV1EventHandler());
}
+ protected void serviceStart() throws Exception {
+ if (isTimeLineServerBatchEnabled) {
+ stopped = false;
+ putEventThread.start();
+ }
+ super.serviceStart();
+ }
+
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ if (isTimeLineServerBatchEnabled) {
+ stopped = true;
+ putEventThread.interrupt();
+ try {
+ putEventThread.join();
+ SendEntity task = new SendEntity();
+ if (!task.buffer.isEmpty()) {
+ LOG.info("Initiating final putEntities, remaining entities left in entityQueue: {}",
+ task.buffer.size());
+ sendEventThreadPool.submit(task);
+ }
+ } finally {
+ sendEventThreadPool.shutdown();
+ if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
+ sendEventThreadPool.shutdownNow();
+ }
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public void appCreated(RMApp app, long createdTime) {
@@ -257,7 +339,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
@SuppressWarnings("unchecked")
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
- RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+ RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
@@ -274,7 +356,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
- .createApplicationAttemptState(appAttemtpState).toString());
+ .createApplicationAttemptState(appAttemptState).toString());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
@@ -374,23 +456,68 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
}
private void putEntity(TimelineEntity entity) {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Publishing the entity " + entity.getEntityId()
- + ", JSON-style content: "
- + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ if (isTimeLineServerBatchEnabled) {
+ try {
+ entityQueue.put(entity);
+ if (entityQueue.size() > dispatcherBatchSize) {
+ SendEntity task = null;
+ synchronized (sendEntityLock) {
+ if (entityQueue.size() > dispatcherBatchSize) {
+ task = new SendEntity();
+ }
+ }
+ if (task != null) {
+ sendEventThreadPool.submit(task);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity batch [ " + entity.getEntityType() + ","
+ + entity.getEntityId() + " ] ", e);
+ }
+ } else {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publishing the entity " + entity.getEntityId()
+ + ", JSON-style content: "
+ + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ }
+ client.putEntities(entity);
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity [ " + entity.getEntityType() + ","
+ + entity.getEntityId() + " ] ", e);
+ }
+ }
+ }
+
+ private class SendEntity implements Runnable {
+
+ private ArrayList buffer;
+
+ SendEntity() {
+ buffer = new ArrayList();
+ entityQueue.drainTo(buffer);
+ }
+
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of timeline entities being sent in batch: {}", buffer.size());
+ }
+ if (buffer.isEmpty()) {
+ return;
+ }
+ try {
+ client.putEntities(buffer.toArray(new TimelineEntity[0]));
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity: ", e);
}
- client.putEntities(entity);
- } catch (Exception e) {
- LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
- + entity.getEntityId() + "]", e);
}
}
private class TimelineV1PublishEvent extends TimelinePublishEvent {
private TimelineEntity entity;
- public TimelineV1PublishEvent(SystemMetricsEventType type,
+ TimelineV1PublishEvent(SystemMetricsEventType type,
TimelineEntity entity, ApplicationId appId) {
super(type, appId);
this.entity = entity;
@@ -408,4 +535,46 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
putEntity(event.getEntity());
}
}
-}
+
+ private class PutEventThread extends Thread {
+ PutEventThread() {
+ super("PutEventThread");
+ }
+
+ @Override
+ public void run() {
+ LOG.info("System metrics publisher will put events every " +
+ String.valueOf(putEventInterval) + " milliseconds");
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ if (System.currentTimeMillis() % putEventInterval >= 1000) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOG.warn(SystemMetricsPublisher.class.getName()
+ + " is interrupted. Exiting.");
+ break;
+ }
+ continue;
+ }
+ SendEntity task = null;
+ synchronized (sendEntityLock) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating SendEntity task in PutEventThread");
+ }
+ task = new SendEntity();
+ }
+ if (task != null) {
+ sendEventThreadPool.submit(task);
+ }
+ try {
+ // sleep added to avoid multiple SendEntity task within a single interval.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.warn(SystemMetricsPublisher.class.getName()
+ + " is interrupted. Exiting.");
+ break;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index 3c00bbcdc07..146a931e5ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -30,6 +27,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -65,19 +70,34 @@ import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Parameterized.class)
public class TestSystemMetricsPublisher {
+ @Parameters
+ public static Collection