Allow manually setting of shutoffTime for EventReceiverFirehose (#2803)

* Allow dynamically setting of shutoffTime for EventReceiverFirehose

Allow dynamically setting shutoffTime for EventReceiverFirehose

review comments and tests

* shut down exec on close
This commit is contained in:
Nishant 2016-05-24 19:54:00 +05:30 committed by Fangjin Yang
parent dea4391a49
commit 0ac1b27d53
3 changed files with 78 additions and 0 deletions

View File

@ -97,6 +97,12 @@ When using this firehose, events can be sent by submitting a POST request to the
|serviceName|name used to announce the event receiver service endpoint|yes| |serviceName|name used to announce the event receiver service endpoint|yes|
|bufferSize| size of buffer used by firehose to store events|no default(100000)| |bufferSize| size of buffer used by firehose to store events|no default(100000)|
Shut down time for EventReceiverFirehose can be specified by submitting a POST request to
`http://<peonHost>:<port>/druid/worker/v1/chat/<eventReceiverServiceName>/shutdown?shutoffTime=<shutoffTime>`
If shutOffTime is not specified, the firehose shuts off immediately.
#### TimedShutoffFirehose #### TimedShutoffFirehose
This can be used to start a firehose that will shut down at a specified time. This can be used to start a firehose that will shut down at a specified time.

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
@ -33,6 +34,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.CountingInputStream; import com.google.common.io.CountingInputStream;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -41,23 +43,27 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.server.metrics.EventReceiverFirehoseMetric; import io.druid.server.metrics.EventReceiverFirehoseMetric;
import io.druid.server.metrics.EventReceiverFirehoseRegister; import io.druid.server.metrics.EventReceiverFirehoseRegister;
import org.joda.time.DateTime;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection; import java.util.Collection;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -132,6 +138,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric
{ {
private final ScheduledExecutorService exec;
private final BlockingQueue<InputRow> buffer; private final BlockingQueue<InputRow> buffer;
private final MapInputRowParser parser; private final MapInputRowParser parser;
@ -145,6 +152,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
{ {
this.buffer = new ArrayBlockingQueue<>(bufferSize); this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser; this.parser = parser;
exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
} }
@POST @POST
@ -279,6 +287,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
if (chatHandlerProvider.isPresent()) { if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(serviceName); chatHandlerProvider.get().unregister(serviceName);
} }
exec.shutdown();
} }
} }
@ -296,5 +305,49 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
} }
} }
} }
@POST
@Path("/shutdown")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response shutdown(
@QueryParam("shutoffTime") final String shutoffTime
)
{
try {
DateTime shutoffAt = shutoffTime == null ? DateTime.now() : new DateTime(shutoffTime);
log.info("Setting Firehose shutoffTime to %s", shutoffTime);
exec.schedule(
new Runnable()
{
@Override
public void run()
{
try {
close();
}
catch (IOException e) {
log.warn(e, "Failed to close delegate firehose, ignoring.");
}
}
},
shutoffAt.getMillis() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
return Response.ok().build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
}
@VisibleForTesting
public boolean isClosed()
{
return closed;
}
} }
} }

View File

@ -32,6 +32,7 @@ import io.druid.server.metrics.EventReceiverFirehoseMetric;
import io.druid.server.metrics.EventReceiverFirehoseRegister; import io.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -217,4 +218,22 @@ public class EventReceiverFirehoseTest
) )
); );
} }
@Test(timeout = 40_000L)
public void testShutdownWithPrevTime() throws Exception
{
firehose.shutdown(DateTime.now().minusMinutes(2).toString());
while (!firehose.isClosed()){
Thread.sleep(50);
}
}
@Test(timeout = 40_000L)
public void testShutdown() throws Exception
{
firehose.shutdown(DateTime.now().plusMillis(100).toString());
while (!firehose.isClosed()){
Thread.sleep(50);
}
}
} }