From 0ac1b27d53a0e5f6b826cb6c436e33d57fdabc86 Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 24 May 2016 19:54:00 +0530 Subject: [PATCH] Allow manually setting of shutoffTime for EventReceiverFirehose (#2803) * Allow dynamically setting of shutoffTime for EventReceiverFirehose Allow dynamically setting shutoffTime for EventReceiverFirehose review comments and tests * shut down exec on close --- docs/content/ingestion/firehose.md | 6 +++ .../EventReceiverFirehoseFactory.java | 53 +++++++++++++++++++ .../firehose/EventReceiverFirehoseTest.java | 19 +++++++ 3 files changed, 78 insertions(+) diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index 5b1c6a68198..91d594ec8c1 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -97,6 +97,12 @@ When using this firehose, events can be sent by submitting a POST request to the |serviceName|name used to announce the event receiver service endpoint|yes| |bufferSize| size of buffer used by firehose to store events|no default(100000)| +Shut down time for EventReceiverFirehose can be specified by submitting a POST request to + +`http://:/druid/worker/v1/chat//shutdown?shutoffTime=` + +If shutOffTime is not specified, the firehose shuts off immediately. + #### TimedShutoffFirehose This can be used to start a firehose that will shut down at a specified time. diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index ff6cb39e8e2..76322fc80bc 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -33,6 +34,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.CountingInputStream; import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -41,23 +43,27 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.server.metrics.EventReceiverFirehoseMetric; import io.druid.server.metrics.EventReceiverFirehoseRegister; +import org.joda.time.DateTime; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -132,6 +138,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory buffer; private final MapInputRowParser parser; @@ -145,6 +152,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory(bufferSize); this.parser = parser; + exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d"); } @POST @@ -279,6 +287,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactoryof("error", e.getMessage())) + .build(); + + } + } + + @VisibleForTesting + public boolean isClosed() + { + return closed; + } } } diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java index b2b7a00e4ac..537365c4092 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java @@ -32,6 +32,7 @@ import io.druid.server.metrics.EventReceiverFirehoseMetric; import io.druid.server.metrics.EventReceiverFirehoseRegister; import org.apache.commons.io.IOUtils; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -217,4 +218,22 @@ public class EventReceiverFirehoseTest ) ); } + + @Test(timeout = 40_000L) + public void testShutdownWithPrevTime() throws Exception + { + firehose.shutdown(DateTime.now().minusMinutes(2).toString()); + while (!firehose.isClosed()){ + Thread.sleep(50); + } + } + + @Test(timeout = 40_000L) + public void testShutdown() throws Exception + { + firehose.shutdown(DateTime.now().plusMillis(100).toString()); + while (!firehose.isClosed()){ + Thread.sleep(50); + } + } }