mirror of https://github.com/apache/druid.git
Merge pull request #253 from metamx/stuff
Fixes for property conversion, firehose registration, and the indexing service
This commit is contained in:
commit
e60387b3d4
|
@ -200,12 +200,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
command.add("io.druid.cli.Main");
|
||||
command.add("internal");
|
||||
command.add("peon");
|
||||
command.add(taskFile.toString());
|
||||
command.add(statusFile.toString());
|
||||
String nodeType = task.getNodeType();
|
||||
if (nodeType != null) {
|
||||
command.add(String.format("--nodeType %s", nodeType));
|
||||
}
|
||||
command.add(taskFile.toString());
|
||||
command.add(statusFile.toString());
|
||||
|
||||
jsonMapper.writeValue(taskFile, task);
|
||||
|
||||
|
|
|
@ -25,5 +25,6 @@ public interface Server
|
|||
{
|
||||
public String getScheme();
|
||||
public String getHost();
|
||||
public String getAddress();
|
||||
public int getPort();
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer
|
|||
try {
|
||||
instance = ServiceInstance.<Void>builder()
|
||||
.name(serviceName)
|
||||
.address(service.getHost())
|
||||
.address(service.getHostNoPort())
|
||||
.port(service.getPort())
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -147,9 +147,9 @@ public class DiscoveryModule implements Module
|
|||
final Injector injector,
|
||||
final Set<KeyHolder<DruidNode>> nodesToAnnounce,
|
||||
final Lifecycle lifecycle
|
||||
)
|
||||
) throws Exception
|
||||
{
|
||||
lifecycle.addHandler(
|
||||
lifecycle.addMaybeStartHandler(
|
||||
new Lifecycle.Handler()
|
||||
{
|
||||
private volatile List<DruidNode> nodes = null;
|
||||
|
@ -203,7 +203,7 @@ public class DiscoveryModule implements Module
|
|||
.client(curator)
|
||||
.build();
|
||||
|
||||
lifecycle.addHandler(
|
||||
lifecycle.addMaybeStartHandler(
|
||||
new Lifecycle.Handler()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -63,6 +63,12 @@ public class ServerDiscoverySelector implements DiscoverySelector<Server>
|
|||
{
|
||||
@Override
|
||||
public String getHost()
|
||||
{
|
||||
return String.format("%s:%d", getAddress(), getPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAddress()
|
||||
{
|
||||
return instance.getAddress();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package io.druid.cli;
|
||||
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Injector;
|
||||
|
@ -28,6 +30,10 @@ import com.google.inject.TypeLiteral;
|
|||
import com.google.inject.multibindings.MapBinder;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import druid.examples.flights.FlightsFirehoseFactory;
|
||||
import druid.examples.rand.RandomFirehoseFactory;
|
||||
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
|
||||
import druid.examples.web.WebFirehoseFactory;
|
||||
import io.airlift.command.Command;
|
||||
import io.druid.guice.IndexingServiceModuleHelper;
|
||||
import io.druid.guice.JacksonConfigProvider;
|
||||
|
@ -41,6 +47,8 @@ import io.druid.guice.PolyBind;
|
|||
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
|
||||
import io.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
import io.druid.indexing.common.actions.TaskActionToolbox;
|
||||
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
|
||||
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
|
||||
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
|
||||
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
|
||||
import io.druid.indexing.common.tasklogs.TaskLogs;
|
||||
|
@ -69,6 +77,12 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
|
|||
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
|
||||
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
|
||||
import io.druid.indexing.coordinator.setup.WorkerSetupData;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
|
||||
import io.druid.server.http.RedirectFilter;
|
||||
import io.druid.server.http.RedirectInfo;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
|
@ -84,6 +98,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
|
|||
import org.eclipse.jetty.servlets.GzipFilter;
|
||||
import org.eclipse.jetty.util.resource.ResourceCollection;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -105,7 +120,7 @@ public class CliOverlord extends ServerRunnable
|
|||
protected List<Object> getModules()
|
||||
{
|
||||
return ImmutableList.<Object>of(
|
||||
new Module()
|
||||
new DruidModule()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
|
@ -199,6 +214,27 @@ public class CliOverlord extends ServerRunnable
|
|||
|
||||
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
|
||||
{
|
||||
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
|
||||
new SimpleModule("RealtimeModule")
|
||||
.registerSubtypes(
|
||||
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
|
||||
new NamedType(FlightsFirehoseFactory.class, "flights"),
|
||||
new NamedType(RandomFirehoseFactory.class, "rand"),
|
||||
new NamedType(WebFirehoseFactory.class, "webstream"),
|
||||
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
|
||||
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
|
||||
new NamedType(ClippedFirehoseFactory.class, "clipped"),
|
||||
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
|
||||
new NamedType(IrcFirehoseFactory.class, "irc"),
|
||||
new NamedType(StaticS3FirehoseFactory.class, "s3"),
|
||||
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package io.druid.cli;
|
||||
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
|
@ -28,6 +30,10 @@ import com.google.inject.Module;
|
|||
import com.google.inject.multibindings.MapBinder;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import druid.examples.flights.FlightsFirehoseFactory;
|
||||
import druid.examples.rand.RandomFirehoseFactory;
|
||||
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
|
||||
import druid.examples.web.WebFirehoseFactory;
|
||||
import io.airlift.command.Arguments;
|
||||
import io.airlift.command.Command;
|
||||
import io.airlift.command.Option;
|
||||
|
@ -45,18 +51,26 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
|
|||
import io.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
import io.druid.indexing.common.config.TaskConfig;
|
||||
import io.druid.indexing.common.index.ChatHandlerProvider;
|
||||
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
|
||||
import io.druid.indexing.common.index.EventReceivingChatHandlerProvider;
|
||||
import io.druid.indexing.common.index.NoopChatHandlerProvider;
|
||||
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
|
||||
import io.druid.indexing.coordinator.TaskRunner;
|
||||
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
|
||||
import io.druid.indexing.worker.executor.ChatHandlerResource;
|
||||
import io.druid.indexing.worker.executor.ExecutorLifecycle;
|
||||
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.segment.loading.DataSegmentKiller;
|
||||
import io.druid.segment.loading.S3DataSegmentKiller;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.segment.loading.StorageLocationConfig;
|
||||
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
||||
|
@ -90,7 +104,7 @@ public class CliPeon extends GuiceRunnable
|
|||
protected List<Object> getModules()
|
||||
{
|
||||
return ImmutableList.<Object>of(
|
||||
new Module()
|
||||
new DruidModule()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
|
@ -143,6 +157,27 @@ public class CliPeon extends GuiceRunnable
|
|||
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
|
||||
{
|
||||
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
|
||||
new SimpleModule("RealtimeModule")
|
||||
.registerSubtypes(
|
||||
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
|
||||
new NamedType(FlightsFirehoseFactory.class, "flights"),
|
||||
new NamedType(RandomFirehoseFactory.class, "rand"),
|
||||
new NamedType(WebFirehoseFactory.class, "webstream"),
|
||||
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
|
||||
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
|
||||
new NamedType(ClippedFirehoseFactory.class, "clipped"),
|
||||
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
|
||||
new NamedType(IrcFirehoseFactory.class, "irc"),
|
||||
new NamedType(StaticS3FirehoseFactory.class, "s3"),
|
||||
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
@ -35,9 +35,16 @@ import io.druid.client.InventoryView;
|
|||
import io.druid.client.ServerView;
|
||||
import io.druid.guice.NoopSegmentPublisherProvider;
|
||||
import io.druid.guice.RealtimeModule;
|
||||
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
|
||||
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.segment.loading.DataSegmentPusher;
|
||||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
|
||||
import io.druid.server.coordination.DataSegmentAnnouncer;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
|
@ -88,7 +95,14 @@ public class CliRealtimeExample extends ServerRunnable
|
|||
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
|
||||
new NamedType(FlightsFirehoseFactory.class, "flights"),
|
||||
new NamedType(RandomFirehoseFactory.class, "rand"),
|
||||
new NamedType(WebFirehoseFactory.class, "webstream")
|
||||
new NamedType(WebFirehoseFactory.class, "webstream"),
|
||||
new NamedType(KafkaFirehoseFactory.class, "kafka"),
|
||||
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
|
||||
new NamedType(ClippedFirehoseFactory.class, "clipped"),
|
||||
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
|
||||
new NamedType(IrcFirehoseFactory.class, "irc"),
|
||||
new NamedType(StaticS3FirehoseFactory.class, "s3"),
|
||||
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -85,6 +85,8 @@ public class ConvertProperties implements Runnable
|
|||
new Rename("druid.indexer.terminateResources.duration", "druid.indexer.autoscale.terminatePeriod"),
|
||||
new Rename("druid.indexer.terminateResources.originDateTime", "druid.indexer.autoscale.originTime"),
|
||||
new Rename("druid.indexer.autoscaling.strategy", "druid.indexer.autoscale.strategy"),
|
||||
new Rename("druid.indexer.logs.s3bucket", "druid.indexer.logs.s3Bucket"),
|
||||
new Rename("druid.indexer.logs.s3prefix", "druid.indexer.logs.s3Prefix"),
|
||||
new Rename("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion", "druid.indexer.autoscale.workerIdleTimeout"),
|
||||
new Rename("druid.indexer.maxScalingDuration", "druid.indexer.autoscale.scalingTimeout"),
|
||||
new Rename("druid.indexer.numEventsToTrack", "druid.indexer.autoscale.numEventsToTrack"),
|
||||
|
@ -122,7 +124,7 @@ public class ConvertProperties implements Runnable
|
|||
}
|
||||
|
||||
File outFile = new File(outFilename);
|
||||
if (!outFile.getParentFile().exists()) {
|
||||
if (outFile.getParentFile() != null && !outFile.getParentFile().exists()) {
|
||||
outFile.getParentFile().mkdirs();
|
||||
}
|
||||
|
||||
|
@ -144,8 +146,10 @@ public class ConvertProperties implements Runnable
|
|||
for (PropertyConverter converter : converters) {
|
||||
if (converter.canHandle(property)) {
|
||||
for (Map.Entry<String, String> entry : converter.convert(fromFile).entrySet()) {
|
||||
++count;
|
||||
updatedProps.setProperty(entry.getKey(), entry.getValue());
|
||||
if (entry.getValue() != null) {
|
||||
++count;
|
||||
updatedProps.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
handled = true;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,11 @@ public class Rename implements PropertyConverter
|
|||
@Override
|
||||
public Map<String, String> convert(Properties properties)
|
||||
{
|
||||
return ImmutableMap.of(newProperty, properties.getProperty(property));
|
||||
final String value = properties.getProperty(property);
|
||||
if (value != null) {
|
||||
return ImmutableMap.of(newProperty, value);
|
||||
} else {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,13 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Key;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.google.inject.multibindings.MapBinder;
|
||||
import druid.examples.flights.FlightsFirehoseFactory;
|
||||
import druid.examples.rand.RandomFirehoseFactory;
|
||||
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
|
||||
import druid.examples.web.WebFirehoseFactory;
|
||||
import io.druid.cli.QueryJettyServerInitializer;
|
||||
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
|
||||
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.segment.realtime.DbSegmentPublisher;
|
||||
|
@ -34,7 +40,11 @@ import io.druid.segment.realtime.FireDepartment;
|
|||
import io.druid.segment.realtime.NoopSegmentPublisher;
|
||||
import io.druid.segment.realtime.RealtimeManager;
|
||||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
|
||||
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
||||
|
@ -80,7 +90,17 @@ public class RealtimeModule implements DruidModule
|
|||
return Arrays.<Module>asList(
|
||||
new SimpleModule("RealtimeModule")
|
||||
.registerSubtypes(
|
||||
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2")
|
||||
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
|
||||
new NamedType(FlightsFirehoseFactory.class, "flights"),
|
||||
new NamedType(RandomFirehoseFactory.class, "rand"),
|
||||
new NamedType(WebFirehoseFactory.class, "webstream"),
|
||||
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
|
||||
new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
|
||||
new NamedType(ClippedFirehoseFactory.class, "clipped"),
|
||||
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
|
||||
new NamedType(IrcFirehoseFactory.class, "irc"),
|
||||
new NamedType(StaticS3FirehoseFactory.class, "s3"),
|
||||
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue