Merge pull request #248 from metamx/guice-hi

Port Hadoop Batch Indexer to new Guice framwork
This commit is contained in:
cheddar 2013-09-26 13:20:15 -07:00
commit a3e16335ef
12 changed files with 290 additions and 245 deletions

View File

@ -57,7 +57,7 @@ public class ConfigProvider<T> implements Provider<T>
private final Class<T> clazz;
private final Map<String, String> replacements;
private T object = null;
private ConfigurationObjectFactory factory = null;
public ConfigProvider(
Class<T> clazz,
@ -70,20 +70,21 @@ public class ConfigProvider<T> implements Provider<T>
@Inject
public void inject(ConfigurationObjectFactory factory)
{
this.factory = factory;
}
@Override
public T get()
{
try {
// ConfigMagic handles a null replacements
object = factory.buildWithReplacements(clazz, replacements);
Preconditions.checkNotNull(factory, "WTF!? Code misconfigured, inject() didn't get called.");
return factory.buildWithReplacements(clazz, replacements);
}
catch (IllegalArgumentException e) {
log.info("Unable to build instance of class[%s]", clazz);
throw e;
}
}
@Override
public T get()
{
return Preconditions.checkNotNull(object, "WTF!? Code misconfigured, inject() didn't get called.");
}
}

View File

@ -1,130 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.Lifecycle;
import java.util.List;
/**
*/
@Deprecated
public class HadoopDruidIndexer
{
public static void main(String[] args) throws Exception
{
if (args.length < 1 || args.length > 2) {
printHelp();
System.exit(2);
}
HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
if (args.length == 2) {
node.setIntervalSpec(args[0]);
}
node.setArgumentSpec(args[args.length == 1 ? 0 : 1]);
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(node);
try {
lifecycle.start();
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(500);
printHelp();
System.exit(1);
}
}
private static final List<Pair<String, String>> expectedFields =
ImmutableList.<Pair<String, String>>builder()
.add(Pair.of("dataSource", "Name of dataSource"))
.add(Pair.of("timestampColumn", "Column name of the timestamp column"))
.add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)"))
.add(
Pair.of(
"dataSpec",
"A JSON object with fields "
+
"format=(json, csv, tsv), "
+
"columns=JSON array of column names for the delimited text input file (only for csv and tsv formats),"
+
"dimensions=JSON array of dimensionn names (must match names in columns),"
+
"delimiter=delimiter of the data (only for tsv format)"
)
)
.add(
Pair.of(
"granularitySpec",
"A JSON object indicating the Granularity that segments should be created at."
)
)
.add(
Pair.of(
"pathSpec",
"A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity"
)
)
.add(
Pair.of(
"rollupSpec",
"JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs"
)
)
.add(Pair.of("workingPath", "Path to store intermediate output data. Deleted when finished."))
.add(Pair.of("segmentOutputPath", "Path to store output segments."))
.add(
Pair.of(
"updaterJobSpec",
"JSON object with fields type=db, connectURI of the database, username, password, and segment table name"
)
)
.add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)"))
.add(Pair.of("leaveIntermediate", "Leave intermediate files. (default: false)"))
.add(Pair.of("partitionDimension", "Dimension to partition by (optional)"))
.add(
Pair.of(
"targetPartitionSize",
"Integer representing the target number of rows in a partition (required if partitionDimension != null)"
)
)
.build();
private static void printHelp()
{
System.out.println("Usage: <java invocation> <config_spec>");
System.out.println("<config_spec> is either a JSON object or the path to a file that contains a JSON object.");
System.out.println();
System.out.println("JSON object description:");
System.out.println("{");
for (Pair<String, String> expectedField : expectedFields) {
System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs);
}
System.out.println("}");
}
}

View File

@ -97,27 +97,6 @@ public class HadoopDruidIndexerConfig
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
List<Registererer> registererers = Lists.transform(
MapUtils.getList(argSpec, "registererers", ImmutableList.of()),
new Function<Object, Registererer>()
{
@Override
public Registererer apply(@Nullable Object input)
{
try {
return (Registererer) Class.forName((String) input).newInstance();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
if (!registererers.isEmpty()) {
Registererers.registerHandlers(registererers, Arrays.asList(jsonMapper));
}
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
}
@ -179,7 +158,6 @@ public class HadoopDruidIndexerConfig
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows = false;
private volatile List<String> registererers = Lists.newArrayList();
@JsonCreator
public HadoopDruidIndexerConfig(
@ -203,8 +181,7 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("registererers") List<String> registererers
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
this.dataSource = dataSource;
@ -224,7 +201,6 @@ public class HadoopDruidIndexerConfig
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
this.registererers = registererers;
if(partitionsSpec != null) {
Preconditions.checkArgument(
@ -517,17 +493,6 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public List<String> getRegistererers()
{
return registererers;
}
public void setRegistererers(List<String> registererers)
{
this.registererers = registererers;
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/

View File

@ -22,6 +22,7 @@ package io.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
@ -48,6 +49,7 @@ public class HadoopDruidIndexerJob implements Jobby
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
@Inject
public HadoopDruidIndexerJob(
HadoopDruidIndexerConfig config
)

View File

@ -359,8 +359,7 @@ public class TaskSerdeTest
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
false,
ImmutableList.<String>of()
false
)
);

View File

@ -35,6 +35,8 @@ import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.KeyHolder;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.initialization.CuratorDiscoveryConfig;
import org.apache.curator.framework.CuratorFramework;
@ -119,6 +121,7 @@ public class DiscoveryModule implements Module
public static void registerKey(Binder binder, Key<DruidNode> key)
{
DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key));
LifecycleModule.registerKey(binder, key);
}
@Override
@ -134,7 +137,7 @@ public class DiscoveryModule implements Module
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
binder.bind(ServiceAnnouncer.class)
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
.asEagerSingleton();
.in(LazySingleton.class);
}
@Provides

View File

@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.QueryServlet;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class BrokerJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS);
resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*");
resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}

