update code according to code review

This commit is contained in:
fjy 2013-08-22 10:46:05 -07:00
parent 3f51ef63a4
commit 6a8c160740
14 changed files with 61 additions and 89 deletions

View File

@ -34,19 +34,11 @@ public class DruidServerConfig
@JsonProperty @JsonProperty
private String tier = "_default_tier"; private String tier = "_default_tier";
@JsonProperty
private String type = "historical";
public long getMaxSize() public long getMaxSize()
{ {
return maxSize; return maxSize;
} }
public String getType()
{
return type;
}
public String getTier() public String getTier()
{ {
return tier; return tier;

View File

@ -2,47 +2,24 @@ package druid.examples.guice;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Provides; import com.google.inject.TypeLiteral;
import com.google.inject.ProvisionException;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.InventoryView; import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerView; import com.metamx.druid.client.ServerView;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.guice.FireDepartmentsProvider;
import com.metamx.druid.guice.ConfigProvider;
import com.metamx.druid.guice.JsonConfigProvider; import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.guice.ManageLifecycle; import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.guice.NoopSegmentPublisherProvider; import com.metamx.druid.guice.NoopSegmentPublisherProvider;
import com.metamx.druid.guice.RealtimeManagerConfig; import com.metamx.druid.guice.RealtimeManagerConfig;
import com.metamx.druid.guice.RealtimeManagerProvider;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.guice.annotations.Processing;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.DruidModule; import com.metamx.druid.initialization.DruidModule;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory; import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.realtime.RealtimeManager;
import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import druid.examples.flights.FlightsFirehoseFactory; import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory; import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory; import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
@ -50,20 +27,15 @@ import druid.examples.web.WebFirehoseFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
public class RealtimeStandaloneModule implements DruidModule public class RealtimeExampleModule implements DruidModule
{ {
private static final Logger log = new Logger(RealtimeStandaloneModule.class); private static final Logger log = new Logger(RealtimeExampleModule.class);
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
@ -75,14 +47,18 @@ public class RealtimeStandaloneModule implements DruidModule
binder.bind(ServerView.class).to(NoopServerView.class); binder.bind(ServerView.class).to(NoopServerView.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(RealtimeManager.class).toProvider(RealtimeManagerProvider.class).in(ManageLifecycle.class); binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class).in(ManageLifecycle.class);
} }
@Override @Override
public List<com.fasterxml.jackson.databind.Module> getJacksonModules() public List<com.fasterxml.jackson.databind.Module> getJacksonModules()
{ {
return Arrays.<com.fasterxml.jackson.databind.Module>asList( return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimestandAloneModule") new SimpleModule("RealtimeExampleModule")
.registerSubtypes( .registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"), new NamedType(FlightsFirehoseFactory.class, "flights"),

View File

@ -21,6 +21,7 @@ package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.realtime.DbSegmentPublisher; import com.metamx.druid.realtime.DbSegmentPublisher;
import com.metamx.druid.realtime.DbSegmentPublisherConfig; import com.metamx.druid.realtime.DbSegmentPublisherConfig;
import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.SegmentPublisher;
@ -38,7 +39,7 @@ public class DbSegmentPublisherProvider implements SegmentPublisherProvider
@JacksonInject @JacksonInject
@NotNull @NotNull
private DbSegmentPublisherConfig config = null; private DbTablesConfig config = null;
@JacksonInject @JacksonInject
@NotNull @NotNull

View File

@ -21,33 +21,26 @@ package com.metamx.druid.guice;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Provider; import com.google.inject.Provider;
import com.metamx.common.logger.Logger;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.realtime.FireDepartment; import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.RealtimeManager;
import java.util.List; import java.util.List;
/** /**
*/ */
public class RealtimeManagerProvider implements Provider<RealtimeManager> public class FireDepartmentsProvider implements Provider<List<FireDepartment>>
{ {
private static final Logger log = new Logger(RealtimeManagerProvider.class);
private final QueryRunnerFactoryConglomerate conglomerate;
private final List<FireDepartment> fireDepartments = Lists.newArrayList(); private final List<FireDepartment> fireDepartments = Lists.newArrayList();
@Inject @Inject
public RealtimeManagerProvider( public FireDepartmentsProvider(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
RealtimeManagerConfig config, RealtimeManagerConfig config
QueryRunnerFactoryConglomerate conglomerate
) )
{ {
this.conglomerate = conglomerate;
try { try {
this.fireDepartments.addAll( this.fireDepartments.addAll(
(List<FireDepartment>) jsonMapper.readValue( (List<FireDepartment>) jsonMapper.readValue(
@ -58,14 +51,14 @@ public class RealtimeManagerProvider implements Provider<RealtimeManager>
); );
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Unable to read fireDepartments from config"); throw Throwables.propagate(e);
} }
} }
@Override @Override
public RealtimeManager get() public List<FireDepartment> get()
{ {
return new RealtimeManager(fireDepartments, conglomerate); return fireDepartments;
} }
} }

View File

@ -21,11 +21,15 @@ package com.metamx.druid.guice;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.realtime.DbSegmentPublisherConfig; import com.metamx.druid.realtime.DbSegmentPublisherConfig;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.RealtimeManager; import com.metamx.druid.realtime.RealtimeManager;
import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.SegmentPublisher;
import java.util.List;
/** /**
*/ */
public class RealtimeModule implements Module public class RealtimeModule implements Module
@ -36,10 +40,14 @@ public class RealtimeModule implements Module
public void configure(Binder binder) public void configure(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class); JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class);
JsonConfigProvider.bind(binder, "druid.db.tables", DbSegmentPublisherConfig.class);
binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class); binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(RealtimeManager.class).toProvider(RealtimeManagerProvider.class).in(ManageLifecycle.class); binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class).in(ManageLifecycle.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
} }
} }

View File

@ -3,6 +3,7 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbTablesConfig;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
@ -17,12 +18,12 @@ public class DbSegmentPublisher implements SegmentPublisher
private static final Logger log = new Logger(DbSegmentPublisher.class); private static final Logger log = new Logger(DbSegmentPublisher.class);
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final DbSegmentPublisherConfig config; private final DbTablesConfig config;
private final IDBI dbi; private final IDBI dbi;
public DbSegmentPublisher( public DbSegmentPublisher(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
DbSegmentPublisherConfig config, DbTablesConfig config,
IDBI dbi IDBI dbi
) )
{ {
@ -41,7 +42,7 @@ public class DbSegmentPublisher implements SegmentPublisher
public List<Map<String, Object>> withHandle(Handle handle) throws Exception public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{ {
return handle.createQuery( return handle.createQuery(
String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentTable()) String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
) )
.bind("id", segment.getIdentifier()) .bind("id", segment.getIdentifier())
.list(); .list();
@ -65,13 +66,13 @@ public class DbSegmentPublisher implements SegmentPublisher
statement = String.format( statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable() config.getSegmentsTable()
); );
} else { } else {
statement = String.format( statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable() config.getSegmentsTable()
); );
} }

View File

@ -17,11 +17,14 @@ public class DataSegmentPusherModule implements Module
public void configure(Binder binder) public void configure(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.pusher", DataSegmentPusherProvider.class); JsonConfigProvider.bind(binder, "druid.pusher", DataSegmentPusherProvider.class);
JsonConfigProvider.bind(binder, "druid.pusher.local", LocalDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.pusher.s3", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.pusher.s3", S3DataSegmentPusherConfig.class);
binder.bind(Configuration.class).toInstance(new Configuration()); binder.bind(Configuration.class).toInstance(new Configuration());
JsonConfigProvider.bind(binder, "druid.pusher.hdfs", HdfsDataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.pusher.hdfs", HdfsDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.pusher.cassandra", CassandraDataSegmentConfig.class); JsonConfigProvider.bind(binder, "druid.pusher.cassandra", CassandraDataSegmentConfig.class);
binder.bind(DataSegmentPusher.class).toProvider(DataSegmentPusherProvider.class); binder.bind(DataSegmentPusher.class).toProvider(DataSegmentPusherProvider.class);
} }
} }

