From 063b513b1c10987461caab3d26c8543c6e657bf7 Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Wed, 29 Mar 2017 03:48:03 +0530 Subject: [PATCH] YARN-6357. Implement putEntitiesAsync API in TimelineCollector (Haibo Chen via Varun Saxena) --- .../collector/TimelineCollector.java | 31 +++++++-- .../TimelineCollectorWebService.java | 12 ++-- .../collector/TestTimelineCollector.java | 63 +++++++++++++++++++ 3 files changed, 96 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 2fc30333133..353066bd775 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -133,19 +133,35 @@ protected Set getEntityTypesSkipAggregation() { public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - TimelineCollectorContext context = getTimelineEntityContext(); + TimelineWriteResponse response = writeTimelineEntities(entities); + flushBufferedTimelineEntities(); + + return response; + } + + private TimelineWriteResponse writeTimelineEntities( + TimelineEntities entities) throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); + final TimelineCollectorContext context = getTimelineEntityContext(); return writer.write(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(), - context.getAppId(), entities); + context.getFlowName(), context.getFlowVersion(), + context.getFlowRunId(), context.getAppId(), entities); + } + + /** + * Flush buffered timeline entities, if any. + * @throws IOException if there is any exception encountered while + * flushing buffered entities. + */ + private void flushBufferedTimelineEntities() throws IOException { + writer.flush(); } /** @@ -158,14 +174,17 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, * * @param entities entities to post * @param callerUgi the caller UGI + * @throws IOException if there is any exception encounted while putting + * entities. */ public void putEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) { - // TODO implement + UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } + + writeTimelineEntities(entities); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index f36c63609c2..fe04b7afc5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -152,9 +152,6 @@ public Response putEntities( throw new ForbiddenException(msg); } - // TODO how to express async posts and handle them - boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); - try { ApplicationId appID = parseApplicationId(appId); if (appID == null) { @@ -169,7 +166,14 @@ public Response putEntities( throw new NotFoundException(); // different exception? } - collector.putEntities(processTimelineEntities(entities), callerUgi); + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + if (isAsync) { + collector.putEntitiesAsync( + processTimelineEntities(entities), callerUgi); + } else { + collector.putEntities(processTimelineEntities(entities), callerUgi); + } + return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 5b4dc50deed..a55f2276a96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -18,17 +18,27 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.Test; +import java.io.IOException; import java.util.HashSet; import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class TestTimelineCollector { @@ -124,4 +134,57 @@ public void testAggregation() throws Exception { } } + + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putEntity() calls. + */ + @Test + public void testPutEntity() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineEntities entities = generateTestEntities(1, 1); + collector.putEntities( + entities, UserGroupInformation.createRemoteUser("test-user")); + + verify(writer, times(1)).write( + anyString(), anyString(), anyString(), anyString(), anyLong(), + anyString(), any(TimelineEntities.class)); + verify(writer, times(1)).flush(); + } + + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putEntityAsync() calls. + */ + @Test + public void testPutEntityAsync() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineEntities entities = generateTestEntities(1, 1); + collector.putEntitiesAsync( + entities, UserGroupInformation.createRemoteUser("test-user")); + + verify(writer, times(1)).write( + anyString(), anyString(), anyString(), anyString(), anyLong(), + anyString(), any(TimelineEntities.class)); + verify(writer, never()).flush(); + } + + private static class TimelineCollectorForTest extends TimelineCollector { + private final TimelineCollectorContext context = + new TimelineCollectorContext(); + + TimelineCollectorForTest(TimelineWriter writer) { + super("TimelineCollectorForTest"); + setWriter(writer); + } + + @Override + public TimelineCollectorContext getTimelineEntityContext() { + return context; + } + } }