Fixes for property conversion, firehose registration, and the indexing service

This commit is contained in:
Gian Merlino 2013-09-27 17:09:59 -07:00
parent 1f3aae6edf
commit dc5dab8747
11 changed files with 135 additions and 14 deletions

View File

@ -200,12 +200,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add("io.druid.cli.Main"); command.add("io.druid.cli.Main");
command.add("internal"); command.add("internal");
command.add("peon"); command.add("peon");
command.add(taskFile.toString());
command.add(statusFile.toString());
String nodeType = task.getNodeType(); String nodeType = task.getNodeType();
if (nodeType != null) { if (nodeType != null) {
command.add(String.format("--nodeType %s", nodeType)); command.add(String.format("--nodeType %s", nodeType));
} }
command.add(taskFile.toString());
command.add(statusFile.toString());
jsonMapper.writeValue(taskFile, task); jsonMapper.writeValue(taskFile, task);

View File

@ -25,5 +25,6 @@ public interface Server
{ {
public String getScheme(); public String getScheme();
public String getHost(); public String getHost();
public String getAddress();
public int getPort(); public int getPort();
} }

View File

@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer
try { try {
instance = ServiceInstance.<Void>builder() instance = ServiceInstance.<Void>builder()
.name(serviceName) .name(serviceName)
.address(service.getHost()) .address(service.getHostNoPort())
.port(service.getPort()) .port(service.getPort())
.build(); .build();
} }

View File

@ -147,9 +147,9 @@ public class DiscoveryModule implements Module
final Injector injector, final Injector injector,
final Set<KeyHolder<DruidNode>> nodesToAnnounce, final Set<KeyHolder<DruidNode>> nodesToAnnounce,
final Lifecycle lifecycle final Lifecycle lifecycle
) ) throws Exception
{ {
lifecycle.addHandler( lifecycle.addMaybeStartHandler(
new Lifecycle.Handler() new Lifecycle.Handler()
{ {
private volatile List<DruidNode> nodes = null; private volatile List<DruidNode> nodes = null;
@ -203,7 +203,7 @@ public class DiscoveryModule implements Module
.client(curator) .client(curator)
.build(); .build();
lifecycle.addHandler( lifecycle.addMaybeStartHandler(
new Lifecycle.Handler() new Lifecycle.Handler()
{ {
@Override @Override

View File

@ -63,6 +63,12 @@ public class ServerDiscoverySelector implements DiscoverySelector<Server>
{ {
@Override @Override
public String getHost() public String getHost()
{
return String.format("%s:%d", getAddress(), getPort());
}
@Override
public String getAddress()
{ {
return instance.getAddress(); return instance.getAddress();
} }

View File

@ -19,6 +19,8 @@
package io.druid.cli; package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Injector; import com.google.inject.Injector;
@ -28,6 +30,10 @@ import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import com.google.inject.servlet.GuiceFilter; import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.JacksonConfigProvider; import io.druid.guice.JacksonConfigProvider;
@ -41,6 +47,8 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogs; import io.druid.indexing.common.tasklogs.TaskLogs;
@ -69,6 +77,12 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.coordinator.setup.WorkerSetupData; import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo; import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
@ -84,6 +98,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection; import org.eclipse.jetty.util.resource.ResourceCollection;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
@ -105,7 +120,7 @@ public class CliOverlord extends ServerRunnable
protected List<Object> getModules() protected List<Object> getModules()
{ {
return ImmutableList.<Object>of( return ImmutableList.<Object>of(
new Module() new DruidModule()
{ {
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
@ -199,6 +214,27 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
} }
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
} }
); );
} }

View File

@ -19,6 +19,8 @@
package io.druid.cli; package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
@ -28,6 +30,10 @@ import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Arguments; import io.airlift.command.Arguments;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.airlift.command.Option; import io.airlift.command.Option;
@ -45,18 +51,26 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; import io.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner; import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.S3DataSegmentKiller; import io.druid.segment.loading.S3DataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -90,7 +104,7 @@ public class CliPeon extends GuiceRunnable
protected List<Object> getModules() protected List<Object> getModules()
{ {
return ImmutableList.<Object>of( return ImmutableList.<Object>of(
new Module() new DruidModule()
{ {
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
@ -143,6 +157,27 @@ public class CliPeon extends GuiceRunnable
LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, Server.class);
} }
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
} }
); );
} }

View File

@ -35,9 +35,16 @@ import io.druid.client.InventoryView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.guice.NoopSegmentPublisherProvider; import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.RealtimeModule; import io.druid.guice.RealtimeModule;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule; import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -88,7 +95,14 @@ public class CliRealtimeExample extends ServerRunnable
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"), new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"), new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream") new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
) )
); );
} }

