mirror of https://github.com/apache/druid.git
port hadoop druid indexer to new guice framework
This commit is contained in:
parent
9b57b4e1f3
commit
87259321b6
|
@ -57,7 +57,7 @@ public class ConfigProvider<T> implements Provider<T>
|
||||||
private final Class<T> clazz;
|
private final Class<T> clazz;
|
||||||
private final Map<String, String> replacements;
|
private final Map<String, String> replacements;
|
||||||
|
|
||||||
private T object = null;
|
private ConfigurationObjectFactory factory = null;
|
||||||
|
|
||||||
public ConfigProvider(
|
public ConfigProvider(
|
||||||
Class<T> clazz,
|
Class<T> clazz,
|
||||||
|
@ -70,20 +70,21 @@ public class ConfigProvider<T> implements Provider<T>
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public void inject(ConfigurationObjectFactory factory)
|
public void inject(ConfigurationObjectFactory factory)
|
||||||
|
{
|
||||||
|
this.factory = factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
// ConfigMagic handles a null replacements
|
// ConfigMagic handles a null replacements
|
||||||
object = factory.buildWithReplacements(clazz, replacements);
|
Preconditions.checkNotNull(factory, "WTF!? Code misconfigured, inject() didn't get called.");
|
||||||
|
return factory.buildWithReplacements(clazz, replacements);
|
||||||
}
|
}
|
||||||
catch (IllegalArgumentException e) {
|
catch (IllegalArgumentException e) {
|
||||||
log.info("Unable to build instance of class[%s]", clazz);
|
log.info("Unable to build instance of class[%s]", clazz);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public T get()
|
|
||||||
{
|
|
||||||
return Preconditions.checkNotNull(object, "WTF!? Code misconfigured, inject() didn't get called.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,130 +0,0 @@
|
||||||
/*
|
|
||||||
* Druid - a distributed column store.
|
|
||||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.druid.indexer;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.metamx.common.Pair;
|
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public class HadoopDruidIndexer
|
|
||||||
{
|
|
||||||
public static void main(String[] args) throws Exception
|
|
||||||
{
|
|
||||||
if (args.length < 1 || args.length > 2) {
|
|
||||||
printHelp();
|
|
||||||
System.exit(2);
|
|
||||||
}
|
|
||||||
|
|
||||||
HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
|
|
||||||
|
|
||||||
if (args.length == 2) {
|
|
||||||
node.setIntervalSpec(args[0]);
|
|
||||||
}
|
|
||||||
node.setArgumentSpec(args[args.length == 1 ? 0 : 1]);
|
|
||||||
|
|
||||||
Lifecycle lifecycle = new Lifecycle();
|
|
||||||
lifecycle.addManagedInstance(node);
|
|
||||||
|
|
||||||
try {
|
|
||||||
lifecycle.start();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
Thread.sleep(500);
|
|
||||||
printHelp();
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final List<Pair<String, String>> expectedFields =
|
|
||||||
ImmutableList.<Pair<String, String>>builder()
|
|
||||||
.add(Pair.of("dataSource", "Name of dataSource"))
|
|
||||||
.add(Pair.of("timestampColumn", "Column name of the timestamp column"))
|
|
||||||
.add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)"))
|
|
||||||
.add(
|
|
||||||
Pair.of(
|
|
||||||
"dataSpec",
|
|
||||||
"A JSON object with fields "
|
|
||||||
+
|
|
||||||
"format=(json, csv, tsv), "
|
|
||||||
+
|
|
||||||
"columns=JSON array of column names for the delimited text input file (only for csv and tsv formats),"
|
|
||||||
+
|
|
||||||
"dimensions=JSON array of dimensionn names (must match names in columns),"
|
|
||||||
+
|
|
||||||
"delimiter=delimiter of the data (only for tsv format)"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.add(
|
|
||||||
Pair.of(
|
|
||||||
"granularitySpec",
|
|
||||||
"A JSON object indicating the Granularity that segments should be created at."
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.add(
|
|
||||||
Pair.of(
|
|
||||||
"pathSpec",
|
|
||||||
"A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.add(
|
|
||||||
Pair.of(
|
|
||||||
"rollupSpec",
|
|
||||||
"JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.add(Pair.of("workingPath", "Path to store intermediate output data. Deleted when finished."))
|
|
||||||
.add(Pair.of("segmentOutputPath", "Path to store output segments."))
|
|
||||||
.add(
|
|
||||||
Pair.of(
|
|
||||||
"updaterJobSpec",
|
|
||||||
"JSON object with fields type=db, connectURI of the database, username, password, and segment table name"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)"))
|
|
||||||
.add(Pair.of("leaveIntermediate", "Leave intermediate files. (default: false)"))
|
|
||||||
.add(Pair.of("partitionDimension", "Dimension to partition by (optional)"))
|
|
||||||
.add(
|
|
||||||
Pair.of(
|
|
||||||
"targetPartitionSize",
|
|
||||||
"Integer representing the target number of rows in a partition (required if partitionDimension != null)"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private static void printHelp()
|
|
||||||
{
|
|
||||||
System.out.println("Usage: <java invocation> <config_spec>");
|
|
||||||
System.out.println("<config_spec> is either a JSON object or the path to a file that contains a JSON object.");
|
|
||||||
System.out.println();
|
|
||||||
System.out.println("JSON object description:");
|
|
||||||
System.out.println("{");
|
|
||||||
for (Pair<String, String> expectedField : expectedFields) {
|
|
||||||
System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs);
|
|
||||||
}
|
|
||||||
System.out.println("}");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -97,26 +97,26 @@ public class HadoopDruidIndexerConfig
|
||||||
|
|
||||||
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
|
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
|
||||||
{
|
{
|
||||||
List<Registererer> registererers = Lists.transform(
|
//List<Registererer> registererers = Lists.transform(
|
||||||
MapUtils.getList(argSpec, "registererers", ImmutableList.of()),
|
// MapUtils.getList(argSpec, "registererers", ImmutableList.of()),
|
||||||
new Function<Object, Registererer>()
|
// new Function<Object, Registererer>()
|
||||||
{
|
// {
|
||||||
@Override
|
// @Override
|
||||||
public Registererer apply(@Nullable Object input)
|
// public Registererer apply(@Nullable Object input)
|
||||||
{
|
// {
|
||||||
try {
|
// try {
|
||||||
return (Registererer) Class.forName((String) input).newInstance();
|
// return (Registererer) Class.forName((String) input).newInstance();
|
||||||
}
|
// }
|
||||||
catch (Exception e) {
|
// catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
// throw Throwables.propagate(e);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
);
|
//);
|
||||||
|
|
||||||
if (!registererers.isEmpty()) {
|
//if (!registererers.isEmpty()) {
|
||||||
Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper));
|
// Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper));
|
||||||
}
|
//}
|
||||||
|
|
||||||
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
|
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
|
||||||
}
|
}
|
||||||
|
@ -179,7 +179,7 @@ public class HadoopDruidIndexerConfig
|
||||||
private volatile DataRollupSpec rollupSpec;
|
private volatile DataRollupSpec rollupSpec;
|
||||||
private volatile DbUpdaterJobSpec updaterJobSpec;
|
private volatile DbUpdaterJobSpec updaterJobSpec;
|
||||||
private volatile boolean ignoreInvalidRows = false;
|
private volatile boolean ignoreInvalidRows = false;
|
||||||
private volatile List<String> registererers = Lists.newArrayList();
|
//private volatile List<String> registererers = Lists.newArrayList();
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public HadoopDruidIndexerConfig(
|
public HadoopDruidIndexerConfig(
|
||||||
|
@ -203,8 +203,8 @@ public class HadoopDruidIndexerConfig
|
||||||
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
|
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
|
||||||
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
|
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
|
||||||
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
|
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
|
||||||
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
|
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
|
||||||
final @JsonProperty("registererers") List<String> registererers
|
//final @JsonProperty("registererers") List<String> registererers
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
|
@ -224,7 +224,7 @@ public class HadoopDruidIndexerConfig
|
||||||
this.rollupSpec = rollupSpec;
|
this.rollupSpec = rollupSpec;
|
||||||
this.updaterJobSpec = updaterJobSpec;
|
this.updaterJobSpec = updaterJobSpec;
|
||||||
this.ignoreInvalidRows = ignoreInvalidRows;
|
this.ignoreInvalidRows = ignoreInvalidRows;
|
||||||
this.registererers = registererers;
|
//this.registererers = registererers;
|
||||||
|
|
||||||
if(partitionsSpec != null) {
|
if(partitionsSpec != null) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
|
@ -517,16 +517,16 @@ public class HadoopDruidIndexerConfig
|
||||||
this.ignoreInvalidRows = ignoreInvalidRows;
|
this.ignoreInvalidRows = ignoreInvalidRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
//@JsonProperty
|
||||||
public List<String> getRegistererers()
|
//public List<String> getRegistererers()
|
||||||
{
|
//{
|
||||||
return registererers;
|
// return registererers;
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
public void setRegistererers(List<String> registererers)
|
//public void setRegistererers(List<String> registererers)
|
||||||
{
|
//{
|
||||||
this.registererers = registererers;
|
// this.registererers = registererers;
|
||||||
}
|
//}
|
||||||
|
|
||||||
/********************************************
|
/********************************************
|
||||||
Granularity/Bucket Helper Methods
|
Granularity/Bucket Helper Methods
|
||||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.indexer;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.inject.Inject;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
|
@ -48,6 +49,7 @@ public class HadoopDruidIndexerJob implements Jobby
|
||||||
private IndexGeneratorJob indexJob;
|
private IndexGeneratorJob indexJob;
|
||||||
private volatile List<DataSegment> publishedSegments = null;
|
private volatile List<DataSegment> publishedSegments = null;
|
||||||
|
|
||||||
|
@Inject
|
||||||
public HadoopDruidIndexerJob(
|
public HadoopDruidIndexerJob(
|
||||||
HadoopDruidIndexerConfig config
|
HadoopDruidIndexerConfig config
|
||||||
)
|
)
|
||||||
|
|
|
@ -359,8 +359,8 @@ public class TaskSerdeTest
|
||||||
false,
|
false,
|
||||||
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
|
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
|
||||||
null,
|
null,
|
||||||
false,
|
false
|
||||||
ImmutableList.<String>of()
|
//ImmutableList.<String>of()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,8 @@ import io.druid.guice.DruidBinders;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.KeyHolder;
|
import io.druid.guice.KeyHolder;
|
||||||
import io.druid.guice.LazySingleton;
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.annotations.Self;
|
||||||
import io.druid.server.DruidNode;
|
import io.druid.server.DruidNode;
|
||||||
import io.druid.server.initialization.CuratorDiscoveryConfig;
|
import io.druid.server.initialization.CuratorDiscoveryConfig;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
@ -118,7 +120,7 @@ public class DiscoveryModule implements Module
|
||||||
*/
|
*/
|
||||||
public static void registerKey(Binder binder, Key<DruidNode> key)
|
public static void registerKey(Binder binder, Key<DruidNode> key)
|
||||||
{
|
{
|
||||||
DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key));
|
LifecycleModule.registerKey(binder, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -134,7 +136,7 @@ public class DiscoveryModule implements Module
|
||||||
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
|
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
|
||||||
binder.bind(ServiceAnnouncer.class)
|
binder.bind(ServiceAnnouncer.class)
|
||||||
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
|
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
|
||||||
.asEagerSingleton();
|
.in(LazySingleton.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.cli;
|
||||||
|
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.servlet.GuiceFilter;
|
||||||
|
import io.druid.server.QueryServlet;
|
||||||
|
import io.druid.server.initialization.JettyServerInitializer;
|
||||||
|
import org.eclipse.jetty.server.Handler;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||||
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
import org.eclipse.jetty.servlets.GzipFilter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class BrokerJettyServerInitializer implements JettyServerInitializer
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void initialize(Server server, Injector injector)
|
||||||
|
{
|
||||||
|
final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||||
|
resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*");
|
||||||
|
resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
|
||||||
|
|
||||||
|
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||||
|
queries.setResourceBase("/");
|
||||||
|
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
|
||||||
|
|
||||||
|
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||||
|
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||||
|
root.addFilter(GzipFilter.class, "/*", null);
|
||||||
|
root.addFilter(GuiceFilter.class, "/*", null);
|
||||||
|
|
||||||
|
final HandlerList handlerList = new HandlerList();
|
||||||
|
handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()});
|
||||||
|
server.setHandler(handlerList);
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,9 +21,7 @@ package io.druid.cli;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Injector;
|
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.servlet.GuiceFilter;
|
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import io.airlift.command.Command;
|
import io.airlift.command.Command;
|
||||||
import io.druid.client.BrokerServerView;
|
import io.druid.client.BrokerServerView;
|
||||||
|
@ -44,18 +42,9 @@ import io.druid.query.QuerySegmentWalker;
|
||||||
import io.druid.query.QueryToolChestWarehouse;
|
import io.druid.query.QueryToolChestWarehouse;
|
||||||
import io.druid.server.ClientInfoResource;
|
import io.druid.server.ClientInfoResource;
|
||||||
import io.druid.server.ClientQuerySegmentWalker;
|
import io.druid.server.ClientQuerySegmentWalker;
|
||||||
import io.druid.server.QueryServlet;
|
|
||||||
import io.druid.server.StatusResource;
|
|
||||||
import io.druid.server.initialization.JettyServerInitializer;
|
import io.druid.server.initialization.JettyServerInitializer;
|
||||||
import io.druid.server.metrics.MetricsModule;
|
import io.druid.server.metrics.MetricsModule;
|
||||||
import org.eclipse.jetty.server.Handler;
|
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
|
||||||
import org.eclipse.jetty.server.handler.HandlerList;
|
|
||||||
import org.eclipse.jetty.servlet.DefaultServlet;
|
|
||||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
|
||||||
import org.eclipse.jetty.servlet.ServletHolder;
|
|
||||||
import org.eclipse.jetty.servlets.GzipFilter;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -103,28 +92,4 @@ public class CliBroker extends ServerRunnable
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class BrokerJettyServerInitializer implements JettyServerInitializer
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void initialize(Server server, Injector injector)
|
|
||||||
{
|
|
||||||
final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
|
||||||
resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*");
|
|
||||||
resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
|
|
||||||
|
|
||||||
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
|
||||||
queries.setResourceBase("/");
|
|
||||||
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
|
|
||||||
|
|
||||||
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
|
||||||
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
|
||||||
root.addFilter(GzipFilter.class, "/*", null);
|
|
||||||
root.addFilter(GuiceFilter.class, "/*", null);
|
|
||||||
|
|
||||||
final HandlerList handlerList = new HandlerList();
|
|
||||||
handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()});
|
|
||||||
server.setHandler(handlerList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.cli;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.io.CharStreams;
|
||||||
|
import com.google.common.io.InputSupplier;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.Provides;
|
||||||
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.core.LoggingEmitter;
|
||||||
|
import com.metamx.emitter.core.LoggingEmitterConfig;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import io.airlift.command.Arguments;
|
||||||
|
import io.airlift.command.Command;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||||
|
import io.druid.indexer.HadoopDruidIndexerJob;
|
||||||
|
import io.druid.initialization.LogLevelAdjuster;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
@Command(
|
||||||
|
name = "hadoop",
|
||||||
|
description = "Runs the batch Hadoop Druid Indexer, see LINK GOES HERE for a description."
|
||||||
|
)
|
||||||
|
public class CliHadoopIndexer extends GuiceRunnable
|
||||||
|
{
|
||||||
|
@Arguments(description = "A JSON object or the path to a file that contains a JSON object")
|
||||||
|
private String argumentSpec;
|
||||||
|
|
||||||
|
private static final Logger log = new Logger(CliHadoopIndexer.class);
|
||||||
|
|
||||||
|
public CliHadoopIndexer()
|
||||||
|
{
|
||||||
|
super(log);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<Object> getModules()
|
||||||
|
{
|
||||||
|
return ImmutableList.<Object>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
binder.bind(HadoopDruidIndexerJob.class).in(LazySingleton.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@LazySingleton
|
||||||
|
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
|
||||||
|
{
|
||||||
|
Preconditions.checkNotNull(argumentSpec, "argumentSpec");
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (argumentSpec.startsWith("{")) {
|
||||||
|
return HadoopDruidIndexerConfig.fromString(argumentSpec);
|
||||||
|
} else if (argumentSpec.startsWith("s3://")) {
|
||||||
|
final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
|
||||||
|
final FileSystem fs = s3nPath.getFileSystem(new Configuration());
|
||||||
|
|
||||||
|
String configString = CharStreams.toString(
|
||||||
|
new InputSupplier<InputStreamReader>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public InputStreamReader getInput() throws IOException
|
||||||
|
{
|
||||||
|
return new InputStreamReader(fs.open(s3nPath));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return HadoopDruidIndexerConfig.fromString(configString);
|
||||||
|
} else {
|
||||||
|
return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
LogLevelAdjuster.register();
|
||||||
|
|
||||||
|
final Injector injector = Initialization.makeInjectorWithModules(
|
||||||
|
getBaseInjector(), getModules()
|
||||||
|
);
|
||||||
|
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
||||||
|
final HadoopDruidIndexerJob job = injector.getInstance(HadoopDruidIndexerJob.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
lifecycle.start();
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
log.error(t, "Error when starting up. Failing.");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
job.run();
|
||||||
|
|
||||||
|
try {
|
||||||
|
lifecycle.stop();
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
log.error(t, "Error when stopping. Failing.");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw com.google.common.base.Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,76 @@
|
||||||
|
package io.druid.cli;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.druid.initialization.LogLevelAdjuster;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public abstract class GuiceRunnable implements Runnable
|
||||||
|
{
|
||||||
|
private final Logger log;
|
||||||
|
|
||||||
|
private Injector baseInjector;
|
||||||
|
|
||||||
|
public GuiceRunnable(Logger log)
|
||||||
|
{
|
||||||
|
this.log = log;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void configure(Injector injector)
|
||||||
|
{
|
||||||
|
this.baseInjector = injector;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Injector getBaseInjector()
|
||||||
|
{
|
||||||
|
return baseInjector;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract List<Object> getModules();
|
||||||
|
|
||||||
|
public Injector makeInjector()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return Initialization.makeInjectorWithModules(
|
||||||
|
baseInjector, getModules()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Lifecycle initLifecycle(Injector injector)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
LogLevelAdjuster.register();
|
||||||
|
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
lifecycle.start();
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
log.error(t, "Error when starting up. Failing.");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return lifecycle;
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
initLifecycle(makeInjector());
|
||||||
|
}
|
||||||
|
}
|
|
@ -68,6 +68,11 @@ public class Main
|
||||||
.withDefaultCommand(Help.class)
|
.withDefaultCommand(Help.class)
|
||||||
.withCommands(ConvertProperties.class);
|
.withCommands(ConvertProperties.class);
|
||||||
|
|
||||||
|
builder.withGroup("index")
|
||||||
|
.withDescription("Run indexing for druid")
|
||||||
|
.withDefaultCommand(Help.class)
|
||||||
|
.withCommands(CliHadoopIndexer.class);
|
||||||
|
|
||||||
builder.withGroup("internal")
|
builder.withGroup("internal")
|
||||||
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
|
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
|
||||||
.withDefaultCommand(Help.class)
|
.withDefaultCommand(Help.class)
|
||||||
|
|
|
@ -20,54 +20,26 @@
|
||||||
package io.druid.cli;
|
package io.druid.cli;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.inject.Inject;
|
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import io.druid.initialization.LogLevelAdjuster;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public abstract class ServerRunnable implements Runnable
|
public abstract class ServerRunnable extends GuiceRunnable
|
||||||
{
|
{
|
||||||
private final Logger log;
|
|
||||||
|
|
||||||
private Injector baseInjector;
|
|
||||||
|
|
||||||
public ServerRunnable(Logger log)
|
public ServerRunnable(Logger log)
|
||||||
{
|
{
|
||||||
this.log = log;
|
super(log);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Inject
|
|
||||||
public void configure(Injector injector)
|
|
||||||
{
|
|
||||||
this.baseInjector = injector;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract List<Object> getModules();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
try {
|
final Injector injector = makeInjector();
|
||||||
LogLevelAdjuster.register();
|
final Lifecycle lifecycle = initLifecycle(injector);
|
||||||
|
|
||||||
final Injector injector = Initialization.makeInjectorWithModules(
|
|
||||||
baseInjector, getModules()
|
|
||||||
);
|
|
||||||
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
lifecycle.start();
|
|
||||||
}
|
|
||||||
catch (Throwable t) {
|
|
||||||
log.error(t, "Error when starting up. Failing.");
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
lifecycle.join();
|
lifecycle.join();
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
|
Loading…
Reference in New Issue