Changes to allow pushing events directly to realtime tasks, and the creation

of single-segment realtime tasks.

Realtime:
- Move firehose stuff to com.metamx.druid.realtime.firehose package
- ClippedFirehoseFactory provides a time-clipped view
- TimedShutoffFirehoseFactory shuts off at a particular time

Indexer:
- Split StringInputRowParser into {String,Map}InputRowParser

Merger:
- Remove minTime in favor of clipped firehoses
- Add task executor servlet that can push events to EventReceivers
- Add EventReceivingFirehose that accepts events as an EventReceiver
This commit is contained in:
Gian Merlino 2013-04-25 13:10:17 +03:00
parent 77ef525a65
commit 341ee27419
30 changed files with 713 additions and 133 deletions

View File

@ -7,8 +7,8 @@ import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;

View File

@ -7,8 +7,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import twitter4j.ConnectionLifeCycleListener; import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity; import twitter4j.HashtagEntity;

View File

@ -0,0 +1,9 @@
package com.metamx.druid.indexer.data;
import com.metamx.druid.input.InputRow;
public interface InputRowParser<T>
{
public InputRow parse(T input);
public void addDimensionExclusion(String dimension);
}

View File

@ -0,0 +1,94 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private final Set<String> dimensionExclusions;
@JsonCreator
public MapInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.timestampSpec = timestampSpec;
this.dataSpec = dataSpec;
this.dimensionExclusions = Sets.newHashSet();
Iterables.addAll(
this.dimensionExclusions,
Iterables.transform(
dimensionExclusions == null ? Sets.<String>newHashSet() : Sets.newHashSet(dimensionExclusions),
new Function<String, String>()
{
@Override
public String apply(String s)
{
return s.toLowerCase();
}
}
)
);
this.dimensionExclusions.add(timestampSpec.getTimestampColumn().toLowerCase());
}
@Override
public InputRow parse(Map<String, Object> theMap)
{
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
if (timestamp == null) {
final String input = theMap.toString();
throw new NullPointerException(
String.format(
"Null timestamp in input: %s",
input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
}
@Override
public void addDimensionExclusion(String dimension)
{
dimensionExclusions.add(dimension);
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public Set<String> getDimensionExclusions()
{
return dimensionExclusions;
}
}

View File

@ -21,29 +21,20 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function; import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException; import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser; import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser; import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
*/ */
public class StringInputRowParser public class StringInputRowParser implements InputRowParser<String>
{ {
private final TimestampSpec timestampSpec; private final InputRowParser<Map<String, Object>> inputRowCreator;
private final DataSpec dataSpec;
private final Set<String> dimensionExclusions;
private final Parser<String, Object> parser; private final Parser<String, Object> parser;
@JsonCreator @JsonCreator
@ -53,71 +44,24 @@ public class StringInputRowParser
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions @JsonProperty("dimensionExclusions") List<String> dimensionExclusions
) )
{ {
this.timestampSpec = timestampSpec; this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
this.dataSpec = dataSpec;
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
this.dimensionExclusions.addAll(
Lists.transform(
dimensionExclusions, new Function<String, String>()
{
@Override
public String apply(String s)
{
return s.toLowerCase();
}
}
)
);
}
this.dimensionExclusions.add(timestampSpec.getTimestampColumn());
this.parser = new ToLowerCaseParser(dataSpec.getParser()); this.parser = new ToLowerCaseParser(dataSpec.getParser());
} }
public StringInputRowParser addDimensionExclusion(String dimension) public void addDimensionExclusion(String dimension)
{ {
dimensionExclusions.add(dimension); inputRowCreator.addDimensionExclusion(dimension);
return this;
} }
@Override
public InputRow parse(String input) throws FormattedException public InputRow parse(String input) throws FormattedException
{ {
Map<String, Object> theMap = parser.parse(input); return inputRowCreator.parse(parser.parse(input));
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
if (timestamp == null) {
throw new NullPointerException(
String.format(
"Null timestamp in input string: %s",
input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
} }
@JsonProperty @JsonValue
public TimestampSpec getTimestampSpec() public InputRowParser<Map<String, Object>> getInputRowCreator()
{ {
return timestampSpec; return inputRowCreator;
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public Set<String> getDimensionExclusions()
{
return dimensionExclusions;
} }
} }

View File

@ -0,0 +1,9 @@
package com.metamx.druid.merger.common.index;
import java.util.Collection;
import java.util.Map;
public interface EventReceiver
{
public void addAll(Collection<Map<String, Object>> events);
}

View File

@ -0,0 +1,177 @@
package com.metamx.druid.merger.common.index;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.druid.indexer.data.MapInputRowParser;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
* firehoses with an {@link EventReceiverProvider}.
*/
@JsonTypeName("receiver")
public class EventReceiverFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100000;
private final String firehoseId;
private final int bufferSize;
private final MapInputRowParser parser;
private final Optional<EventReceiverProvider> eventReceiverProvider;
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("firehoseId") String firehoseId,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser,
@JacksonInject("eventReceiverProvider") EventReceiverProvider eventReceiverProvider
)
{
this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser");
this.eventReceiverProvider = Optional.fromNullable(eventReceiverProvider);
}
@Override
public Firehose connect() throws IOException
{
log.info("Connecting firehose: %s", firehoseId);
final EventReceiverFirehose firehose = new EventReceiverFirehose();
if (eventReceiverProvider.isPresent()) {
eventReceiverProvider.get().register(firehoseId, firehose);
}
return firehose;
}
@JsonProperty
public String getFirehoseId()
{
return firehoseId;
}
@JsonProperty
public int getBufferSize()
{
return bufferSize;
}
@JsonProperty
public MapInputRowParser getParser()
{
return parser;
}
public class EventReceiverFirehose implements EventReceiver, Firehose
{
private final BlockingQueue<Map<String, Object>> buffer;
private final Object readLock = new Object();
private volatile Map<String, Object> nextEvent = null;
private volatile boolean closed = false;
public EventReceiverFirehose()
{
this.buffer = new ArrayBlockingQueue<Map<String, Object>>(bufferSize);
}
@Override
public void addAll(Collection<Map<String, Object>> events)
{
log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId);
try {
for (final Map<String, Object> event : events) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(event, 500, TimeUnit.MILLISECONDS);
}
if (!added) {
throw new IllegalStateException("Cannot add events to closed firehose!");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
@Override
public boolean hasMore()
{
synchronized (readLock) {
try {
while (!closed && nextEvent == null) {
nextEvent = buffer.poll(500, TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
return nextEvent != null;
}
}
@Override
public InputRow nextRow()
{
synchronized (readLock) {
final Map<String, Object> event = nextEvent;
if (event == null) {
throw new NoSuchElementException();
} else {
// If nextEvent is unparseable, don't return it again
nextEvent = null;
return parser.parse(event);
}
}
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// Nothing
}
};
}
@Override
public void close() throws IOException
{
log.info("Firehose closing.");
closed = true;
if (eventReceiverProvider.isPresent()) {
eventReceiverProvider.get().unregister(firehoseId, this);
}
}
}
}

View File

@ -0,0 +1,41 @@
package com.metamx.druid.merger.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.util.concurrent.ConcurrentMap;
public class EventReceiverProvider
{
private static final Logger log = new Logger(EventReceiverProvider.class);
private final ConcurrentMap<String, EventReceiver> receivers;
public EventReceiverProvider()
{
this.receivers = Maps.newConcurrentMap();
}
public void register(final String key, EventReceiver receiver)
{
log.info("Registering event receiver for %s", key);
if (receivers.putIfAbsent(key, receiver) != null) {
throw new ISE("Receiver already registered for key: %s", key);
}
}
public void unregister(final String key, EventReceiver receiver)
{
log.info("Unregistering event receiver for %s", key);
if (!receivers.remove(key, receiver)) {
log.warn("Receiver not currently registered, ignoring: %s", key);
}
}
public Optional<EventReceiver> get(final String key)
{
return Optional.fromNullable(receivers.get(key));
}
}

View File

@ -31,8 +31,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator; import org.apache.commons.io.LineIterator;

View File

@ -35,8 +35,8 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.shard.ShardSpec;

View File

@ -37,8 +37,8 @@ import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction; import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.index.YeOldePlumberSchool; import com.metamx.druid.merger.common.index.YeOldePlumberSchool;
import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.Sink;

View File

@ -32,7 +32,7 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionClient; import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -22,6 +22,7 @@ package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
@ -40,10 +41,10 @@ import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentConfig; import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose; import com.metamx.druid.realtime.firehose.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose; import com.metamx.druid.realtime.firehose.PredicateFirehose;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
@ -56,6 +57,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -78,9 +80,6 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore @JsonIgnore
private final IndexGranularity segmentGranularity; private final IndexGranularity segmentGranularity;
@JsonIgnore
private final DateTime minTime;
@JsonIgnore @JsonIgnore
private volatile Plumber plumber = null; private volatile Plumber plumber = null;
@ -100,8 +99,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename? @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename?
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("minTime") DateTime minTime
) )
{ {
super( super(
@ -122,7 +120,6 @@ public class RealtimeIndexTask extends AbstractTask
this.fireDepartmentConfig = fireDepartmentConfig; this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod; this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity; this.segmentGranularity = segmentGranularity;
this.minTime = minTime;
} }
@Override @Override
@ -164,18 +161,12 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }
Firehose wrappedFirehose = firehoseFactory.connect();
if (minTime != null) {
log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
}
log.info( log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]", "Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity, segmentGranularity,
windowPeriod windowPeriod
); );
firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod); firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
} }
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like // TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
@ -366,12 +357,6 @@ public class RealtimeIndexTask extends AbstractTask
return segmentGranularity; return segmentGranularity;
} }
@JsonProperty
public DateTime getMinTime()
{
return minTime;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher public static class TaskActionSegmentPublisher implements SegmentPublisher
{ {
final Task task; final Task task;

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client; import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Optional; import com.google.common.base.Optional;
@ -62,6 +63,7 @@ import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig; import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs; import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs; import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;
@ -271,6 +273,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeTaskStorage(); initializeTaskStorage();
initializeTaskLockbox(); initializeTaskLockbox();
initializeTaskQueue(); initializeTaskQueue();
initializeJacksonInjections();
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeCurator(); initializeCurator();
initializeIndexerZkConfig(); initializeIndexerZkConfig();
@ -484,9 +487,21 @@ public class IndexerCoordinatorNode extends RegisteringNode
} }
} }
private void initializeJacksonInjections()
{
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", null)
.addValue("segmentPusher", null)
.addValue("eventReceiverProvider", null);
getJsonMapper().setInjectableValues(injectables);
}
private void initializeJacksonSubtypes() private void initializeJacksonSubtypes()
{ {
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class); getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
} }
private void initializeHttpClient() private void initializeHttpClient()

