Merge pull request #254 from metamx/local-index

Add a local firehose for indexing local files
This commit is contained in:
cheddar 2013-10-01 14:17:05 -07:00
commit 1c77177de3
19 changed files with 427 additions and 263 deletions

View File

@ -25,6 +25,8 @@ import com.google.common.base.Optional;
*/
public interface ChatHandlerProvider
{
public String getType();
public void register(final String key, ChatHandler handler);
public void unregister(final String key);

View File

@ -57,20 +57,20 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100000;
private final String firehoseId;
private final String serviceName;
private final int bufferSize;
private final MapInputRowParser parser;
private final Optional<EventReceivingChatHandlerProvider> chatHandlerProvider;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("firehoseId") String firehoseId,
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser,
@JacksonInject("chatHandlerProvider") EventReceivingChatHandlerProvider chatHandlerProvider
@JacksonInject ChatHandlerProvider chatHandlerProvider
)
{
this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId");
this.serviceName = Preconditions.checkNotNull(serviceName, "serviceName");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@ -79,21 +79,24 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
@Override
public Firehose connect() throws IOException
{
log.info("Connecting firehose: %s", firehoseId);
log.info("Connecting firehose: %s", serviceName);
final EventReceiverFirehose firehose = new EventReceiverFirehose();
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().register(firehoseId, firehose);
log.info("Found chathandler with type[%s]", chatHandlerProvider.get().getType());
chatHandlerProvider.get().register(serviceName, firehose);
} else {
log.info("No chathandler detected");
}
return firehose;
}
@JsonProperty
public String getFirehoseId()
public String getServiceName()
{
return firehoseId;
return serviceName;
}
@JsonProperty
@ -111,7 +114,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
public class EventReceiverFirehose implements ChatHandler, Firehose
{
private final BlockingQueue<InputRow> buffer;
private final Object readLock = new Object();
private volatile InputRow nextRow = null;
private volatile boolean closed = false;
@ -125,7 +130,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
@Produces("application/json")
public Response addAll(Collection<Map<String, Object>> events)
{
log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId);
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) {
@ -146,7 +151,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
}
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
@ -167,7 +173,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
}
return nextRow != null;
}
}
}
@Override
@ -205,7 +211,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
closed = true;
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(firehoseId);
chatHandlerProvider.get().unregister(serviceName);
}
}
}

View File

