YARN-3949. Ensure timely flush of timeline writes. Contributed by Sangjin Lee.

(cherry picked from commit 967bef7e0396d857913caa2574afb103a5f0b81b)
This commit is contained in:
Junping Du 2015-07-25 10:30:29 -07:00 committed by Sangjin Lee
parent 57d8dc2fb7
commit 8603736ef2
8 changed files with 116 additions and 5 deletions

View File

@ -1967,6 +1967,15 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_READER_CLASS = public static final String TIMELINE_SERVICE_READER_CLASS =
TIMELINE_SERVICE_PREFIX + "reader.class"; TIMELINE_SERVICE_PREFIX + "reader.class";
/** The setting that controls how often the timeline collector flushes the
* timeline writer.
*/
public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS =
TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds";
public static final int
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
// mark app-history related configs @Private as application history is going // mark app-history related configs @Private as application history is going
// to be integrated into the timeline service // to be integrated into the timeline service
@Private @Private

View File

@ -797,6 +797,14 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>The setting that controls whether yarn container metrics is
published to the timeline server or not by RM. This configuration setting is
for ATS V2.</description>
<name>yarn.rm.system-metrics-publisher.emit-container-events</name>
<value>false</value>
</property>
<property> <property>
<description>Number of worker threads that send the yarn system metrics <description>Number of worker threads that send the yarn system metrics
@ -2197,6 +2205,13 @@
<value>420</value> <value>420</value>
</property> </property>
<property>
<description>The setting that controls how often the timeline collector
flushes the timeline writer.</description>
<name>yarn.timeline-service.writer.flush-interval-seconds</name>
<value>60</value>
</property>
<!-- Shared Cache Configuration --> <!-- Shared Cache Configuration -->
<property> <property>

View File

@ -19,6 +19,13 @@
package org.apache.hadoop.yarn.server.timelineservice.collector; package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import java.util.Collections; import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
/** /**
* Class that manages adding and removing collectors and their lifecycle. It * Class that manages adding and removing collectors and their lifecycle. It
@ -47,7 +52,10 @@ public abstract class TimelineCollectorManager extends AbstractService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TimelineCollectorManager.class); LogFactory.getLog(TimelineCollectorManager.class);
protected TimelineWriter writer; private TimelineWriter writer;
private ScheduledExecutorService writerFlusher;
private int flushInterval;
private boolean writerFlusherRunning;
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
@ -56,6 +64,12 @@ public abstract class TimelineCollectorManager extends AbstractService {
FileSystemTimelineWriterImpl.class, FileSystemTimelineWriterImpl.class,
TimelineWriter.class), conf); TimelineWriter.class), conf);
writer.init(conf); writer.init(conf);
// create a single dedicated thread for flushing the writer on a periodic
// basis
writerFlusher = Executors.newSingleThreadScheduledExecutor();
flushInterval = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -65,6 +79,10 @@ public abstract class TimelineCollectorManager extends AbstractService {
if (writer != null) { if (writer != null) {
writer.start(); writer.start();
} }
// schedule the flush task
writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
flushInterval, flushInterval, TimeUnit.SECONDS);
writerFlusherRunning = true;
} }
// access to this map is synchronized with the map itself // access to this map is synchronized with the map itself
@ -161,9 +179,48 @@ public abstract class TimelineCollectorManager extends AbstractService {
c.serviceStop(); c.serviceStop();
} }
} }
// stop the flusher first
if (writerFlusher != null) {
writerFlusher.shutdown();
writerFlusherRunning = false;
if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
// in reality it should be ample time for the flusher task to finish
// even if it times out, writers may be able to handle closing in this
// situation fine
// proceed to close the writer
LOG.warn("failed to stop the flusher task in time. " +
"will still proceed to close the writer.");
}
}
if (writer != null) { if (writer != null) {
writer.close(); writer.close();
} }
super.serviceStop(); super.serviceStop();
} }
@VisibleForTesting
boolean writerFlusherRunning() {
return writerFlusherRunning;
}
/**
* Task that invokes the flush operation on the timeline writer.
*/
private static class WriterFlushTask implements Runnable {
private final TimelineWriter writer;
public WriterFlushTask(TimelineWriter writer) {
this.writer = writer;
}
public void run() {
try {
writer.flush();
} catch (Throwable th) {
// we need to handle all exceptions or subsequent execution may be
// suppressed
LOG.error("exception during timeline writer flush!", th);
}
}
}
} }

View File

@ -127,6 +127,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService
mkdirs(outputRoot, ENTITIES_DIR); mkdirs(outputRoot, ENTITIES_DIR);
} }
@Override
public void flush() throws IOException {
// no op
}
private static String mkdirs(String... dirStrs) throws IOException { private static String mkdirs(String... dirStrs) throws IOException {
StringBuilder path = new StringBuilder(); StringBuilder path = new StringBuilder();
for (String dirStr : dirStrs) { for (String dirStr : dirStrs) {

View File

@ -214,6 +214,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return null; return null;
} }
@Override
public void flush() throws IOException {
// flush all buffered mutators
entityTable.flush();
}
/** /**
* close the hbase connections The close APIs perform flushing and release any * close the hbase connections The close APIs perform flushing and release any
* resources held * resources held

View File

@ -187,6 +187,11 @@ public class PhoenixTimelineWriterImpl extends AbstractService
} }
@Override
public void flush() throws IOException {
// currently no-op
}
// Utility functions // Utility functions
@Private @Private
@VisibleForTesting @VisibleForTesting

View File

@ -70,4 +70,13 @@ public interface TimelineWriter extends Service {
*/ */
TimelineWriteResponse aggregate(TimelineEntity data, TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException; TimelineAggregationTrack track) throws IOException;
/**
* Flushes the data to the backend storage. Whatever may be buffered will be
* written to the storage when the method returns. This may be a potentially
* time-consuming operation, and should be used judiciously.
*
* @throws IOException
*/
void flush() throws IOException;
} }

View File

@ -66,6 +66,11 @@ public class TestNMTimelineCollectorManager {
} }
} }
@Test
public void testStartingWriterFlusher() throws Exception {
assertTrue(collectorManager.writerFlusherRunning());
}
@Test @Test
public void testStartWebApp() throws Exception { public void testStartWebApp() throws Exception {
assertNotNull(collectorManager.getRestServerBindAddress()); assertNotNull(collectorManager.getRestServerBindAddress());