View File

@ -0,0 +1,48 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.druid.merger.common.index.EventReceiver;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
@Path("/mmx/worker/v1")
public class EventReceiverResource
{
private final ObjectMapper jsonMapper;
private final EventReceiverProvider receivers;
@Inject
public EventReceiverResource(ObjectMapper jsonMapper, EventReceiverProvider receivers)
{
this.jsonMapper = jsonMapper;
this.receivers = receivers;
}
@POST
@Path("/firehose/{id}/push-events")
@Produces("application/json")
public Response doPush(
@PathParam("id") String firehoseId,
List<Map<String, Object>> events
)
{
final Optional<EventReceiver> receiver = receivers.get(firehoseId);
if (receiver.isPresent()) {
receiver.get().addAll(events);
return Response.ok(ImmutableMap.of("eventCount", events.size())).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
}

View File

@ -11,6 +11,7 @@ import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskRunner; import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;

View File

@ -24,8 +24,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -38,6 +40,7 @@ import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager; import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView; import com.metamx.druid.client.MutableServerView;
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView; import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.CuratorConfig;
@ -50,13 +53,13 @@ import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller; import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory; import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory; import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.druid.realtime.SegmentAnnouncer;
@ -82,6 +85,7 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
@ -122,6 +126,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private Server server = null; private Server server = null;
private ExecutorServiceTaskRunner taskRunner = null; private ExecutorServiceTaskRunner taskRunner = null;
private ExecutorLifecycle executorLifecycle = null; private ExecutorLifecycle executorLifecycle = null;
private EventReceiverProvider eventReceiverProvider = null;
public ExecutorNode( public ExecutorNode(
Properties props, Properties props,
@ -209,6 +214,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
initializeDataSegmentPusher(); initializeDataSegmentPusher();
initializeTaskToolbox(); initializeTaskToolbox();
initializeTaskRunner(); initializeTaskRunner();
initializeEventReceiverProvider();
initializeJacksonInjections(); initializeJacksonInjections();
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeServer(); initializeServer();
@ -226,9 +232,18 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper()); executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper());
lifecycle.addManagedInstance(executorLifecycle); lifecycle.addManagedInstance(executorLifecycle);
final Injector injector = Guice.createInjector(
new ExecutorServletModule(
getJsonMapper(),
eventReceiverProvider
)
);
final Context root = new Context(server, "/", Context.SESSIONS); final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status"); root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0);
root.addServlet( root.addServlet(
new ServletHolder( new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger()) new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())
@ -296,7 +311,8 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
InjectableValues.Std injectables = new InjectableValues.Std(); InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", s3Service) injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", segmentPusher); .addValue("segmentPusher", segmentPusher)
.addValue("eventReceiverProvider", eventReceiverProvider);
getJsonMapper().setInjectableValues(injectables); getJsonMapper().setInjectableValues(injectables);
} }
@ -304,6 +320,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private void initializeJacksonSubtypes() private void initializeJacksonSubtypes()
{ {
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class); getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
} }
private void initializeHttpClient() private void initializeHttpClient()
@ -461,7 +478,14 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
.build() .build()
) )
) )
);; );
}
}
public void initializeEventReceiverProvider()
{
if (eventReceiverProvider == null) {
this.eventReceiverProvider = new EventReceiverProvider();
} }
} }