View File

@ -85,6 +85,8 @@ public class ConvertProperties implements Runnable
new Rename("druid.indexer.terminateResources.duration", "druid.indexer.autoscale.terminatePeriod"), new Rename("druid.indexer.terminateResources.duration", "druid.indexer.autoscale.terminatePeriod"),
new Rename("druid.indexer.terminateResources.originDateTime", "druid.indexer.autoscale.originTime"), new Rename("druid.indexer.terminateResources.originDateTime", "druid.indexer.autoscale.originTime"),
new Rename("druid.indexer.autoscaling.strategy", "druid.indexer.autoscale.strategy"), new Rename("druid.indexer.autoscaling.strategy", "druid.indexer.autoscale.strategy"),
new Rename("druid.indexer.logs.s3bucket", "druid.indexer.logs.s3Bucket"),
new Rename("druid.indexer.logs.s3prefix", "druid.indexer.logs.s3Prefix"),
new Rename("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion", "druid.indexer.autoscale.workerIdleTimeout"), new Rename("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion", "druid.indexer.autoscale.workerIdleTimeout"),
new Rename("druid.indexer.maxScalingDuration", "druid.indexer.autoscale.scalingTimeout"), new Rename("druid.indexer.maxScalingDuration", "druid.indexer.autoscale.scalingTimeout"),
new Rename("druid.indexer.numEventsToTrack", "druid.indexer.autoscale.numEventsToTrack"), new Rename("druid.indexer.numEventsToTrack", "druid.indexer.autoscale.numEventsToTrack"),
@ -122,7 +124,7 @@ public class ConvertProperties implements Runnable
} }
File outFile = new File(outFilename); File outFile = new File(outFilename);
if (!outFile.getParentFile().exists()) { if (outFile.getParentFile() != null && !outFile.getParentFile().exists()) {
outFile.getParentFile().mkdirs(); outFile.getParentFile().mkdirs();
} }
@ -144,8 +146,10 @@ public class ConvertProperties implements Runnable
for (PropertyConverter converter : converters) { for (PropertyConverter converter : converters) {
if (converter.canHandle(property)) { if (converter.canHandle(property)) {
for (Map.Entry<String, String> entry : converter.convert(fromFile).entrySet()) { for (Map.Entry<String, String> entry : converter.convert(fromFile).entrySet()) {
++count; if (entry.getValue() != null) {
updatedProps.setProperty(entry.getKey(), entry.getValue()); ++count;
updatedProps.setProperty(entry.getKey(), entry.getValue());
}
} }
handled = true; handled = true;
} }

View File

@ -49,6 +49,11 @@ public class Rename implements PropertyConverter
@Override @Override
public Map<String, String> convert(Properties properties) public Map<String, String> convert(Properties properties)
{ {
return ImmutableMap.of(newProperty, properties.getProperty(property)); final String value = properties.getProperty(property);
if (value != null) {
return ImmutableMap.of(newProperty, value);
} else {
return ImmutableMap.of();
}
} }
} }

View File

@ -26,7 +26,13 @@ import com.google.inject.Binder;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.cli.QueryJettyServerInitializer; import io.druid.cli.QueryJettyServerInitializer;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule; import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.segment.realtime.DbSegmentPublisher;
@ -34,7 +40,11 @@ import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -80,7 +90,17 @@ public class RealtimeModule implements DruidModule
return Arrays.<Module>asList( return Arrays.<Module>asList(
new SimpleModule("RealtimeModule") new SimpleModule("RealtimeModule")
.registerSubtypes( .registerSubtypes(
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2") new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream"),
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(StaticS3FirehoseFactory.class, "s3"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
) )
); );
} }