From dc5dab8747902c25bf14eb100ea9d960d2fbcd37 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 27 Sep 2013 17:09:59 -0700 Subject: [PATCH] Fixes for property conversion, firehose registration, and the indexing service --- .../coordinator/ForkingTaskRunner.java | 4 +- .../java/io/druid/client/selector/Server.java | 1 + .../discovery/CuratorServiceAnnouncer.java | 2 +- .../curator/discovery/DiscoveryModule.java | 6 +-- .../discovery/ServerDiscoverySelector.java | 6 +++ .../main/java/io/druid/cli/CliOverlord.java | 38 ++++++++++++++++++- .../src/main/java/io/druid/cli/CliPeon.java | 37 +++++++++++++++++- .../java/io/druid/cli/CliRealtimeExample.java | 16 +++++++- .../druid/cli/convert/ConvertProperties.java | 10 +++-- .../java/io/druid/cli/convert/Rename.java | 7 +++- .../java/io/druid/guice/RealtimeModule.java | 22 ++++++++++- 11 files changed, 135 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/ForkingTaskRunner.java index 382aaed6ece..e18a806f338 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/ForkingTaskRunner.java @@ -200,12 +200,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer command.add("io.druid.cli.Main"); command.add("internal"); command.add("peon"); + command.add(taskFile.toString()); + command.add(statusFile.toString()); String nodeType = task.getNodeType(); if (nodeType != null) { command.add(String.format("--nodeType %s", nodeType)); } - command.add(taskFile.toString()); - command.add(statusFile.toString()); jsonMapper.writeValue(taskFile, task); diff --git a/server/src/main/java/io/druid/client/selector/Server.java b/server/src/main/java/io/druid/client/selector/Server.java index 664d621136e..79fc216a12b 100644 --- a/server/src/main/java/io/druid/client/selector/Server.java +++ b/server/src/main/java/io/druid/client/selector/Server.java @@ -25,5 +25,6 @@ public interface Server { public String getScheme(); public String getHost(); + public String getAddress(); public int getPort(); } diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java b/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java index 633b987a88a..2a0eb3b770c 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java @@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer try { instance = ServiceInstance.builder() .name(serviceName) - .address(service.getHost()) + .address(service.getHostNoPort()) .port(service.getPort()) .build(); } diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 36e13f2e144..f3fc56d59a7 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -147,9 +147,9 @@ public class DiscoveryModule implements Module final Injector injector, final Set> nodesToAnnounce, final Lifecycle lifecycle - ) + ) throws Exception { - lifecycle.addHandler( + lifecycle.addMaybeStartHandler( new Lifecycle.Handler() { private volatile List nodes = null; @@ -203,7 +203,7 @@ public class DiscoveryModule implements Module .client(curator) .build(); - lifecycle.addHandler( + lifecycle.addMaybeStartHandler( new Lifecycle.Handler() { @Override diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java index b7cd72abf17..fe716715ad3 100644 --- a/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java @@ -63,6 +63,12 @@ public class ServerDiscoverySelector implements DiscoverySelector { @Override public String getHost() + { + return String.format("%s:%d", getAddress(), getPort()); + } + + @Override + public String getAddress() { return instance.getAddress(); } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 446283e0dee..c8fe70ab2b8 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -19,6 +19,8 @@ package io.druid.cli; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Injector; @@ -28,6 +30,10 @@ import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import com.google.inject.servlet.GuiceFilter; import com.metamx.common.logger.Logger; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.JacksonConfigProvider; @@ -41,6 +47,8 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogs; @@ -69,6 +77,12 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; import io.druid.indexing.coordinator.setup.WorkerSetupData; +import io.druid.initialization.DruidModule; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectInfo; import io.druid.server.initialization.JettyServerInitializer; @@ -84,6 +98,7 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.util.resource.ResourceCollection; +import java.util.Arrays; import java.util.List; /** @@ -105,7 +120,7 @@ public class CliOverlord extends ServerRunnable protected List getModules() { return ImmutableList.of( - new Module() + new DruidModule() { @Override public void configure(Binder binder) @@ -199,6 +214,27 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("RealtimeModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver") + ) + ); + } } ); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index b4b9ce4e661..46edf595802 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -19,6 +19,8 @@ package io.druid.cli; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -28,6 +30,10 @@ import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Arguments; import io.airlift.command.Command; import io.airlift.command.Option; @@ -45,18 +51,26 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; import io.druid.indexing.common.index.NoopChatHandlerProvider; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.coordinator.ThreadPoolTaskRunner; import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; +import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.S3DataSegmentKiller; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -90,7 +104,7 @@ public class CliPeon extends GuiceRunnable protected List getModules() { return ImmutableList.of( - new Module() + new DruidModule() { @Override public void configure(Binder binder) @@ -143,6 +157,27 @@ public class CliPeon extends GuiceRunnable LifecycleModule.register(binder, Server.class); } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("RealtimeModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver") + ) + ); + } } ); } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 0dcbfe6d99b..e0ad5b471cd 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -35,9 +35,16 @@ import io.druid.client.InventoryView; import io.druid.client.ServerView; import io.druid.guice.NoopSegmentPublisherProvider; import io.druid.guice.RealtimeModule; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; @@ -88,7 +95,14 @@ public class CliRealtimeExample extends ServerRunnable new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), new NamedType(FlightsFirehoseFactory.class, "flights"), new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream") + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver") ) ); } diff --git a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java index 7c97b4a8012..314ec107194 100644 --- a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java +++ b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java @@ -85,6 +85,8 @@ public class ConvertProperties implements Runnable new Rename("druid.indexer.terminateResources.duration", "druid.indexer.autoscale.terminatePeriod"), new Rename("druid.indexer.terminateResources.originDateTime", "druid.indexer.autoscale.originTime"), new Rename("druid.indexer.autoscaling.strategy", "druid.indexer.autoscale.strategy"), + new Rename("druid.indexer.logs.s3bucket", "druid.indexer.logs.s3Bucket"), + new Rename("druid.indexer.logs.s3prefix", "druid.indexer.logs.s3Prefix"), new Rename("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion", "druid.indexer.autoscale.workerIdleTimeout"), new Rename("druid.indexer.maxScalingDuration", "druid.indexer.autoscale.scalingTimeout"), new Rename("druid.indexer.numEventsToTrack", "druid.indexer.autoscale.numEventsToTrack"), @@ -122,7 +124,7 @@ public class ConvertProperties implements Runnable } File outFile = new File(outFilename); - if (!outFile.getParentFile().exists()) { + if (outFile.getParentFile() != null && !outFile.getParentFile().exists()) { outFile.getParentFile().mkdirs(); } @@ -144,8 +146,10 @@ public class ConvertProperties implements Runnable for (PropertyConverter converter : converters) { if (converter.canHandle(property)) { for (Map.Entry entry : converter.convert(fromFile).entrySet()) { - ++count; - updatedProps.setProperty(entry.getKey(), entry.getValue()); + if (entry.getValue() != null) { + ++count; + updatedProps.setProperty(entry.getKey(), entry.getValue()); + } } handled = true; } diff --git a/services/src/main/java/io/druid/cli/convert/Rename.java b/services/src/main/java/io/druid/cli/convert/Rename.java index 19139d8e363..f2297782a48 100644 --- a/services/src/main/java/io/druid/cli/convert/Rename.java +++ b/services/src/main/java/io/druid/cli/convert/Rename.java @@ -49,6 +49,11 @@ public class Rename implements PropertyConverter @Override public Map convert(Properties properties) { - return ImmutableMap.of(newProperty, properties.getProperty(property)); + final String value = properties.getProperty(property); + if (value != null) { + return ImmutableMap.of(newProperty, value); + } else { + return ImmutableMap.of(); + } } } diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 13c182d352e..f3bfb156547 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -26,7 +26,13 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; import io.druid.cli.QueryJettyServerInitializer; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.DbSegmentPublisher; @@ -34,7 +40,11 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -80,7 +90,17 @@ public class RealtimeModule implements DruidModule return Arrays.asList( new SimpleModule("RealtimeModule") .registerSubtypes( - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2") + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver") ) ); }