View File

@ -0,0 +1,44 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.merger.common.index.EventReceiverProvider;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import javax.inject.Singleton;
public class ExecutorServletModule extends JerseyServletModule
{
private final ObjectMapper jsonMapper;
private final EventReceiverProvider receivers;
public ExecutorServletModule(
ObjectMapper jsonMapper,
EventReceiverProvider receivers
)
{
this.jsonMapper = jsonMapper;
this.receivers = receivers;
}
@Override
protected void configureServlets()
{
bind(EventReceiverResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(EventReceiverProvider.class).toInstance(receivers);
serve("/*").with(GuiceContainer.class);
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.worker.http; package com.metamx.druid.merger.worker.http;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -41,6 +42,7 @@ import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig; import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs; import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs; import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;
@ -188,6 +190,7 @@ public class WorkerNode extends RegisteringNode
initializeCuratorFramework(); initializeCuratorFramework();
initializeServiceDiscovery(); initializeServiceDiscovery();
initializeCoordinatorServiceProvider(); initializeCoordinatorServiceProvider();
initializeJacksonInjections();
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeCuratorCoordinator(); initializeCuratorCoordinator();
initializePersistentTaskLogs(); initializePersistentTaskLogs();
@ -272,9 +275,21 @@ public class WorkerNode extends RegisteringNode
} }
} }
private void initializeJacksonInjections()
{
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", null)
.addValue("segmentPusher", null)
.addValue("eventReceiverProvider", null);
getJsonMapper().setInjectableValues(injectables);
}
private void initializeJacksonSubtypes() private void initializeJacksonSubtypes()
{ {
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class); getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
} }
private void initializeHttpClient() private void initializeHttpClient()

