YARN-5189. Make HBaseTimeline[Reader|Writer]Impl default and move FileSystemTimeline*Impl. (Joep Rottinghuis and Sangjin Lee via gtcarrera9)

This commit is contained in:
Li Lu 2016-06-03 16:33:51 -07:00 committed by Sangjin Lee
parent c81a2e1d19
commit 0a9b085f05
17 changed files with 100 additions and 32 deletions

View File

@ -97,6 +97,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Test;
@ -175,6 +176,8 @@ public class TestMRTimelineEventHandling {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// enable new timeline service
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
// enable aux-service based timeline collectors

View File

@ -1989,11 +1989,13 @@ public class YarnConfiguration extends Configuration {
*/
public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
TIMELINE_SERVICE_PREFIX
+ "coprocessor.app-final-value-retention-milliseconds";
+ "hbase.coprocessor.app-final-value-retention-milliseconds";
/**
* The setting that controls how long the final value of a metric
* of a completed app is retained before merging into the flow sum.
* The setting that controls how long the final value of a metric of a
* completed app is retained before merging into the flow sum. Up to this time
* after an application is completed out-of-order values that arrive can be
* recognized and discarded at the cost of increased storage.
*/
public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24
* 60 * 60 * 1000L;

View File

@ -77,6 +77,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->

View File

@ -64,29 +64,26 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -185,6 +182,9 @@ public class TestDistributedShell {
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeTimelineCollectorsAuxService.class.getName());
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class,
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter.class);
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}

View File

@ -2159,7 +2159,6 @@
<value>604800</value>
</property>
<!-- Timeline Service v2 Configuration -->
<property>
<name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name>
<description>
@ -2197,6 +2196,23 @@
<value>300</value>
</property>
<!-- Timeline Service v2 Configuration -->
<property>
<name>yarn.timeline-service.writer.class</name>
<description>
Storage implementation ATS v2 will use for the TimelineWriter service.
</description>
<value>org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl</value>
</property>
<property>
<name>yarn.timeline-service.reader.class</name>
<description>
Storage implementation ATS v2 will use for the TimelineReader service.
</description>
<value>org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl</value>
</property>
<property>
<name>yarn.timeline-service.client.internal-timers-ttl-secs</name>
<description>
@ -2230,10 +2246,15 @@
</property>
<property>
<description> The setting that controls how long the final value
<description>
The setting that controls how long the final value
of a metric of a completed app is retained before merging into
the flow sum.</description>
<name>yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds</name>
the flow sum. Up to this time after an application is completed
out-of-order values that arrive can be recognized and discarded at the
cost of increased storage.
</description>
<name>yarn.timeline-service.hbase.coprocessor.app-final-value-retention-milliseconds
</name>
<value>259200000</value>
</property>

View File

@ -158,6 +158,12 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineC
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@ -104,6 +105,8 @@ public class TestSystemMetricsPublisherForV2 {
rmTimelineCollectorManager);
Configuration conf = getTimelineV2Conf();
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
rmTimelineCollectorManager.init(conf);
rmTimelineCollectorManager.start();

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -66,6 +68,8 @@ public class TestTimelineServiceClientIntegration {
// enable timeline service v.2
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
collectorManager, conf);
@ -159,7 +163,9 @@ public class TestTimelineServiceClientIntegration {
mock(CollectorNodemanagerProtocol.class);
try {
GetTimelineCollectorContextResponse response =
GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
GetTimelineCollectorContextResponse.newInstance(
UserGroupInformation.getCurrentUser().getShortUserName(),
"test_flow_name", "test_flow_version", 1L);
when(protocol.getTimelineCollectorContext(any(
GetTimelineCollectorContextRequest.class))).thenReturn(response);
} catch (YarnException | IOException e) {

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import com.google.common.annotations.VisibleForTesting;
@ -61,7 +61,7 @@ public class TimelineCollectorManager extends AbstractService {
public void serviceInit(Configuration conf) throws Exception {
writer = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class,
HBaseTimelineWriterImpl.class,
TimelineWriter.class), conf);
writer.init(conf);
// create a single dedicated thread for flushing the writer on a periodic

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@ -81,7 +81,7 @@ public class TimelineReaderServer extends CompositeService {
private TimelineReader createTimelineReaderStore(Configuration conf) {
TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
HBaseTimelineReaderImpl.class, TimelineReader.class), conf);
LOG.info("Using store " + readerStore.getClass().getName());
readerStore.init(conf);
return readerStore;

View File

@ -56,7 +56,10 @@ import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
/**
* File System based implementation for TimelineReader.
* File System based implementation for TimelineReader. This implementation may
* not provide a complete implementation of all the necessary features. This
* implementation is provided solely for basic testing purposes, and should not
* be used in a non-test situation.
*/
public class FileSystemTimelineReaderImpl extends AbstractService
implements TimelineReader {

View File

@ -38,7 +38,9 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* This implements a local file based backend for storing application timeline
* information.
* information. This implementation may not provide a complete implementation of
* all the necessary features. This implementation is provided solely for basic
* testing purposes, and should not be used in a non-test situation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable

View File

@ -24,7 +24,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -55,7 +57,10 @@ public class TestNMTimelineCollectorManager {
@Before
public void setup() throws Exception {
collectorManager = createCollectorManager();
collectorManager.init(new YarnConfiguration());
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
collectorManager.init(conf);
collectorManager.start();
}

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.After;
import org.junit.Test;
@ -60,6 +62,8 @@ public class TestPerNodeTimelineCollectorsAuxService {
// enable timeline service v.2
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
}
@After

View File

@ -23,7 +23,8 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.junit.Test;
public class TestTimelineReaderServer {
@ -37,6 +38,8 @@ public class TestTimelineReaderServer {
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
FileSystemTimelineReaderImpl.class, TimelineReader.class);
try {
server.init(config);
assertEquals(STATE.INITED, server.getServiceState());

View File

@ -36,8 +36,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -74,9 +76,11 @@ public class TestTimelineReaderWebServices {
Configuration config = new YarnConfiguration();
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
FileSystemTimelineReaderImpl.class, TimelineReader.class);
server = new TimelineReaderServer();
server.init(config);
server.start();
@ -101,13 +105,12 @@ public class TestTimelineReaderWebServices {
}
private static void verifyHttpResponse(Client client, URI uri,
Status status) {
Status expectedStatus) {
ClientResponse resp =
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertNotNull(resp);
assertTrue("Response from server should have been " + status,
resp.getClientResponseStatus().equals(status));
assertEquals(resp.getClientResponseStatus(), expectedStatus);
}
private static Client createClient() {