From 8603736ef2bb34605ea10f516dbe4e73bed3f309 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Sat, 25 Jul 2015 10:30:29 -0700 Subject: [PATCH] YARN-3949. Ensure timely flush of timeline writes. Contributed by Sangjin Lee. (cherry picked from commit 967bef7e0396d857913caa2574afb103a5f0b81b) --- .../hadoop/yarn/conf/YarnConfiguration.java | 9 +++ .../src/main/resources/yarn-default.xml | 17 ++++- .../collector/TimelineCollectorManager.java | 65 +++++++++++++++++-- .../storage/FileSystemTimelineWriterImpl.java | 5 ++ .../storage/HBaseTimelineWriterImpl.java | 6 ++ .../storage/PhoenixTimelineWriterImpl.java | 5 ++ .../storage/TimelineWriter.java | 9 +++ .../TestNMTimelineCollectorManager.java | 5 ++ 8 files changed, 116 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ddc1af99073..6f26dd966ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1967,6 +1967,15 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_READER_CLASS = TIMELINE_SERVICE_PREFIX + "reader.class"; + /** The setting that controls how often the timeline collector flushes the + * timeline writer. 
+ */ + public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = + TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds"; + + public static final int + DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 46a661f951a..b2f1a936aa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -796,7 +796,15 @@ yarn.system-metrics-publisher.enabled false - + + + The setting that controls whether or not YARN container metrics are + published to the timeline server by the RM. This configuration setting is + for ATS V2. + yarn.rm.system-metrics-publisher.emit-container-events + false + + Number of worker threads that send the yarn system metrics @@ -2197,6 +2205,13 @@ 420 + + The setting that controls how often the timeline collector + flushes the timeline writer. 
+ yarn.timeline-service.writer.flush-interval-seconds + 60 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 23ad4f4f9b5..e9f2085d709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -19,6 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import com.google.common.annotations.VisibleForTesting; /** * Class that manages adding and removing collectors and their lifecycle. 
It @@ -47,7 +52,10 @@ public abstract class TimelineCollectorManager extends AbstractService { private static final Log LOG = LogFactory.getLog(TimelineCollectorManager.class); - protected TimelineWriter writer; + private TimelineWriter writer; + private ScheduledExecutorService writerFlusher; + private int flushInterval; + private boolean writerFlusherRunning; @Override public void serviceInit(Configuration conf) throws Exception { @@ -56,6 +64,12 @@ public abstract class TimelineCollectorManager extends AbstractService { FileSystemTimelineWriterImpl.class, TimelineWriter.class), conf); writer.init(conf); + // create a single dedicated thread for flushing the writer on a periodic + // basis + writerFlusher = Executors.newSingleThreadScheduledExecutor(); + flushInterval = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS); super.serviceInit(conf); } @@ -65,6 +79,10 @@ public abstract class TimelineCollectorManager extends AbstractService { if (writer != null) { writer.start(); } + // schedule the flush task + writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer), + flushInterval, flushInterval, TimeUnit.SECONDS); + writerFlusherRunning = true; } // access to this map is synchronized with the map itself @@ -161,9 +179,48 @@ public abstract class TimelineCollectorManager extends AbstractService { c.serviceStop(); } } + // stop the flusher first + if (writerFlusher != null) { + writerFlusher.shutdown(); + writerFlusherRunning = false; + if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) { + // in reality it should be ample time for the flusher task to finish + // even if it times out, writers may be able to handle closing in this + // situation fine + // proceed to close the writer + LOG.warn("failed to stop the flusher task in time. 
" + + "will still proceed to close the writer."); + } + } if (writer != null) { writer.close(); } super.serviceStop(); } + + @VisibleForTesting + boolean writerFlusherRunning() { + return writerFlusherRunning; + } + + /** + * Task that invokes the flush operation on the timeline writer. + */ + private static class WriterFlushTask implements Runnable { + private final TimelineWriter writer; + + public WriterFlushTask(TimelineWriter writer) { + this.writer = writer; + } + + public void run() { + try { + writer.flush(); + } catch (Throwable th) { + // we need to handle all exceptions or subsequent execution may be + // suppressed + LOG.error("exception during timeline writer flush!", th); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 34a6b7c2f91..b22b39f45c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -127,6 +127,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService mkdirs(outputRoot, ENTITIES_DIR); } + @Override + public void flush() throws IOException { + // no op + } + private static String mkdirs(String... 
dirStrs) throws IOException { StringBuilder path = new StringBuilder(); for (String dirStr : dirStrs) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index e48ca60262a..876ad6ab34f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -214,6 +214,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements return null; } + @Override + public void flush() throws IOException { + // flush all buffered mutators + entityTable.flush(); + } + /** * close the hbase connections The close APIs perform flushing and release any * resources held diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java index 5b4442ceb82..381ff17aecf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java @@ -187,6 +187,11 @@ public class PhoenixTimelineWriterImpl extends AbstractService } + @Override + public void flush() throws IOException { + // currently no-op + } + // Utility functions @Private @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 494e8ad5f1b..50136de766e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -70,4 +70,13 @@ public interface TimelineWriter extends Service { */ TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException; + + /** + * Flushes the data to the backend storage. Whatever may be buffered will be + * written to the storage when the method returns. This may be a potentially + * time-consuming operation, and should be used judiciously. 
+ * + * @throws IOException if the buffered data cannot be written to the backend storage + */ + void flush() throws IOException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index 87343fd6372..0d69fbc5c7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -66,6 +66,11 @@ public class TestNMTimelineCollectorManager { } } + @Test + public void testStartingWriterFlusher() throws Exception { + assertTrue(collectorManager.writerFlusherRunning()); + } + @Test public void testStartWebApp() throws Exception { assertNotNull(collectorManager.getRestServerBindAddress());