View File

@ -205,8 +205,7 @@ public class TaskSerdeTest
null, null,
null, null,
new Period("PT10M"), new Period("PT10M"),
IndexGranularity.HOUR, IndexGranularity.HOUR
null
); );
final ObjectMapper jsonMapper = new DefaultObjectMapper(); final ObjectMapper jsonMapper = new DefaultObjectMapper();

View File

@ -58,8 +58,8 @@ import com.metamx.druid.merger.common.task.IndexTask;
import com.metamx.druid.merger.common.task.KillTask; import com.metamx.druid.merger.common.task.KillTask;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer; import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Event; import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;

View File

@ -21,6 +21,8 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool; import com.metamx.druid.realtime.plumber.PlumberSchool;

View File

@ -36,6 +36,7 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.segment.QuerySegmentWalker; import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor; import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;

View File

@ -0,0 +1,56 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import com.metamx.druid.input.InputRow;
import org.joda.time.Interval;
import java.io.IOException;
/**
* Creates firehoses clipped to a particular time interval. Useful for enforcing min time, max time, and time windows.
*/
public class ClippedFirehoseFactory implements FirehoseFactory
{
private final FirehoseFactory delegate;
private final Interval interval;
@JsonCreator
public ClippedFirehoseFactory(
@JsonProperty("delegate") FirehoseFactory delegate,
@JsonProperty("interval") Interval interval
)
{
this.delegate = delegate;
this.interval = interval;
}
@JsonProperty
public FirehoseFactory getDelegate()
{
return delegate;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@Override
public Firehose connect() throws IOException
{
return new PredicateFirehose(
delegate.connect(),
new Predicate<InputRow>()
{
@Override
public boolean apply(InputRow input)
{
return interval.contains(input.getTimestampFromEpoch());
}
}
);
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
@ -28,7 +28,7 @@ import java.io.Closeable;
* abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement * abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
* one of these and register it with the RealtimeMain. * one of these and register it with the RealtimeMain.
* *
* This object acts a lot like an Iterator, but it doesn't not extend the Iterator interface because it extends * This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
* gets passed around as an Iterator. * gets passed around as an Iterator.
* <p> * <p>

View File

@ -17,25 +17,25 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.io.IOException; import java.io.IOException;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({ @JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class) @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
}) })
public interface FirehoseFactory public interface FirehoseFactory
{ {
/** /**
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block). * call hasMore() on the returned Firehose (which might subsequently block).
* * <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null. * invalid configuration is preferred over returning null.

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -1,26 +1,25 @@
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.google.common.base.Predicate;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.NoSuchElementException;
/** /**
* Provides a view on a firehose that only returns rows at or after a certain minimum timestamp. * Provides a view on a firehose that only returns rows that match a certain predicate.
* Not thread-safe. * Not thread-safe.
*/ */
public class MinTimeFirehose implements Firehose public class PredicateFirehose implements Firehose
{ {
private final Firehose firehose; private final Firehose firehose;
private final DateTime minTime; private final Predicate<InputRow> predicate;
private InputRow savedInputRow = null; private InputRow savedInputRow = null;
public MinTimeFirehose(Firehose firehose, DateTime minTime) public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate)
{ {
this.firehose = firehose; this.firehose = firehose;
this.minTime = minTime; this.predicate = predicate;
} }
@Override @Override
@ -32,7 +31,7 @@ public class MinTimeFirehose implements Firehose
while (firehose.hasMore()) { while (firehose.hasMore()) {
final InputRow row = firehose.nextRow(); final InputRow row = firehose.nextRow();
if (acceptable(row)) { if (predicate.apply(row)) {
savedInputRow = row; savedInputRow = row;
return true; return true;
} }
@ -60,9 +59,4 @@ public class MinTimeFirehose implements Firehose
{ {
firehose.close(); firehose.close();
} }
private boolean acceptable(InputRow row)
{
return row.getTimestampFromEpoch() >= minTime.getMillis();
}
} }

View File

@ -0,0 +1,122 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.druid.input.InputRow;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Creates firehoses that shut off at a particular time. Useful for limiting the lifespan of a realtime job.
*/
public class TimedShutoffFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(FirehoseFactory.class);
private final FirehoseFactory delegateFactory;
private final DateTime shutoffTime;
@JsonCreator
public TimedShutoffFirehoseFactory(
@JsonProperty("delegate") FirehoseFactory delegateFactory,
@JsonProperty("shutoffTime") DateTime shutoffTime
)
{
this.delegateFactory = delegateFactory;
this.shutoffTime = shutoffTime;
}
@Override
public Firehose connect() throws IOException
{
return new TimedShutoffFirehose();
}
public class TimedShutoffFirehose implements Firehose
{
private final Firehose firehose;
private final ScheduledExecutorService exec;
private final Object shutdownLock = new Object();
private volatile boolean shutdown = false;
public TimedShutoffFirehose() throws IOException
{
firehose = delegateFactory.connect();
exec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("timed-shutoff-firehose-%d")
.build()
);
exec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Closing delegate firehose.");
shutdown = true;
try {
firehose.close();
} catch (IOException e) {
log.warn(e, "Failed to close delegate firehose, ignoring.");
}
}
},
shutoffTime.getMillis() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
log.info("Firehose created, will shut down at: %s", shutoffTime);
}
@Override
public boolean hasMore()
{
return firehose.hasMore();
}
@Override
public InputRow nextRow()
{
return firehose.nextRow();
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
synchronized (shutdownLock) {
if (!shutdown) {
shutdown = true;
firehose.close();
}
}
}
}
@JsonProperty("delegate")
public FirehoseFactory getDelegateFactory()
{
return delegateFactory;
}
@JsonProperty("shutoffTime")
public DateTime getShutoffTime()
{
return shutoffTime;
}
}