@ -54,6 +54,12 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
this.handlers = Maps.newConcurrentMap();
}
@Override
public String getType()
{
return "eventReceiving";
}
@Override
public void register(final String service, ChatHandler handler)
{
@ -76,7 +82,6 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
@Override
public void unregister(final String service)
{
log.info("Unregistering chat handler[%s]", service);
final ChatHandler handler = handlers.get(service);

View File

@ -25,6 +25,12 @@ import com.google.common.base.Optional;
*/
public class NoopChatHandlerProvider implements ChatHandlerProvider
{
@Override
public String getType()
{
return "noop";
}
@Override
public void register(String key, ChatHandler handler)
{

View File

@ -23,16 +23,16 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import io.druid.segment.realtime.firehose.FileIteratingFirehose;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.jets3t.service.S3Service;
@ -43,9 +43,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.zip.GZIPInputStream;
/**
@ -54,15 +54,12 @@ import java.util.zip.GZIPInputStream;
@JsonTypeName("s3")
public class StaticS3FirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(StaticS3FirehoseFactory.class);
private final S3Service s3Client;
private final StringInputRowParser parser;
private final List<URI> uris;
private final long retryCount = 5;
private final long retryMillis = 5000;
private static final Logger log = new Logger(StaticS3FirehoseFactory.class);
@JsonCreator
public StaticS3FirehoseFactory(
@JacksonInject("s3Client") S3Service s3Client,
@ -96,25 +93,21 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
{
Preconditions.checkNotNull(s3Client, "null s3Client");
return new Firehose()
{
LineIterator lineIterator = null;
final Queue<URI> objectQueue = Lists.newLinkedList(uris);
final LinkedList<URI> objectQueue = Lists.newLinkedList(uris);
// Rolls over our streams and iterators to the next file, if appropriate
private void maybeNextFile() throws Exception
{
if (lineIterator == null || !lineIterator.hasNext()) {
// Close old streams, maybe.
if (lineIterator != null) {
lineIterator.close();
return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
@Override
public boolean hasNext()
{
return !objectQueue.isEmpty();
}
// Open new streams, maybe.
final URI nextURI = objectQueue.poll();
if (nextURI != null) {
@Override
public LineIterator next()
{
final URI nextURI = objectQueue.poll();
final String s3Bucket = nextURI.getAuthority();
final S3Object s3Object = new S3Object(
@ -125,7 +118,6 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI);
int ntry = 0;
try {
final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey())
.getDataInputStream();
@ -134,75 +126,31 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
? new GZIPInputStream(innerInputStream)
: innerInputStream;
lineIterator = IOUtils.lineIterator(
return IOUtils.lineIterator(
new BufferedReader(
new InputStreamReader(outerInputStream, Charsets.UTF_8)
)
);
} catch(IOException e) {
}
catch (Exception e) {
log.error(
e,
"Exception reading from bucket[%s] object[%s] (try %d) (sleeping %d millis)",
"Exception reading from bucket[%s] object[%s]",
s3Bucket,
s3Object.getKey(),
ntry,
retryMillis
s3Object.getKey()
);
ntry ++;
if(ntry <= retryCount) {
Thread.sleep(retryMillis);
}
throw Throwables.propagate(e);
}
}
}
}
@Override
public boolean hasMore()
{
try {
maybeNextFile();
} catch(Exception e) {
throw Throwables.propagate(e);
}
return lineIterator != null && lineIterator.hasNext();
}
@Override
public InputRow nextRow()
{
try {
maybeNextFile();
} catch(Exception e) {
throw Throwables.propagate(e);
}
if(lineIterator == null) {
throw new NoSuchElementException();
}
return parser.parse(lineIterator.next());
}
@Override
public Runnable commit()
{
// Do nothing.
return new Runnable() { public void run() {} };
}
@Override
public void close() throws IOException
{
objectQueue.clear();
if(lineIterator != null) {
lineIterator.close();
}
}
};
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
},
parser
);
}
}

View File

@ -43,6 +43,8 @@ import java.util.List;
public class IndexTask extends AbstractTask
{
private static final Logger log = new Logger(IndexTask.class);
@JsonIgnore
private final GranularitySpec granularitySpec;
@ -64,8 +66,6 @@ public class IndexTask extends AbstractTask
@JsonIgnore
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
public IndexTask(
@JsonProperty("id") String id,
@ -94,7 +94,7 @@ public class IndexTask extends AbstractTask
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
this.aggregators = aggregators;
this.indexGranularity = indexGranularity;
this.indexGranularity = (indexGranularity == null) ? QueryGranularity.NONE : indexGranularity;
this.targetPartitionSize = targetPartitionSize;
this.firehoseFactory = firehoseFactory;
this.rowFlushBoundary = rowFlushBoundary;
@ -202,5 +202,4 @@ public class IndexTask extends AbstractTask
{
return rowFlushBoundary;
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import org.joda.time.DateTime;
@ -33,11 +34,17 @@ import org.joda.time.Period;
public class NoopTask extends AbstractTask
{
private static final Logger log = new Logger(NoopTask.class);
private static int defaultRunTime = 2500;
private final int runTime;
private final FirehoseFactory firehoseFactory;
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("runTime") int runTime,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
)
{
super(
@ -45,6 +52,10 @@ public class NoopTask extends AbstractTask
"none",
interval == null ? new Interval(Period.days(1), new DateTime()) : interval
);
this.runTime = (runTime == 0) ? defaultRunTime : runTime;
this.firehoseFactory = firehoseFactory;
}
@Override
@ -53,14 +64,29 @@ public class NoopTask extends AbstractTask
return "noop";
}
@JsonProperty("runTime")
public int getRunTime()
{
return runTime;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final int sleepTime = 2500;
if (firehoseFactory != null) {
log.info("Connecting firehose");
firehoseFactory.connect();
}
log.info("Running noop task[%s]", getId());
log.info("Sleeping for %,d millis.", sleepTime);
Thread.sleep(sleepTime);
log.info("Sleeping for %,d millis.", runTime);
Thread.sleep(runTime);
log.info("Woke up!");
return TaskStatus.success(getId());
}

View File

@ -639,7 +639,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId);
log.info("Task[%s] went bye bye.", taskId);
}
break;
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import io.druid.indexing.common.index.ChatHandler;
import io.druid.indexing.common.index.ChatHandlerProvider;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
@ -39,6 +40,7 @@ public class ChatHandlerResource
this.handlers = handlers;
}
@POST
@Path("/chat/{id}")
public Object doTaskChat(
@PathParam("id") String handlerId

View File

@ -31,6 +31,8 @@ import java.util.Map;
*/
public class TimestampSpec
{
private static final String defaultFormat = "auto";
private final String timestampColumn;
private final String timestampFormat;
private final Function<String, DateTime> timestampConverter;
@ -42,8 +44,8 @@ public class TimestampSpec
)
{
this.timestampColumn = timestampColumn;
this.timestampFormat = format;
this.timestampConverter = ParserUtils.createTimestampParser(format);
this.timestampFormat = format == null ? defaultFormat : format;
this.timestampConverter = ParserUtils.createTimestampParser(timestampFormat);
}
@JsonProperty("column")

View File

@ -0,0 +1,96 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import com.google.common.base.Throwables;
import io.druid.common.guava.Runnables;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.StringInputRowParser;
import org.apache.commons.io.LineIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
/**
*/
public class FileIteratingFirehose implements Firehose
{
private final Iterator<LineIterator> lineIterators;
private final StringInputRowParser parser;
private LineIterator lineIterator = null;
public FileIteratingFirehose(
Iterator<LineIterator> lineIterators,
StringInputRowParser parser
)
{
this.lineIterators = lineIterators;
this.parser = parser;
}
@Override
public boolean hasMore()
{
try {
return lineIterators.hasNext() || (lineIterator != null && lineIterator.hasNext());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public InputRow nextRow()
{
try {
if (lineIterator == null || !lineIterator.hasNext()) {
// Close old streams, maybe.
if (lineIterator != null) {
lineIterator.close();
}
lineIterator = lineIterators.next();
}
return parser.parse(lineIterator.next());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
if (lineIterator != null) {
lineIterator.close();
}
}
}

View File

@ -0,0 +1,124 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.StringInputRowParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
/**
*/
public class LocalFirehoseFactory implements FirehoseFactory
{
private final File baseDir;
private final String filter;
private final StringInputRowParser parser;
@JsonCreator
public LocalFirehoseFactory(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
@JsonProperty("parser") StringInputRowParser parser
)
{
this.baseDir = baseDir;
this.filter = filter;
this.parser = parser;
}
@JsonProperty
public File getBaseDir()
{
return baseDir;
}
@JsonProperty
public String getFilter()
{
return filter;
}
@JsonProperty
public StringInputRowParser getParser()
{
return parser;
}
@Override
public Firehose connect() throws IOException
{
final LinkedList<File> files = Lists.<File>newLinkedList(
Arrays.<File>asList(
baseDir.listFiles(
new FilenameFilter()
{
@Override
public boolean accept(File file, String name)
{
return name.contains(filter);
}
}
)
)
);
return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
@Override
public boolean hasNext()
{
return !files.isEmpty();
}
@Override
public LineIterator next()
{
try {
return FileUtils.lineIterator(files.poll());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
},
parser
);
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper;
@ -32,6 +33,7 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.coordinator.ForkingTaskRunner;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.worker.Worker;
@ -76,6 +78,8 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);

View File

@ -19,8 +19,6 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
@ -29,11 +27,8 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.JacksonConfigProvider;
@ -47,8 +42,7 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogs;
@ -77,12 +71,6 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.JettyServerInitializer;
@ -98,7 +86,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection;
import java.util.Arrays;
import java.util.List;
/**
@ -120,7 +107,7 @@ public class CliOverlord extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -146,6 +133,8 @@ public class CliOverlord extends ServerRunnable
.to(ResourceManagementSchedulerFactoryImpl.class)
.in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
configureTaskStorage(binder);
configureRunners(binder);
configureAutoscale(binder);
@ -177,7 +166,10 @@ public class CliOverlord extends ServerRunnable
private void configureRunners(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class)
binder,
"druid.indexer.runner.type",
Key.get(TaskRunnerFactory.class),
Key.get(ForkingTaskRunnerFactory.class)
);
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class));
@ -214,27 +206,6 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
}
);
}

View File

@ -19,8 +19,6 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@ -30,10 +28,6 @@ import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
@ -51,26 +45,18 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.S3DataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
@ -104,7 +90,7 @@ public class CliPeon extends GuiceRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -118,8 +104,10 @@ public class CliPeon extends GuiceRunnable
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
handlerProviderBinder.addBinding("receiver")
.to(EventReceivingChatHandlerProvider.class).in(LazySingleton.class);
handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
@ -157,27 +145,6 @@ public class CliPeon extends GuiceRunnable
LifecycleModule.register(binder, Server.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
}
);
}

View File

@ -19,38 +19,23 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.RealtimeModule;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
@ -74,7 +59,7 @@ public class CliRealtimeExample extends ServerRunnable
{
return ImmutableList.<Object>of(
new RealtimeModule(),
new DruidModule()
new Module()
{
@Override
public void configure(Binder binder)
@ -85,27 +70,6 @@ public class CliRealtimeExample extends ServerRunnable
binder.bind(InventoryView.class).to(NoopInventoryView.class);
binder.bind(ServerView.class).to(NoopServerView.class);
}
@Override
public List<Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("RealtimeExampleModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
}
);
}

View File

@ -40,6 +40,7 @@ import io.druid.guice.DataSegmentPusherPullerModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.FirehoseModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.JacksonConfigManagerModule;
@ -225,7 +226,8 @@ public class
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new DataSegmentPusherPullerModule(),
new TaskLogsModule()
new TaskLogsModule(),
new FirehoseModule()
);
ModuleList actualModules = new ModuleList(baseInjector);

View File

@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import java.util.Arrays;
import java.util.List;
/**
*/
public class FirehoseModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("FirehoseModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(LocalFirehoseFactory.class, "local")
)
);
}
}

View File

@ -19,41 +19,26 @@
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
import java.util.Arrays;
import java.util.List;
/**
*/
public class RealtimeModule implements DruidModule
public class RealtimeModule implements Module
{
@Override
public void configure(Binder binder)
@ -64,7 +49,10 @@ public class RealtimeModule implements DruidModule
Key.get(SegmentPublisher.class),
Key.get(NoopSegmentPublisher.class)
);
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class));
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(
binder,
Key.get(SegmentPublisher.class)
);
publisherBinder.addBinding("db").to(DbSegmentPublisher.class);
binder.bind(DbSegmentPublisher.class).in(LazySingleton.class);
@ -83,25 +71,4 @@ public class RealtimeModule implements DruidModule
LifecycleModule.register(binder, Server.class);
}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
}