clean up realtime module and fix breakage in broker paths

This commit is contained in:
fjy 2013-09-23 16:26:05 -07:00
parent 1750b702d7
commit 1ff04412a2
3 changed files with 57 additions and 5 deletions

View File

@ -20,6 +20,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher
private final DbTablesConfig config;
private final IDBI dbi;
@Inject
public DbSegmentPublisher(
ObjectMapper jsonMapper,
DbTablesConfig config,

View File

@ -21,7 +21,9 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.BrokerServerView;
@ -42,9 +44,18 @@ import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.ClientInfoResource;
import io.druid.server.ClientQuerySegmentWalker;
import io.druid.server.QueryServlet;
import io.druid.server.StatusResource;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import java.util.List;
@ -81,7 +92,7 @@ public class CliBroker extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(BrokerJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, ClientInfoResource.class);
DiscoveryModule.register(binder, Self.class);
@ -92,4 +103,28 @@ public class CliBroker extends ServerRunnable
}
);
}
private static class BrokerJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS);
resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*");
resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
}

View File

@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
@ -44,11 +48,22 @@ public class RealtimeModule implements DruidModule
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class);
binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class);
PolyBind.createChoice(
binder,
"druid.publish.type",
Key.get(SegmentPublisher.class),
Key.get(NoopSegmentPublisher.class)
);
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class));
publisherBinder.addBinding("db").to(DbSegmentPublisher.class);
binder.bind(DbSegmentPublisher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
)
.toProvider(FireDepartmentsProvider.class)
.in(LazySingleton.class);