mirror of https://github.com/apache/druid.git
Merge pull request #1367 from metamx/hadoopTaskAbstraction
Abstractify HadoopTask
commit fae86e83ad
HadoopIndexTask.java

@@ -21,17 +21,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Injector;
 import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
-import io.druid.guice.ExtensionsConfig;
-import io.druid.guice.GuiceInjectors;
 import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.HadoopDruidIndexerJob;
@@ -44,30 +39,16 @@ import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
 import io.druid.indexing.common.actions.TaskActionClient;
-import io.druid.initialization.Initialization;
 import io.druid.timeline.DataSegment;
-import io.tesla.aether.internal.DefaultTeslaAether;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Arrays;
 import java.util.List;
 import java.util.SortedSet;
 
-public class HadoopIndexTask extends AbstractTask
+public class HadoopIndexTask extends HadoopTask
 {
   private static final Logger log = new Logger(HadoopIndexTask.class);
-  private static final ExtensionsConfig extensionsConfig;
-
-  final static Injector injector = GuiceInjectors.makeStartupInjector();
-
-  static {
-    extensionsConfig = injector.getInstance(ExtensionsConfig.class);
-  }
 
   private static String getTheDataSource(HadoopIngestionSpec spec)
   {
@@ -77,8 +58,6 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopIngestionSpec spec;
-  @JsonIgnore
-  private final List<String> hadoopDependencyCoordinates;
   @JsonIgnore
   private final String classpathPrefix;
 
   /**
@@ -102,7 +81,10 @@ public class HadoopIndexTask extends AbstractTask
   {
     super(
         id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec), new DateTime()),
-        getTheDataSource(spec)
+        getTheDataSource(spec),
+        hadoopDependencyCoordinates == null
+        ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
+        : hadoopDependencyCoordinates
     );
 
 
@@ -119,15 +101,6 @@ public class HadoopIndexTask extends AbstractTask
         "metadataUpdateSpec must be absent"
     );
 
-    if (hadoopDependencyCoordinates != null) {
-      this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
-    } else if (hadoopCoordinates != null) {
-      this.hadoopDependencyCoordinates = ImmutableList.of(hadoopCoordinates);
-    } else {
-      // Will be defaulted to something at runtime, based on taskConfig.
-      this.hadoopDependencyCoordinates = null;
-    }
-
     this.classpathPrefix = classpathPrefix;
   }
 
@@ -162,7 +135,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonProperty
   public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopDependencyCoordinates;
+    return super.getHadoopDependencyCoordinates();
   }
 
   @JsonProperty
@@ -176,58 +149,22 @@ public class HadoopIndexTask extends AbstractTask
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
-                                                          ? hadoopDependencyCoordinates
-                                                          : toolbox.getConfig().getDefaultHadoopCoordinates();
-
-    final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-
-    final List<URL> extensionURLs = Lists.newArrayList();
-    for (String coordinate : extensionsConfig.getCoordinates()) {
-      final ClassLoader coordinateLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, coordinate, extensionsConfig.getDefaultVersion()
-      );
-      extensionURLs.addAll(Arrays.asList(((URLClassLoader) coordinateLoader).getURLs()));
-    }
-
-    final List<URL> nonHadoopURLs = Lists.newArrayList();
-    nonHadoopURLs.addAll(Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs()));
-
-    final List<URL> driverURLs = Lists.newArrayList();
-    driverURLs.addAll(nonHadoopURLs);
-    // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-    for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
-      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, hadoopDependencyCoordinate, extensionsConfig.getDefaultVersion()
-      );
-      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
-    }
-
-    final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
-    Thread.currentThread().setContextClassLoader(loader);
-
-    final List<URL> jobUrls = Lists.newArrayList();
-    jobUrls.addAll(nonHadoopURLs);
-    jobUrls.addAll(extensionURLs);
-
-    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
+    final ClassLoader loader = buildClassLoader(toolbox);
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
 
-    final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
-    final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
-        "runTask",
-        String[].class
+    final String config = invokeForeignLoader(
+        "io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing",
+        new String[]{
+            toolbox.getObjectMapper().writeValueAsString(spec),
+            toolbox.getConfig().getHadoopWorkingPath(),
+            toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+        },
+        loader
     );
-
-    String[] determineConfigArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(spec),
-        toolbox.getConfig().getHadoopWorkingPath(),
-        toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
-    };
-
-    String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
-    HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper()
-                                               .readValue(config, HadoopIngestionSpec.class);
+    final HadoopIngestionSpec indexerSchema = toolbox
+        .getObjectMapper()
+        .readValue(config, HadoopIngestionSpec.class);
 
     // We should have a lock from before we started running only if interval was specified
@@ -245,19 +182,19 @@ public class HadoopIndexTask extends AbstractTask
       final TaskLock myLock = Iterables.getOnlyElement(locks);
       version = myLock.getVersion();
     }
 
     log.info("Setting version to: %s", version);
 
-    final Class<?> indexGeneratorMainClass = loader.loadClass(HadoopIndexGeneratorInnerProcessing.class.getName());
-    final Method indexGeneratorMainMethod = indexGeneratorMainClass.getMethod("runTask", String[].class);
-    String[] indexGeneratorArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(indexerSchema),
-        version
-    };
-    String segments = (String) indexGeneratorMainMethod.invoke(null, new Object[]{indexGeneratorArgs});
-
+    final String segments = invokeForeignLoader(
+        "io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing",
+        new String[]{
+            toolbox.getObjectMapper().writeValueAsString(indexerSchema),
+            version
+        },
+        loader
+    );
 
     if (segments != null) {
-
       List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
           segments,
           new TypeReference<List<DataSegment>>()
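Both call sites hand the spec across the class loader boundary as a JSON string rather than as a live object: an instance created by one loader cannot be consumed by classes from an isolated loader, so the task serializes with the ObjectMapper on the way in and deserializes the String result on the way out. A minimal sketch of that round-trip, assuming only jackson-databind on the classpath; IngestionSpecStub is a hypothetical stand-in bean, not a Druid class:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.List;

public class SpecRoundTrip
{
  // Stand-in bean; HadoopIngestionSpec plays this role in the diff above.
  public static class IngestionSpecStub
  {
    public String dataSource;
    public List<String> intervals;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    final IngestionSpecStub spec = new IngestionSpecStub();
    spec.dataSource = "wikipedia";
    spec.intervals = Arrays.asList("2015-01-01/2015-02-01");

    // Outbound: only a String crosses the class loader boundary.
    final String json = mapper.writeValueAsString(spec);

    // Inbound: each side binds the JSON against its own copy of the class.
    final IngestionSpecStub received = mapper.readValue(json, IngestionSpecStub.class);
    System.out.println(received.dataSource + " " + received.intervals);
  }
}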
HadoopTask.java (new file)

@@ -0,0 +1,138 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Injector;
+import com.metamx.common.logger.Logger;
+import io.druid.guice.ExtensionsConfig;
+import io.druid.guice.GuiceInjectors;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.initialization.Initialization;
+import io.tesla.aether.internal.DefaultTeslaAether;
+
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+
+public abstract class HadoopTask extends AbstractTask
+{
+  private static final Logger log = new Logger(HadoopTask.class);
+  private static final ExtensionsConfig extensionsConfig;
+
+  final static Injector injector = GuiceInjectors.makeStartupInjector();
+
+  static {
+    extensionsConfig = injector.getInstance(ExtensionsConfig.class);
+  }
+
+  private final List<String> hadoopDependencyCoordinates;
+
+  protected HadoopTask(String id, String dataSource, List<String> hadoopDependencyCoordinates)
+  {
+    super(id, dataSource);
+    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
+  }
+
+  public List<String> getHadoopDependencyCoordinates()
+  {
+    return hadoopDependencyCoordinates == null ? null : ImmutableList.copyOf(hadoopDependencyCoordinates);
+  }
+
+  protected ClassLoader buildClassLoader(final TaskToolbox toolbox) throws Exception
+  {
+    final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
+                                                          ? hadoopDependencyCoordinates
+                                                          : toolbox.getConfig().getDefaultHadoopCoordinates();
+
+    final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
+
+    final List<URL> extensionURLs = Lists.newArrayList();
+    for (String coordinate : extensionsConfig.getCoordinates()) {
+      final ClassLoader coordinateLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, coordinate, extensionsConfig.getDefaultVersion()
+      );
+      extensionURLs.addAll(Arrays.asList(((URLClassLoader) coordinateLoader).getURLs()));
+    }
+
+    final List<URL> nonHadoopURLs = Lists.newArrayList();
+    nonHadoopURLs.addAll(Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs()));
+
+    final List<URL> driverURLs = Lists.newArrayList();
+    driverURLs.addAll(nonHadoopURLs);
+    // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
+    for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
+      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, hadoopDependencyCoordinate, extensionsConfig.getDefaultVersion()
+      );
+      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    }
+
+    final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
+
+    final List<URL> jobUrls = Lists.newArrayList();
+    jobUrls.addAll(nonHadoopURLs);
+    jobUrls.addAll(extensionURLs);
+
+    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
+    return loader;
+  }
+
+  /**
+   * This method tries to isolate class loading during a Function call
+   *
+   * @param clazzName    The Class which has a static method called `runTask`
+   * @param input        The input for `runTask`, must have `input.getClass()` be the class of the input for runTask
+   * @param loader       The loader to use as the context class loader during invocation
+   * @param <InputType>  The input type of the method.
+   * @param <OutputType> The output type of the method. The result of runTask must be castable to this type.
+   *
+   * @return The result of the method invocation
+   */
+  public static <InputType, OutputType> OutputType invokeForeignLoader(
+      final String clazzName,
+      final InputType input,
+      final ClassLoader loader
+  )
+  {
+    log.debug("Launching [%s] on class loader [%s] with input class [%s]", clazzName, loader, input.getClass());
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(loader);
+      final Class<?> clazz = loader.loadClass(clazzName);
+      final Method method = clazz.getMethod("runTask", input.getClass());
+      return (OutputType) method.invoke(null, input);
+    }
+    catch (IllegalAccessException | InvocationTargetException | ClassNotFoundException | NoSuchMethodException e) {
+      throw Throwables.propagate(e);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
+  }
+}
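The URLClassLoader built in buildClassLoader is created with a null parent, so it delegates only to the bootstrap loader: the Hadoop and extension jars it carries can neither be shadowed by nor leak into the host's own classpath, which is the point of the jets3t/httpcore comment above. A small standalone illustration of that parent-less isolation (demo code, not Druid code):

import java.net.URL;
import java.net.URLClassLoader;

public class IsolationDemo
{
  public static void main(String[] args) throws Exception
  {
    // Isolated loader: a null parent means only bootstrap classes are visible.
    final URLClassLoader isolated = new URLClassLoader(new URL[]{}, null);

    // Core classes still resolve via the bootstrap loader.
    System.out.println(isolated.loadClass("java.lang.String"));

    // Application classes from the host class path do not leak in.
    try {
      isolated.loadClass(IsolationDemo.class.getName());
    }
    catch (ClassNotFoundException expected) {
      System.out.println("host classes are invisible: " + expected);
    }
  }
}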
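And a self-contained sketch of the pattern invokeForeignLoader implements: swap the thread's context class loader, reflectively invoke a static runTask, and restore the old loader in a finally block. Here the target class is loaded through the current loader purely so the example runs on its own; in the task it comes from the isolated loader above. EchoProcessing and the method name are illustrative stand-ins for the HadoopIndexTask inner-processing classes:

import java.lang.reflect.Method;

public class ForeignLoaderSketch
{
  // Demo target: same shape as the inner-processing classes (static runTask).
  public static class EchoProcessing
  {
    public static String runTask(String[] args)
    {
      return String.join(",", args);
    }
  }

  @SuppressWarnings("unchecked")
  public static <InputType, OutputType> OutputType invoke(
      final String clazzName,
      final InputType input,
      final ClassLoader loader
  ) throws Exception
  {
    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
    try {
      // Anything resolving classes through the context loader now sees `loader`.
      Thread.currentThread().setContextClassLoader(loader);
      final Class<?> clazz = loader.loadClass(clazzName);
      final Method method = clazz.getMethod("runTask", input.getClass());
      // Static method, so the receiver is null.
      return (OutputType) method.invoke(null, input);
    }
    finally {
      Thread.currentThread().setContextClassLoader(oldLoader);
    }
  }

  public static void main(String[] args) throws Exception
  {
    final String out = invoke(
        ForeignLoaderSketch.class.getName() + "$EchoProcessing",
        new String[]{"a", "b"},
        ForeignLoaderSketch.class.getClassLoader()
    );
    System.out.println(out); // a,b
  }
}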