View File

@ -21,9 +21,7 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.BrokerServerView;
@ -44,18 +42,9 @@ import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.ClientInfoResource;
import io.druid.server.ClientQuerySegmentWalker;
import io.druid.server.QueryServlet;
import io.druid.server.StatusResource;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import java.util.List;
@ -103,28 +92,4 @@ public class CliBroker extends ServerRunnable
}
);
}
private static class BrokerJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS);
resources.addServlet(new ServletHolder(new DefaultServlet()), "/druid/v2/datasources/*");
resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resources, queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
}

View File

@ -0,0 +1,139 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.guice.LazySingleton;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
/**
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description."
)
public class CliHadoopIndexer extends GuiceRunnable
{
@Arguments(description = "A JSON object or the path to a file that contains a JSON object")
private String argumentSpec;
private static final Logger log = new Logger(CliHadoopIndexer.class);
public CliHadoopIndexer()
{
super(log);
}
@Override
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(HadoopDruidIndexerJob.class).in(LazySingleton.class);
}
@Provides
@LazySingleton
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
{
Preconditions.checkNotNull(argumentSpec, "argumentSpec");
try {
if (argumentSpec.startsWith("{")) {
return HadoopDruidIndexerConfig.fromString(argumentSpec);
} else if (argumentSpec.startsWith("s3://")) {
final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
final FileSystem fs = s3nPath.getFileSystem(new Configuration());
String configString = CharStreams.toString(
new InputSupplier<InputStreamReader>()
{
@Override
public InputStreamReader getInput() throws IOException
{
return new InputStreamReader(fs.open(s3nPath));
}
}
);
return HadoopDruidIndexerConfig.fromString(configString);
} else {
return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
@Override
public void run()
{
try {
Injector injector = makeInjector();
final HadoopDruidIndexerJob job = injector.getInstance(HadoopDruidIndexerJob.class);
Lifecycle lifecycle = initLifecycle(injector);
job.run();
try {
lifecycle.stop();
}
catch (Throwable t) {
log.error(t, "Error when stopping. Failing.");
System.exit(1);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,65 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.initialization.LogLevelAdjuster;
import java.util.List;
/**
*/
public abstract class GuiceRunnable implements Runnable
{
private final Logger log;
private Injector baseInjector;
public GuiceRunnable(Logger log)
{
this.log = log;
}
@Inject
public void configure(Injector injector)
{
this.baseInjector = injector;
}
protected abstract List<Object> getModules();
public Injector makeInjector()
{
try {
return Initialization.makeInjectorWithModules(
baseInjector, getModules()
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public Lifecycle initLifecycle(Injector injector)
{
try {
LogLevelAdjuster.register();
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
return lifecycle;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -68,6 +68,11 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(ConvertProperties.class);
builder.withGroup("index")
.withDescription("Run indexing for druid")
.withDefaultCommand(Help.class)
.withCommands(CliHadoopIndexer.class);
builder.withGroup("internal")
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
.withDefaultCommand(Help.class)

View File

@ -20,54 +20,26 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.initialization.LogLevelAdjuster;
import java.util.List;
/**
*/
public abstract class ServerRunnable implements Runnable
public abstract class ServerRunnable extends GuiceRunnable
{
private final Logger log;
private Injector baseInjector;
public ServerRunnable(Logger log)
{
this.log = log;
super(log);
}
@Inject
public void configure(Injector injector)
{
this.baseInjector = injector;
}
protected abstract List<Object> getModules();
@Override
public void run()
{
try {
LogLevelAdjuster.register();
final Injector injector = Initialization.makeInjectorWithModules(
baseInjector, getModules()
);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
final Injector injector = makeInjector();
final Lifecycle lifecycle = initLifecycle(injector);
try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
lifecycle.join();
}
catch (Exception e) {