diff --git a/client/src/main/java/com/metamx/druid/coordination/LegacyDataSegmentAnnouncerProvider.java b/client/src/main/java/com/metamx/druid/coordination/LegacyDataSegmentAnnouncerProvider.java index cd99119b2a4..86b1f5fce88 100644 --- a/client/src/main/java/com/metamx/druid/coordination/LegacyDataSegmentAnnouncerProvider.java +++ b/client/src/main/java/com/metamx/druid/coordination/LegacyDataSegmentAnnouncerProvider.java @@ -1,7 +1,9 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.druid.guice.annotations.Json; import javax.validation.constraints.NotNull; import java.util.Arrays; @@ -18,10 +20,6 @@ public class LegacyDataSegmentAnnouncerProvider implements DataSegmentAnnouncerP @NotNull private BatchDataSegmentAnnouncer batchAnnouncer = null; - @JacksonInject - @NotNull - private Lifecycle lifecycle = null; - @Override public DataSegmentAnnouncer get() { diff --git a/client/src/main/java/com/metamx/druid/guice/AnnouncerModule.java b/client/src/main/java/com/metamx/druid/guice/AnnouncerModule.java new file mode 100644 index 00000000000..f3a6cd46a05 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/guice/AnnouncerModule.java @@ -0,0 +1,35 @@ +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.coordination.BatchDataSegmentAnnouncer; +import com.metamx.druid.coordination.DataSegmentAnnouncer; +import com.metamx.druid.coordination.DataSegmentAnnouncerProvider; +import com.metamx.druid.coordination.SingleDataSegmentAnnouncer; +import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig; +import org.apache.curator.framework.CuratorFramework; + +/** + */ +public class AnnouncerModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class); + JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); + binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); + binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); + binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); + } + + @Provides + @ManageLifecycle + public Announcer getAnnouncer(CuratorFramework curator) + { + return new Announcer(curator, Execs.singleThreaded("Announcer-%s")); + } +} diff --git a/examples/bin/ec2/run.sh b/examples/bin/ec2/run.sh index 9f695c1e001..df7438ccf5b 100644 --- a/examples/bin/ec2/run.sh +++ b/examples/bin/ec2/run.sh @@ -7,7 +7,7 @@ cd druid-services-* 2>&1 > /dev/null mkdir logs 2>&1 > /dev/null # Now start a realtime node -nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime com.metamx.druid.realtime.RealtimeMain 2>&1 > logs/realtime.log & +nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime io.druid.cli.Main server realtime 2>&1 > logs/realtime.log & # And a master node nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master io.druid.cli.Main server coordinator 2>&1 > logs/master.log & diff --git a/examples/bin/run_example_server.sh b/examples/bin/run_example_server.sh index e2cc076ce9c..e970ffcc7e0 100755 --- a/examples/bin/run_example_server.sh +++ b/examples/bin/run_example_server.sh @@ -63,4 +63,4 @@ DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config echo "Running command:" -(set -x; java ${JAVA_ARGS} -classpath ${DRUID_CP} druid.examples.RealtimeStandaloneMain) +(set -x; java ${JAVA_ARGS} -classpath ${DRUID_CP} io.druid.cli.Main druid server realtimeStandalone) diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java similarity index 52% rename from examples/src/main/java/druid/examples/RealtimeStandaloneMain.java rename to examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java index ebdd59d46c1..45e06db05b1 100644 --- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java @@ -1,17 +1,25 @@ -package druid.examples; +package druid.examples.guice; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; -import com.metamx.common.lifecycle.Lifecycle; +import com.google.inject.Binder; +import com.google.inject.TypeLiteral; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.InventoryView; import com.metamx.druid.client.ServerView; import com.metamx.druid.coordination.DataSegmentAnnouncer; +import com.metamx.druid.guice.FireDepartmentsProvider; +import com.metamx.druid.guice.JsonConfigProvider; +import com.metamx.druid.guice.ManageLifecycle; +import com.metamx.druid.guice.NoopSegmentPublisherProvider; +import com.metamx.druid.guice.RealtimeManagerConfig; +import com.metamx.druid.initialization.DruidModule; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.log.LogLevelAdjuster; -import com.metamx.druid.realtime.RealtimeNode; +import com.metamx.druid.realtime.FireDepartment; +import com.metamx.druid.realtime.RealtimeManager; import com.metamx.druid.realtime.SegmentPublisher; import druid.examples.flights.FlightsFirehoseFactory; import druid.examples.rand.RandomFirehoseFactory; @@ -20,64 +28,46 @@ import druid.examples.web.WebFirehoseFactory; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executor; /** - * Standalone Demo Realtime process. */ -public class RealtimeStandaloneMain +public class RealtimeExampleModule implements DruidModule { - private static final Logger log = new Logger(RealtimeStandaloneMain.class); + private static final Logger log = new Logger(RealtimeExampleModule.class); - public static void main(String[] args) throws Exception + @Override + public void configure(Binder binder) { - LogLevelAdjuster.register(); + binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class); + binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class); + binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class); + binder.bind(InventoryView.class).to(NoopInventoryView.class); + binder.bind(ServerView.class).to(NoopServerView.class); - final Lifecycle lifecycle = new Lifecycle(); - - RealtimeNode rn = RealtimeNode.builder().build(); - lifecycle.addManagedInstance(rn); - - // register the Firehoses - rn.registerJacksonSubtype( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream") + JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); + binder.bind( + new TypeLiteral>() + { + } + ).toProvider(FireDepartmentsProvider.class); + binder.bind(RealtimeManager.class).in(ManageLifecycle.class); + } + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("RealtimeExampleModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream") + ) ); - - // Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage - rn.setSegmentPublisher(new NoopSegmentPublisher()); - rn.setAnnouncer(new NoopDataSegmentAnnouncer()); - rn.setDataSegmentPusher(new NoopDataSegmentPusher()); - rn.setServerView(new NoopServerView()); - rn.setInventoryView(new NoopInventoryView()); - - Runtime.getRuntime().addShutdownHook( - new Thread( - new Runnable() - { - @Override - public void run() - { - log.info("Running shutdown hook"); - lifecycle.stop(); - } - } - ) - ); - - try { - lifecycle.start(); - } - catch (Throwable t) { - log.info(t, "Throwable caught at startup, committing seppuku"); - t.printStackTrace(); - System.exit(2); - } - - lifecycle.join(); } private static class NoopServerView implements ServerView @@ -87,7 +77,7 @@ public class RealtimeStandaloneMain Executor exec, ServerCallback callback ) { - + // do nothing } @Override @@ -95,7 +85,7 @@ public class RealtimeStandaloneMain Executor exec, SegmentCallback callback ) { - + // do nothing } } @@ -123,15 +113,6 @@ public class RealtimeStandaloneMain } } - private static class NoopSegmentPublisher implements SegmentPublisher - { - @Override - public void publishSegment(DataSegment segment) throws IOException - { - // do nothing - } - } - private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer { @Override @@ -158,4 +139,4 @@ public class RealtimeStandaloneMain // do nothing } } -} \ No newline at end of file +} diff --git a/realtime/src/main/java/com/metamx/druid/guice/DbSegmentPublisherProvider.java b/realtime/src/main/java/com/metamx/druid/guice/DbSegmentPublisherProvider.java new file mode 100644 index 00000000000..5d6d105fc06 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/guice/DbSegmentPublisherProvider.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.db.DbTablesConfig; +import com.metamx.druid.realtime.DbSegmentPublisher; +import com.metamx.druid.realtime.DbSegmentPublisherConfig; +import com.metamx.druid.realtime.SegmentPublisher; +import org.skife.jdbi.v2.IDBI; + +import javax.validation.constraints.NotNull; + +/** + */ +public class DbSegmentPublisherProvider implements SegmentPublisherProvider +{ + @JacksonInject + @NotNull + private IDBI idbi = null; + + @JacksonInject + @NotNull + private DbTablesConfig config = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public SegmentPublisher get() + { + return new DbSegmentPublisher(jsonMapper, config, idbi); + } +} diff --git a/realtime/src/main/java/com/metamx/druid/guice/FireDepartmentsProvider.java b/realtime/src/main/java/com/metamx/druid/guice/FireDepartmentsProvider.java new file mode 100644 index 00000000000..5f2fd146bc3 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/guice/FireDepartmentsProvider.java @@ -0,0 +1,64 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.metamx.druid.realtime.FireDepartment; + +import java.util.List; + +/** + */ +public class FireDepartmentsProvider implements Provider> +{ + private final List fireDepartments = Lists.newArrayList(); + + @Inject + public FireDepartmentsProvider( + ObjectMapper jsonMapper, + RealtimeManagerConfig config + ) + { + try { + this.fireDepartments.addAll( + (List) jsonMapper.readValue( + config.getSpecFile(), new TypeReference>() + { + } + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + + @Override + public List get() + { + return fireDepartments; + } +} diff --git a/realtime/src/main/java/com/metamx/druid/guice/NoopSegmentPublisherProvider.java b/realtime/src/main/java/com/metamx/druid/guice/NoopSegmentPublisherProvider.java new file mode 100644 index 00000000000..361d54cb40b --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/guice/NoopSegmentPublisherProvider.java @@ -0,0 +1,34 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.metamx.druid.realtime.NoopSegmentPublisher; +import com.metamx.druid.realtime.SegmentPublisher; + +/** + */ +public class NoopSegmentPublisherProvider implements SegmentPublisherProvider +{ + @Override + public SegmentPublisher get() + { + return new NoopSegmentPublisher(); + } +} diff --git a/realtime/src/main/java/com/metamx/druid/guice/RealtimeManagerConfig.java b/realtime/src/main/java/com/metamx/druid/guice/RealtimeManagerConfig.java new file mode 100644 index 00000000000..44d66bcf99b --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/guice/RealtimeManagerConfig.java @@ -0,0 +1,37 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.File; + +/** + */ +public class RealtimeManagerConfig +{ + @JsonProperty + private File specFile; + + public File getSpecFile() + { + return specFile; + } +} diff --git a/realtime/src/main/java/com/metamx/druid/guice/RealtimeModule.java b/realtime/src/main/java/com/metamx/druid/guice/RealtimeModule.java new file mode 100644 index 00000000000..c0a463d2850 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/guice/RealtimeModule.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.TypeLiteral; +import com.metamx.common.logger.Logger; +import com.metamx.druid.realtime.DbSegmentPublisherConfig; +import com.metamx.druid.realtime.FireDepartment; +import com.metamx.druid.realtime.RealtimeManager; +import com.metamx.druid.realtime.SegmentPublisher; + +import java.util.List; + +/** + */ +public class RealtimeModule implements Module +{ + private static final Logger log = new Logger(RealtimeModule.class); + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.publish", SegmentPublisherProvider.class); + binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class); + + JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); + binder.bind( + new TypeLiteral>() + { + } + ).toProvider(FireDepartmentsProvider.class); + binder.bind(RealtimeManager.class).in(ManageLifecycle.class); + } +} diff --git a/realtime/src/main/java/com/metamx/druid/guice/SegmentPublisherProvider.java b/realtime/src/main/java/com/metamx/druid/guice/SegmentPublisherProvider.java new file mode 100644 index 00000000000..832acac32ff --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/guice/SegmentPublisherProvider.java @@ -0,0 +1,35 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.inject.Provider; +import com.metamx.druid.realtime.SegmentPublisher; + +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopSegmentPublisherProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "db", value = DbSegmentPublisherProvider.class) +}) +public interface SegmentPublisherProvider extends Provider +{ +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java index 9fb501eddd6..20e27c7dc06 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java @@ -3,6 +3,7 @@ package com.metamx.druid.realtime; import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.db.DbTablesConfig; import org.joda.time.DateTime; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; @@ -17,12 +18,12 @@ public class DbSegmentPublisher implements SegmentPublisher private static final Logger log = new Logger(DbSegmentPublisher.class); private final ObjectMapper jsonMapper; - private final DbSegmentPublisherConfig config; + private final DbTablesConfig config; private final IDBI dbi; public DbSegmentPublisher( ObjectMapper jsonMapper, - DbSegmentPublisherConfig config, + DbTablesConfig config, IDBI dbi ) { @@ -41,7 +42,7 @@ public class DbSegmentPublisher implements SegmentPublisher public List> withHandle(Handle handle) throws Exception { return handle.createQuery( - String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentTable()) + String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable()) ) .bind("id", segment.getIdentifier()) .list(); @@ -65,13 +66,13 @@ public class DbSegmentPublisher implements SegmentPublisher statement = String.format( "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentTable() + config.getSegmentsTable() ); } else { statement = String.format( "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentTable() + config.getSegmentsTable() ); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java index 5dcaccac49b..dcab06ef0fd 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java @@ -1,9 +1,28 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime; import org.skife.config.Config; public abstract class DbSegmentPublisherConfig { - @Config("druid.database.segmentTable") + @Config("druid.db.tables.segmentTable") public abstract String getSegmentTable(); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java b/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java index 591486335ea..cc2cf2c6553 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java @@ -21,6 +21,7 @@ package com.metamx.druid.realtime; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.inject.Inject; import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.druid.realtime.plumber.Plumber; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeMain.java b/realtime/src/main/java/com/metamx/druid/realtime/NoopSegmentPublisher.java similarity index 57% rename from realtime/src/main/java/com/metamx/druid/realtime/RealtimeMain.java rename to realtime/src/main/java/com/metamx/druid/realtime/NoopSegmentPublisher.java index c81ab548a2e..6d51397e997 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeMain.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/NoopSegmentPublisher.java @@ -19,34 +19,17 @@ package com.metamx.druid.realtime; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.logger.Logger; -import com.metamx.druid.log.LogLevelAdjuster; +import com.metamx.druid.client.DataSegment; + +import java.io.IOException; /** */ -public class RealtimeMain +public class NoopSegmentPublisher implements SegmentPublisher { - private static final Logger log = new Logger(RealtimeMain.class); - - public static void main(String[] args) throws Exception + @Override + public void publishSegment(DataSegment segment) throws IOException { - LogLevelAdjuster.register(); - - Lifecycle lifecycle = new Lifecycle(); - - lifecycle.addManagedInstance( - RealtimeNode.builder().build() - ); - - try { - lifecycle.start(); - } - catch (Throwable t) { - log.info(t, "Throwable caught at startup, committing seppuku"); - System.exit(2); - } - - lifecycle.join(); + // do nothing } -} \ No newline at end of file +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index 0874a900f42..4fafd14b98a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; +import com.google.inject.Inject; import com.metamx.common.exception.FormattedException; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -60,6 +61,7 @@ public class RealtimeManager implements QuerySegmentWalker private final Map chiefs; + @Inject public RealtimeManager( List fireDepartments, QueryRunnerFactoryConglomerate conglomerate diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java deleted file mode 100644 index 4357f39c2b5..00000000000 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.realtime; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.BeanProperty; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Preconditions; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.collect.Maps; -import com.metamx.common.ISE; -import com.metamx.common.config.Config; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.logger.Logger; -import com.metamx.druid.BaseServerNode; -import com.metamx.druid.db.DbConnector; -import com.metamx.druid.db.DbConnectorConfig; -import com.metamx.druid.http.QueryServlet; -import com.metamx.druid.http.StatusServlet; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerInit; -import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.druid.utils.PropUtils; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.metrics.Monitor; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.skife.config.ConfigurationObjectFactory; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - */ -public class RealtimeNode extends BaseServerNode -{ - private static final Logger log = new Logger(RealtimeNode.class); - - public static Builder builder() - { - return new Builder(); - } - - private final Map injectablesMap = Maps.newLinkedHashMap(); - - private SegmentPublisher segmentPublisher = null; - private DataSegmentPusher dataSegmentPusher = null; - private List fireDepartments = null; - - private boolean initialized = false; - - public RealtimeNode( - Properties props, - Lifecycle lifecycle, - ObjectMapper jsonMapper, - ObjectMapper smileMapper, - ConfigurationObjectFactory configFactory - ) - { - super("realtime", log, props, lifecycle, jsonMapper, smileMapper, configFactory); - } - - public RealtimeNode setSegmentPublisher(SegmentPublisher segmentPublisher) - { - checkFieldNotSetAndSet("segmentPublisher", segmentPublisher); - return this; - } - - public RealtimeNode setDataSegmentPusher(DataSegmentPusher dataSegmentPusher) - { - checkFieldNotSetAndSet("dataSegmentPusher", dataSegmentPusher); - return this; - } - - public RealtimeNode setFireDepartments(List fireDepartments) - { - checkFieldNotSetAndSet("fireDepartments", fireDepartments); - return this; - } - - public RealtimeNode registerJacksonInjectable(String name, Object object) - { - Preconditions.checkState(injectablesMap.containsKey(name), "Already registered jackson object[%s]", name); - injectablesMap.put(name, object); - return this; - } - - public SegmentPublisher getSegmentPublisher() - { - initializeSegmentPublisher(); - return segmentPublisher; - } - - public DataSegmentPusher getDataSegmentPusher() - { - initializeSegmentPusher(); - return dataSegmentPusher; - } - - public List getFireDepartments() - { - initializeFireDepartments(); - return fireDepartments; - } - - protected void doInit() throws Exception - { - initializeJacksonInjectables(); - - final Lifecycle lifecycle = getLifecycle(); - final ServiceEmitter emitter = getEmitter(); - final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); - final List monitors = getMonitors(); - final List departments = getFireDepartments(); - - monitors.add(new RealtimeMetricsMonitor(departments)); - - final RealtimeManager realtimeManager = new RealtimeManager(departments, conglomerate); - lifecycle.addManagedInstance(realtimeManager); - - startMonitoring(monitors); - - final ServletContextHandler root = new ServletContextHandler(getServer(), "/", ServletContextHandler.SESSIONS); - root.addServlet(new ServletHolder(new StatusServlet()), "/status"); - root.addServlet( - new ServletHolder( - new QueryServlet(getJsonMapper(), getSmileMapper(), realtimeManager, emitter, getRequestLogger()) - ), - "/druid/v2/*" - ); - - initialized = true; - } - - @LifecycleStart - public synchronized void start() throws Exception - { - if (! initialized) { - init(); - } - - getLifecycle().start(); - } - - @LifecycleStop - public synchronized void stop() - { - getLifecycle().stop(); - } - - protected void initializeJacksonInjectables() - { - final Map injectables = Maps.newHashMap(); - - for (Map.Entry entry : injectablesMap.entrySet()) { - injectables.put(entry.getKey(), entry.getValue()); - } - - injectables.put("queryRunnerFactoryConglomerate", getConglomerate()); - injectables.put("segmentPusher", getDataSegmentPusher()); - injectables.put("segmentAnnouncer", getAnnouncer()); - injectables.put("segmentPublisher", getSegmentPublisher()); - injectables.put("serverView", getServerView()); - injectables.put("serviceEmitter", getEmitter()); - - getJsonMapper().setInjectableValues( - new InjectableValues() - { - @Override - public Object findInjectableValue( - Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance - ) - { - return injectables.get(valueId); - } - } - ); - } - - private void initializeFireDepartments() - { - if (fireDepartments == null) { - try { - setFireDepartments( - getJsonMapper().>readValue( - new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")), - new TypeReference>(){} - ) - ); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - } - - private void initializeSegmentPusher() - { - if (dataSegmentPusher == null) { - dataSegmentPusher = ServerInit.getSegmentPusher(getProps(), getConfigFactory(), getJsonMapper()); - } - } - - protected void initializeSegmentPublisher() - { - if (segmentPublisher == null) { - final DbSegmentPublisherConfig dbSegmentPublisherConfig = getConfigFactory().build(DbSegmentPublisherConfig.class); - segmentPublisher = new DbSegmentPublisher( - getJsonMapper(), - dbSegmentPublisherConfig, - new DbConnector(Suppliers.ofInstance(getConfigFactory().build(DbConnectorConfig.class)), null).getDBI() // TODO - ); - getLifecycle().addManagedInstance(segmentPublisher); - } - } - - public static class Builder - { - private ObjectMapper jsonMapper = null; - private ObjectMapper smileMapper = null; - private Lifecycle lifecycle = null; - private Properties props = null; - private ConfigurationObjectFactory configFactory = null; - - public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper) - { - this.jsonMapper = jsonMapper; - this.smileMapper = smileMapper; - return this; - } - - public Builder withProps(Properties props) - { - this.props = props; - return this; - } - - public Builder withConfigFactory(ConfigurationObjectFactory configFactory) - { - this.configFactory = configFactory; - return this; - } - - public RealtimeNode build() - { - if (jsonMapper == null && smileMapper == null) { - jsonMapper = new DefaultObjectMapper(); - smileMapper = new DefaultObjectMapper(new SmileFactory()); - smileMapper.getJsonFactory().setCodec(smileMapper); - } - else if (jsonMapper == null || smileMapper == null) { - throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); - } - - if (lifecycle == null) { - lifecycle = new Lifecycle(); - } - - if (props == null) { - props = Initialization.loadProperties(); - } - - if (configFactory == null) { - configFactory = Config.createFactory(props); - } - - return new RealtimeNode(props, lifecycle, jsonMapper, smileMapper, configFactory); - } - } -} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/Firehose.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/Firehose.java index d5a8a2bbdf0..434300031f8 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/Firehose.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/Firehose.java @@ -26,7 +26,7 @@ import java.io.Closeable; /** * This is an interface that holds onto the stream of incoming data. Realtime data ingestion is built around this * abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement - * one of these and register it with the RealtimeMain. + * one of these and register it with the Main. * * This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index 391fd305e21..aa26c735da7 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -68,6 +68,7 @@ import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import com.sun.istack.internal.NotNull; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -98,14 +99,31 @@ public class RealtimePlumberSchool implements PlumberSchool private final IndexGranularity segmentGranularity; private final Object handoffCondition = new Object(); + @JacksonInject + @NotNull private ServiceEmitter emitter; private volatile VersioningPolicy versioningPolicy = null; private volatile RejectionPolicyFactory rejectionPolicyFactory = null; + + @JacksonInject + @NotNull private volatile QueryRunnerFactoryConglomerate conglomerate = null; + + @JacksonInject + @NotNull private volatile DataSegmentPusher dataSegmentPusher = null; + + @JacksonInject + @NotNull private volatile DataSegmentAnnouncer segmentAnnouncer = null; + + @JacksonInject + @NotNull private volatile SegmentPublisher segmentPublisher = null; + + @JacksonInject + @NotNull private volatile ServerView serverView = null; @JsonCreator @@ -138,42 +156,6 @@ public class RealtimePlumberSchool implements PlumberSchool this.rejectionPolicyFactory = factory; } - @JacksonInject("queryRunnerFactoryConglomerate") - public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate) - { - this.conglomerate = conglomerate; - } - - @JacksonInject("segmentPusher") - public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher) - { - this.dataSegmentPusher = dataSegmentPusher; - } - - @JacksonInject("segmentAnnouncer") - public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer) - { - this.segmentAnnouncer = segmentAnnouncer; - } - - @JacksonInject("segmentPublisher") - public void setSegmentPublisher(SegmentPublisher segmentPublisher) - { - this.segmentPublisher = segmentPublisher; - } - - @JacksonInject("serverView") - public void setServerView(ServerView serverView) - { - this.serverView = serverView; - } - - @JacksonInject("serviceEmitter") - public void setServiceEmitter(ServiceEmitter emitter) - { - this.emitter = emitter; - } - @Override public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) { diff --git a/server/src/main/java/com/metamx/druid/guice/CassandraDataSegmentPusherProvider.java b/server/src/main/java/com/metamx/druid/guice/CassandraDataSegmentPusherProvider.java new file mode 100644 index 00000000000..6d28aff0f87 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/CassandraDataSegmentPusherProvider.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher; + +import javax.validation.constraints.NotNull; + +/** + */ +public class CassandraDataSegmentPusherProvider implements DataSegmentPusherProvider +{ + @JacksonInject + @NotNull + private CassandraDataSegmentConfig config = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public DataSegmentPusher get() + { + return new CassandraDataSegmentPusher(config, jsonMapper); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/DataSegmentPullerModule.java b/server/src/main/java/com/metamx/druid/guice/DataSegmentPullerModule.java new file mode 100644 index 00000000000..25a21ff3af2 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/DataSegmentPullerModule.java @@ -0,0 +1,83 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.multibindings.MapBinder; +import com.metamx.druid.loading.DataSegmentPuller; +import com.metamx.druid.loading.HdfsDataSegmentPuller; +import com.metamx.druid.loading.LocalDataSegmentPuller; +import com.metamx.druid.loading.OmniSegmentLoader; +import com.metamx.druid.loading.S3DataSegmentPuller; +import com.metamx.druid.loading.SegmentLoader; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller; +import org.apache.hadoop.conf.Configuration; + +/** + */ +public class DataSegmentPullerModule implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class); + + bindDeepStorageLocal(binder); + bindDeepStorageS3(binder); + bindDeepStorageHdfs(binder); + bindDeepStorageCassandra(binder); + } + + private static void bindDeepStorageLocal(Binder binder) + { + final MapBinder segmentPullerBinder = MapBinder.newMapBinder( + binder, String.class, DataSegmentPuller.class + ); + segmentPullerBinder.addBinding("local").to(LocalDataSegmentPuller.class).in(LazySingleton.class); + } + + private static void bindDeepStorageS3(Binder binder) + { + final MapBinder segmentPullerBinder = MapBinder.newMapBinder( + binder, String.class, DataSegmentPuller.class + ); + segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); + } + + private static void bindDeepStorageHdfs(Binder binder) + { + final MapBinder segmentPullerBinder = MapBinder.newMapBinder( + binder, String.class, DataSegmentPuller.class + ); + segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class); + binder.bind(Configuration.class).toInstance(new Configuration()); + } + + private static void bindDeepStorageCassandra(Binder binder) + { + final MapBinder segmentPullerBinder = MapBinder.newMapBinder( + binder, String.class, DataSegmentPuller.class + ); + segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class); + ConfigProvider.bind(binder, CassandraDataSegmentConfig.class); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/DataSegmentPusherModule.java b/server/src/main/java/com/metamx/druid/guice/DataSegmentPusherModule.java new file mode 100644 index 00000000000..bb95cb45273 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/DataSegmentPusherModule.java @@ -0,0 +1,30 @@ +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.HdfsDataSegmentPusherConfig; +import com.metamx.druid.loading.LocalDataSegmentPusherConfig; +import com.metamx.druid.loading.S3DataSegmentPusherConfig; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig; +import org.apache.hadoop.conf.Configuration; + +/** + */ +public class DataSegmentPusherModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.pusher", DataSegmentPusherProvider.class); + + JsonConfigProvider.bind(binder, "druid.pusher.s3", S3DataSegmentPusherConfig.class); + binder.bind(Configuration.class).toInstance(new Configuration()); + + JsonConfigProvider.bind(binder, "druid.pusher.hdfs", HdfsDataSegmentPusherConfig.class); + + JsonConfigProvider.bind(binder, "druid.pusher.cassandra", CassandraDataSegmentConfig.class); + + binder.bind(DataSegmentPusher.class).toProvider(DataSegmentPusherProvider.class); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/DataSegmentPusherProvider.java b/server/src/main/java/com/metamx/druid/guice/DataSegmentPusherProvider.java new file mode 100644 index 00000000000..f69c413663e --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/DataSegmentPusherProvider.java @@ -0,0 +1,37 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.inject.Provider; +import com.metamx.druid.loading.DataSegmentPusher; + +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalDataSegmentPusherProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "s3_zip", value = S3DataSegmentPusherProvider.class), + @JsonSubTypes.Type(name = "hdfs", value = HdfsDataSegmentPusherProvider.class), + @JsonSubTypes.Type(name = "c*", value = CassandraDataSegmentPusherProvider.class) +}) +public interface DataSegmentPusherProvider extends Provider +{ +} diff --git a/server/src/main/java/com/metamx/druid/guice/DruidProcessingModule.java b/server/src/main/java/com/metamx/druid/guice/DruidProcessingModule.java new file mode 100644 index 00000000000..27a8d069829 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/DruidProcessingModule.java @@ -0,0 +1,136 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.ProvisionException; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.logger.Logger; +import com.metamx.druid.DruidProcessingConfig; +import com.metamx.druid.collect.StupidPool; +import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.guice.annotations.Global; +import com.metamx.druid.guice.annotations.Processing; +import com.metamx.druid.query.MetricsEmittingExecutorService; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; + +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +/** + */ +public class DruidProcessingModule implements Module +{ + private static final Logger log = new Logger(DruidProcessingModule.class); + + @Override + public void configure(Binder binder) + { + ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); + binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + } + + @Provides + @Processing + @ManageLifecycle + public ExecutorService getProcessingExecutorService(ExecutorServiceConfig config, ServiceEmitter emitter) + { + return new MetricsEmittingExecutorService( + Executors.newFixedThreadPool(config.getNumThreads(), Execs.makeThreadFactory(config.getFormatString())), + emitter, + new ServiceMetricEvent.Builder() + ); + } + + @Provides + @LazySingleton + @Global + public StupidPool getIntermediateResultsPool(DruidProcessingConfig config) + { + try { + Class vmClass = Class.forName("sun.misc.VM"); + Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); + + if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { + log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj); + } else { + long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue(); + + final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1); + if (maxDirectMemory < memoryNeeded) { + throw new ProvisionException( + String.format( + "Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: " + + "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]", + maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads() + ) + ); + } + } + } + catch (ClassNotFoundException e) { + log.info("No VM class, cannot do memory check."); + } + catch (NoSuchMethodException e) { + log.info("VM.maxDirectMemory doesn't exist, cannot do memory check."); + } + catch (InvocationTargetException e) { + log.warn(e, "static method shouldn't throw this"); + } + catch (IllegalAccessException e) { + log.warn(e, "public method, shouldn't throw this"); + } + + return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes()); + } + + private static class IntermediateProcessingBufferPool extends StupidPool + { + private static final Logger log = new Logger(IntermediateProcessingBufferPool.class); + + public IntermediateProcessingBufferPool(final int computationBufferSize) + { + super( + new Supplier() + { + final AtomicLong count = new AtomicLong(0); + + @Override + public ByteBuffer get() + { + log.info( + "Allocating new intermediate processing buffer[%,d] of size[%,d]", + count.getAndIncrement(), computationBufferSize + ); + return ByteBuffer.allocateDirect(computationBufferSize); + } + } + ); + } + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/HdfsDataSegmentPusherProvider.java b/server/src/main/java/com/metamx/druid/guice/HdfsDataSegmentPusherProvider.java new file mode 100644 index 00000000000..837b41ce1e1 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/HdfsDataSegmentPusherProvider.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.HdfsDataSegmentPusher; +import com.metamx.druid.loading.HdfsDataSegmentPusherConfig; +import org.apache.hadoop.conf.Configuration; + +import javax.validation.constraints.NotNull; + +/** + */ +public class HdfsDataSegmentPusherProvider implements DataSegmentPusherProvider +{ + @JacksonInject + @NotNull + private HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = null; + + @JacksonInject + @NotNull + private Configuration config = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public DataSegmentPusher get() + { + return new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, config, jsonMapper); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java b/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java index c96b15150e8..56526a5f286 100644 --- a/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java +++ b/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java @@ -1,59 +1,29 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.guice; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Module; -import com.google.inject.Provides; -import com.google.inject.ProvisionException; -import com.google.inject.multibindings.MapBinder; -import com.metamx.common.concurrent.ExecutorServiceConfig; import com.metamx.common.logger.Logger; -import com.metamx.druid.DruidProcessingConfig; -import com.metamx.druid.client.DruidServerConfig; -import com.metamx.druid.collect.StupidPool; -import com.metamx.druid.concurrent.Execs; -import com.metamx.druid.coordination.BatchDataSegmentAnnouncer; -import com.metamx.druid.coordination.DataSegmentAnnouncer; -import com.metamx.druid.coordination.DataSegmentAnnouncerProvider; -import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.ServerManager; -import com.metamx.druid.coordination.SingleDataSegmentAnnouncer; import com.metamx.druid.coordination.ZkCoordinator; -import com.metamx.druid.curator.announcement.Announcer; -import com.metamx.druid.guice.annotations.Global; -import com.metamx.druid.guice.annotations.Processing; -import com.metamx.druid.guice.annotations.Self; -import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig; -import com.metamx.druid.initialization.DruidNode; -import com.metamx.druid.loading.DataSegmentPuller; -import com.metamx.druid.loading.HdfsDataSegmentPuller; -import com.metamx.druid.loading.LocalDataSegmentPuller; -import com.metamx.druid.loading.MMappedQueryableIndexFactory; -import com.metamx.druid.loading.OmniSegmentLoader; -import com.metamx.druid.loading.QueryableIndexFactory; -import com.metamx.druid.loading.S3CredentialsConfig; -import com.metamx.druid.loading.S3DataSegmentPuller; -import com.metamx.druid.loading.SegmentLoader; -import com.metamx.druid.loading.SegmentLoaderConfig; -import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig; -import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller; -import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; -import com.metamx.druid.query.MetricsEmittingExecutorService; -import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; -import org.apache.curator.framework.CuratorFramework; -import org.apache.hadoop.conf.Configuration; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; - -import java.lang.reflect.InvocationTargetException; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; /** */ @@ -64,161 +34,7 @@ public class HistoricalModule implements Module @Override public void configure(Binder binder) { - ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); - - JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); - JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); - binder.bind(ServerManager.class).in(LazySingleton.class); - - binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class); - binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class); - - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("local").to(LocalDataSegmentPuller.class).in(LazySingleton.class); - - bindDeepStorageS3(binder); - bindDeepStorageHdfs(binder); - bindDeepStorageCassandra(binder); - - binder.bind(QueryRunnerFactoryConglomerate.class) - .to(DefaultQueryRunnerFactoryConglomerate.class) - .in(LazySingleton.class); - binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); - - JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class); - JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); - binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); - binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); - binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); - } - - @Provides @LazySingleton - public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config) - { - return new DruidServerMetadata(node.getHost(), node.getHost(), config.getMaxSize(), "historical", config.getTier()); - } - - @Provides @ManageLifecycle - public Announcer getAnnouncer(CuratorFramework curator) - { - return new Announcer(curator, Execs.singleThreaded("Announcer-%s")); - } - - @Provides @Processing @ManageLifecycle - public ExecutorService getProcessingExecutorService(ExecutorServiceConfig config, ServiceEmitter emitter) - { - return new MetricsEmittingExecutorService( - Executors.newFixedThreadPool(config.getNumThreads(), Execs.makeThreadFactory(config.getFormatString())), - emitter, - new ServiceMetricEvent.Builder() - ); - } - - @Provides @LazySingleton - public RestS3Service getRestS3Service(S3CredentialsConfig config) - { - try { - return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey())); - } - catch (S3ServiceException e) { - throw new ProvisionException("Unable to create a RestS3Service", e); - } - } - - @Provides @LazySingleton @Global - public StupidPool getIntermediateResultsPool(DruidProcessingConfig config) - { - try { - Class vmClass = Class.forName("sun.misc.VM"); - Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); - - if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { - log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj); - } else { - long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue(); - - final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1); - if (maxDirectMemory < memoryNeeded) { - throw new ProvisionException( - String.format( - "Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: " - + "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]", - maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads() - ) - ); - } - } - } - catch (ClassNotFoundException e) { - log.info("No VM class, cannot do memory check."); - } - catch (NoSuchMethodException e) { - log.info("VM.maxDirectMemory doesn't exist, cannot do memory check."); - } - catch (InvocationTargetException e) { - log.warn(e, "static method shouldn't throw this"); - } - catch (IllegalAccessException e) { - log.warn(e, "public method, shouldn't throw this"); - } - - return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes()); - } - - private static void bindDeepStorageS3(Binder binder) - { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); - JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class); - } - - private static void bindDeepStorageHdfs(Binder binder) - { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class); - binder.bind(Configuration.class).toInstance(new Configuration()); - } - - private static void bindDeepStorageCassandra(Binder binder) - { - final MapBinder segmentPullerBinder = MapBinder.newMapBinder( - binder, String.class, DataSegmentPuller.class - ); - segmentPullerBinder.addBinding("c*").to(CassandraDataSegmentPuller.class).in(LazySingleton.class); - ConfigProvider.bind(binder, CassandraDataSegmentConfig.class); - } - - private static class IntermediateProcessingBufferPool extends StupidPool - { - private static final Logger log = new Logger(IntermediateProcessingBufferPool.class); - - public IntermediateProcessingBufferPool(final int computationBufferSize) - { - super( - new Supplier() - { - final AtomicLong count = new AtomicLong(0); - - @Override - public ByteBuffer get() - { - log.info( - "Allocating new intermediate processing buffer[%,d] of size[%,d]", - count.getAndIncrement(), computationBufferSize - ); - return ByteBuffer.allocateDirect(computationBufferSize); - } - } - ); - } } } diff --git a/server/src/main/java/com/metamx/druid/guice/LocalDataSegmentPusherProvider.java b/server/src/main/java/com/metamx/druid/guice/LocalDataSegmentPusherProvider.java new file mode 100644 index 00000000000..7a51b7264e1 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/LocalDataSegmentPusherProvider.java @@ -0,0 +1,43 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.LocalDataSegmentPusher; +import com.metamx.druid.loading.LocalDataSegmentPusherConfig; + +import javax.validation.constraints.NotNull; + +/** + */ +public class LocalDataSegmentPusherProvider extends LocalDataSegmentPusherConfig implements DataSegmentPusherProvider +{ + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public DataSegmentPusher get() + { + return new LocalDataSegmentPusher(this, jsonMapper); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/S3DataSegmentPusherProvider.java b/server/src/main/java/com/metamx/druid/guice/S3DataSegmentPusherProvider.java new file mode 100644 index 00000000000..b86fecfdf7a --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/S3DataSegmentPusherProvider.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.S3DataSegmentPusher; +import com.metamx.druid.loading.S3DataSegmentPusherConfig; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; + +import javax.validation.constraints.NotNull; + +/** + */ +public class S3DataSegmentPusherProvider implements DataSegmentPusherProvider +{ + @JacksonInject + @NotNull + private RestS3Service restS3Service = null; + + @JacksonInject + @NotNull + private S3DataSegmentPusherConfig config = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public DataSegmentPusher get() + { + return new S3DataSegmentPusher(restS3Service, config, jsonMapper); + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/S3Module.java b/server/src/main/java/com/metamx/druid/guice/S3Module.java new file mode 100644 index 00000000000..e60827285a3 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/S3Module.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.ProvisionException; +import com.metamx.druid.loading.S3CredentialsConfig; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.security.AWSCredentials; + +/** + */ +public class S3Module implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class); + } + + @Provides + @LazySingleton + public RestS3Service getRestS3Service(S3CredentialsConfig config) + { + try { + return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey())); + } + catch (S3ServiceException e) { + throw new ProvisionException("Unable to create a RestS3Service", e); + } + } +} diff --git a/server/src/main/java/com/metamx/druid/guice/ServerModule.java b/server/src/main/java/com/metamx/druid/guice/ServerModule.java index 74d0605e5f9..400f4ad040b 100644 --- a/server/src/main/java/com/metamx/druid/guice/ServerModule.java +++ b/server/src/main/java/com/metamx/druid/guice/ServerModule.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.guice; import com.google.inject.Binder; diff --git a/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java b/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java new file mode 100644 index 00000000000..72954cec734 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/StorageNodeModule.java @@ -0,0 +1,72 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.metamx.druid.client.DruidServerConfig; +import com.metamx.druid.coordination.DruidServerMetadata; +import com.metamx.druid.guice.annotations.Self; +import com.metamx.druid.initialization.DruidNode; +import com.metamx.druid.loading.MMappedQueryableIndexFactory; +import com.metamx.druid.loading.QueryableIndexFactory; +import com.metamx.druid.loading.SegmentLoaderConfig; +import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; +import com.metamx.druid.query.QueryRunnerFactoryConglomerate; + +/** + */ +public class StorageNodeModule extends ServerModule +{ + private final String nodeType; + + public StorageNodeModule(String nodeType) + { + this.nodeType = nodeType; + } + + @Override + public void configure(Binder binder) + { + super.configure(binder); + + JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); + JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); + + binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class); + + binder.bind(QueryRunnerFactoryConglomerate.class) + .to(DefaultQueryRunnerFactoryConglomerate.class) + .in(LazySingleton.class); + } + + @Provides + @LazySingleton + public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config) + { + return new DruidServerMetadata( + node.getHost(), + node.getHost(), + config.getMaxSize(), + nodeType, + config.getTier() + ); + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java index e232bbb8f08..e68a93f5282 100644 --- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.loading; import com.fasterxml.jackson.databind.ObjectMapper; @@ -5,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.OutputSupplier; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; @@ -28,6 +48,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final Configuration hadoopConfig; private final ObjectMapper jsonMapper; + @Inject public HdfsDataSegmentPusher( HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java index b27d03672bc..61061f7ecbf 100644 --- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java @@ -19,12 +19,17 @@ package com.metamx.druid.loading; -import org.skife.config.Config; +import com.fasterxml.jackson.annotation.JsonProperty; /** */ public abstract class HdfsDataSegmentPusherConfig { - @Config("druid.pusher.hdfs.storageDirectory") - public abstract String getStorageDirectory(); + @JsonProperty + public String storageDirectory = ""; + + public String getStorageDirectory() + { + return storageDirectory; + } } diff --git a/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusher.java index 1493b162572..ed4ab5472c9 100644 --- a/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusher.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; @@ -40,6 +41,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher private final LocalDataSegmentPusherConfig config; private final ObjectMapper jsonMapper; + @Inject public LocalDataSegmentPusher( LocalDataSegmentPusherConfig config, ObjectMapper jsonMapper diff --git a/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusherConfig.java index d33a9a5130b..7addab21ce5 100644 --- a/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusherConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/LocalDataSegmentPusherConfig.java @@ -19,14 +19,20 @@ package com.metamx.druid.loading; +import com.fasterxml.jackson.annotation.JsonProperty; import org.skife.config.Config; import java.io.File; /** */ -public abstract class LocalDataSegmentPusherConfig +public class LocalDataSegmentPusherConfig { - @Config("druid.pusher.local.storageDirectory") - public abstract File getStorageDirectory(); + @JsonProperty + public File storageDirectory; + + public File getStorageDirectory() + { + return storageDirectory; + } } diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java index d9ac69e5c5b..f70055456af 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java @@ -24,6 +24,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import com.google.inject.Inject; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.utils.CompressionUtils; @@ -46,6 +47,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher private final S3DataSegmentPusherConfig config; private final ObjectMapper jsonMapper; + @Inject public S3DataSegmentPusher( RestS3Service s3Client, S3DataSegmentPusherConfig config, diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusherConfig.java index 3fbbe2d311f..70fa23c4278 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusherConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusherConfig.java @@ -19,21 +19,35 @@ package com.metamx.druid.loading; +import com.fasterxml.jackson.annotation.JsonProperty; import org.skife.config.Config; import org.skife.config.Default; /** */ -public abstract class S3DataSegmentPusherConfig +public class S3DataSegmentPusherConfig { - @Config("druid.pusher.s3.bucket") - public abstract String getBucket(); + @JsonProperty + public String bucket = ""; - @Config("druid.pusher.s3.baseKey") - @Default("") - public abstract String getBaseKey(); + @JsonProperty + public String baseKey = ""; - @Config("druid.pusher.s3.disableAcl") - @Default("false") - public abstract boolean getDisableAcl(); + @JsonProperty + public boolean disableAcl = false; + + public String getBucket() + { + return bucket; + } + + public String getBaseKey() + { + return baseKey; + } + + public boolean getDisableAcl() + { + return disableAcl; + } } diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java index 6e7976f824d..e392cc4bb0d 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java @@ -19,8 +19,7 @@ package com.metamx.druid.loading.cassandra; -import org.skife.config.Config; -import org.skife.config.Default; +import com.fasterxml.jackson.annotation.JsonProperty; /** * Cassandra Config @@ -29,11 +28,19 @@ import org.skife.config.Default; */ public abstract class CassandraDataSegmentConfig { - @Config("druid.pusher.cassandra.host") - @Default("") - public abstract String getHost(); + @JsonProperty + public String host = ""; - @Config("druid.pusher.cassandra.keyspace") - @Default("") - public abstract String getKeyspace(); + @JsonProperty + public String keyspace = ""; + + public String getKeyspace() + { + return keyspace; + } + + public String getHost() + { + return host; + } } diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java index 57bc72b9124..63483075db9 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java @@ -1,12 +1,28 @@ -package com.metamx.druid.loading.cassandra; +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; +package com.metamx.druid.loading.cassandra; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; @@ -16,6 +32,10 @@ import com.metamx.druid.utils.CompressionUtils; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.recipes.storage.ChunkedStorage; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + /** * Cassandra Segment Pusher * @@ -28,6 +48,7 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final ObjectMapper jsonMapper; + @Inject public CassandraDataSegmentPusher( CassandraDataSegmentConfig config, ObjectMapper jsonMapper) diff --git a/server/src/main/java/com/metamx/druid/metrics/MonitorsConfig.java b/server/src/main/java/com/metamx/druid/metrics/MonitorsConfig.java index 72f8feea6ca..86c351b3a1d 100644 --- a/server/src/main/java/com/metamx/druid/metrics/MonitorsConfig.java +++ b/server/src/main/java/com/metamx/druid/metrics/MonitorsConfig.java @@ -1,10 +1,27 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.metrics; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import com.metamx.metrics.JvmMonitor; +import com.google.common.collect.Lists; import com.metamx.metrics.Monitor; -import com.metamx.metrics.SysMonitor; import javax.validation.constraints.NotNull; import java.util.List; @@ -13,12 +30,9 @@ import java.util.List; */ public class MonitorsConfig { - @JsonProperty("monitorExclusions") + @JsonProperty("monitors") @NotNull - private List> monitors = ImmutableList.>builder() - .add(JvmMonitor.class) - .add(SysMonitor.class) - .build(); + private List> monitors = Lists.newArrayList(); public List> getMonitors() { diff --git a/services/pom.xml b/services/pom.xml index 89dd568b967..aee7193de5d 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -42,6 +42,11 @@ druid-server ${project.parent.version} + + com.metamx.druid + druid-examples + ${project.parent.version} + io.airlift airline diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index d5e00e4c7ff..14339320320 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -1,7 +1,27 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.cli; import com.google.inject.Injector; import com.metamx.common.logger.Logger; +import com.metamx.druid.client.cache.CacheMonitor; import com.metamx.druid.curator.CuratorModule; import com.metamx.druid.guice.BrokerModule; import com.metamx.druid.guice.HttpClientModule; @@ -42,7 +62,7 @@ public class CliBroker extends ServerRunnable EmitterModule.class, HttpClientModule.global(), CuratorModule.class, - new MetricsModule(), + new MetricsModule().register(CacheMonitor.class), ServerModule.class, new JettyServerModule(new QueryJettyServerInitializer()) .addResource(StatusResource.class), diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 01bb7484198..04aca3c58ba 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.cli; import com.google.inject.Injector; @@ -5,12 +24,18 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.coordination.ServerManager; import com.metamx.druid.coordination.ZkCoordinator; import com.metamx.druid.curator.CuratorModule; +import com.metamx.druid.guice.AnnouncerModule; +import com.metamx.druid.guice.DataSegmentPullerModule; +import com.metamx.druid.guice.DataSegmentPusherModule; +import com.metamx.druid.guice.DruidProcessingModule; import com.metamx.druid.guice.HistoricalModule; import com.metamx.druid.guice.HttpClientModule; import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.QueryRunnerFactoryModule; import com.metamx.druid.guice.QueryableModule; +import com.metamx.druid.guice.S3Module; import com.metamx.druid.guice.ServerModule; +import com.metamx.druid.guice.StorageNodeModule; import com.metamx.druid.http.StatusResource; import com.metamx.druid.initialization.EmitterModule; import com.metamx.druid.initialization.Initialization; @@ -42,8 +67,12 @@ public class CliHistorical extends ServerRunnable EmitterModule.class, HttpClientModule.global(), CuratorModule.class, + AnnouncerModule.class, + DruidProcessingModule.class, + S3Module.class, + DataSegmentPullerModule.class, new MetricsModule().register(ServerMonitor.class), - ServerModule.class, + new StorageNodeModule("historical"), new JettyServerModule(new QueryJettyServerInitializer()) .addResource(StatusResource.class), new QueryableModule(ServerManager.class), diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java new file mode 100644 index 00000000000..2defc0a2ae2 --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -0,0 +1,86 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.metamx.common.logger.Logger; +import com.metamx.druid.DruidProcessingConfig; +import com.metamx.druid.curator.CuratorModule; +import com.metamx.druid.guice.AnnouncerModule; +import com.metamx.druid.guice.DataSegmentPusherModule; +import com.metamx.druid.guice.DbConnectorModule; +import com.metamx.druid.guice.DruidProcessingModule; +import com.metamx.druid.guice.HttpClientModule; +import com.metamx.druid.guice.LifecycleModule; +import com.metamx.druid.guice.QueryRunnerFactoryModule; +import com.metamx.druid.guice.QueryableModule; +import com.metamx.druid.guice.RealtimeModule; +import com.metamx.druid.guice.S3Module; +import com.metamx.druid.guice.ServerModule; +import com.metamx.druid.guice.ServerViewModule; +import com.metamx.druid.guice.StorageNodeModule; +import com.metamx.druid.http.StatusResource; +import com.metamx.druid.initialization.EmitterModule; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.JettyServerModule; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.metrics.MetricsModule; +import com.metamx.druid.realtime.RealtimeManager; +import io.airlift.command.Command; + +/** + */ +@Command( + name = "realtime", + description = "Runs a realtime node, see https://github.com/metamx/druid/wiki/Realtime for a description" +) +public class CliRealtime extends ServerRunnable +{ + private static final Logger log = new Logger(CliBroker.class); + + public CliRealtime() + { + super(log); + } + + @Override + protected Injector getInjector() + { + return Initialization.makeInjector( + new LifecycleModule(), + EmitterModule.class, + DbConnectorModule.class, + HttpClientModule.global(), + CuratorModule.class, + AnnouncerModule.class, + DruidProcessingModule.class, + S3Module.class, + DataSegmentPusherModule.class, + new MetricsModule(), + new StorageNodeModule("realtime"), + new JettyServerModule(new QueryJettyServerInitializer()) + .addResource(StatusResource.class), + new ServerViewModule(), + new QueryableModule(RealtimeManager.class), + new QueryRunnerFactoryModule(), + RealtimeModule.class + ); + } +} diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java new file mode 100644 index 00000000000..1b11ea07e14 --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -0,0 +1,67 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.metamx.common.logger.Logger; +import com.metamx.druid.guice.DruidProcessingModule; +import com.metamx.druid.guice.LifecycleModule; +import com.metamx.druid.guice.QueryRunnerFactoryModule; +import com.metamx.druid.guice.QueryableModule; +import com.metamx.druid.guice.StorageNodeModule; +import com.metamx.druid.http.StatusResource; +import com.metamx.druid.initialization.EmitterModule; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.JettyServerModule; +import com.metamx.druid.realtime.RealtimeManager; +import druid.examples.guice.RealtimeExampleModule; +import io.airlift.command.Command; + +/** + */ +@Command( + name = "example realtime", + description = "Runs a standalone realtime node for examples, see https://github.com/metamx/druid/wiki/Realtime for a description" +) +public class CliRealtimeExample extends ServerRunnable +{ + private static final Logger log = new Logger(CliBroker.class); + + public CliRealtimeExample() + { + super(log); + } + + @Override + protected Injector getInjector() + { + return Initialization.makeInjector( + new LifecycleModule(), + EmitterModule.class, + DruidProcessingModule.class, + new StorageNodeModule("realtime"), + new JettyServerModule(new QueryJettyServerInitializer()) + .addResource(StatusResource.class), + new QueryableModule(RealtimeManager.class), + new QueryRunnerFactoryModule(), + RealtimeExampleModule.class + ); + } +} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index 594deb05af5..d2f290e26c8 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.cli; import io.airlift.command.Cli; @@ -19,7 +38,7 @@ public class Main builder.withGroup("server") .withDescription("Run one of the Druid server types.") .withDefaultCommand(Help.class) - .withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class); + .withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliRealtimeExample.class); builder.build().parse(args).run(); }