View File

@ -29,12 +29,8 @@ import javax.validation.constraints.NotNull;
/** /**
*/ */
public class LocalDataSegmentPusherProvider implements DataSegmentPusherProvider public class LocalDataSegmentPusherProvider extends LocalDataSegmentPusherConfig implements DataSegmentPusherProvider
{ {
@JacksonInject
@NotNull
private LocalDataSegmentPusherConfig config = null;
@JacksonInject @JacksonInject
@NotNull @NotNull
private ObjectMapper jsonMapper = null; private ObjectMapper jsonMapper = null;
@ -42,6 +38,6 @@ public class LocalDataSegmentPusherProvider implements DataSegmentPusherProvider
@Override @Override
public DataSegmentPusher get() public DataSegmentPusher get()
{ {
return new LocalDataSegmentPusher(config, jsonMapper); return new LocalDataSegmentPusher(this, jsonMapper);
} }
} }

View File

@ -35,6 +35,13 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
*/ */
public class StorageNodeModule extends ServerModule public class StorageNodeModule extends ServerModule
{ {
private final String nodeType;
public StorageNodeModule(String nodeType)
{
this.nodeType = nodeType;
}
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {
@ -58,7 +65,7 @@ public class StorageNodeModule extends ServerModule
node.getHost(), node.getHost(),
node.getHost(), node.getHost(),
config.getMaxSize(), config.getMaxSize(),
config.getType(), nodeType,
config.getTier() config.getTier()
); );
} }

View File

@ -20,10 +20,8 @@
package com.metamx.druid.metrics; package com.metamx.druid.metrics;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor; import com.metamx.metrics.Monitor;
import com.metamx.metrics.SysMonitor;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.List; import java.util.List;
@ -34,10 +32,7 @@ public class MonitorsConfig
{ {
@JsonProperty("monitors") @JsonProperty("monitors")
@NotNull @NotNull
private List<Class<? extends Monitor>> monitors = ImmutableList.<Class<? extends Monitor>>builder() private List<Class<? extends Monitor>> monitors = Lists.newArrayList();
.add(JvmMonitor.class)
.add(SysMonitor.class)
.build();
public List<Class<? extends Monitor>> getMonitors() public List<Class<? extends Monitor>> getMonitors()
{ {

View File

@ -72,7 +72,7 @@ public class CliHistorical extends ServerRunnable
S3Module.class, S3Module.class,
DataSegmentPullerModule.class, DataSegmentPullerModule.class,
new MetricsModule().register(ServerMonitor.class), new MetricsModule().register(ServerMonitor.class),
StorageNodeModule.class, new StorageNodeModule("historical"),
new JettyServerModule(new QueryJettyServerInitializer()) new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class), .addResource(StatusResource.class),
new QueryableModule(ServerManager.class), new QueryableModule(ServerManager.class),

View File

@ -74,7 +74,7 @@ public class CliRealtime extends ServerRunnable
S3Module.class, S3Module.class,
DataSegmentPusherModule.class, DataSegmentPusherModule.class,
new MetricsModule(), new MetricsModule(),
StorageNodeModule.class, new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer()) new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class), .addResource(StatusResource.class),
new ServerViewModule(), new ServerViewModule(),

View File

@ -31,20 +31,20 @@ import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerModule; import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.realtime.RealtimeManager; import com.metamx.druid.realtime.RealtimeManager;
import druid.examples.guice.RealtimeStandaloneModule; import druid.examples.guice.RealtimeExampleModule;
import io.airlift.command.Command; import io.airlift.command.Command;
/** /**
*/ */
@Command( @Command(
name = "realtimeStandalone", name = "example realtime",
description = "Runs a standalone realtime node for examples, see https://github.com/metamx/druid/wiki/Realtime for a description" description = "Runs a standalone realtime node for examples, see https://github.com/metamx/druid/wiki/Realtime for a description"
) )
public class CliRealtimeStandalone extends ServerRunnable public class CliRealtimeExample extends ServerRunnable
{ {
private static final Logger log = new Logger(CliBroker.class); private static final Logger log = new Logger(CliBroker.class);
public CliRealtimeStandalone() public CliRealtimeExample()
{ {
super(log); super(log);
} }
@ -56,12 +56,12 @@ public class CliRealtimeStandalone extends ServerRunnable
new LifecycleModule(), new LifecycleModule(),
EmitterModule.class, EmitterModule.class,
DruidProcessingModule.class, DruidProcessingModule.class,
StorageNodeModule.class, new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer()) new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class), .addResource(StatusResource.class),
new QueryableModule(RealtimeManager.class), new QueryableModule(RealtimeManager.class),
new QueryRunnerFactoryModule(), new QueryRunnerFactoryModule(),
RealtimeStandaloneModule.class RealtimeExampleModule.class
); );
} }
} }

View File

@ -38,7 +38,7 @@ public class Main
builder.withGroup("server") builder.withGroup("server")
.withDescription("Run one of the Druid server types.") .withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class) .withDefaultCommand(Help.class)
.withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliRealtimeStandalone.class); .withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliRealtimeExample.class);
builder.build().parse(args).run(); builder.build().parse(args).run();
} }