Abstractify HadoopTask

* Add `invokeForeignLoader` to standardize how task work is launched in a foreign class loader
* Add `buildClassLoader` to perform the steps common to hadoop jobs when building a ClassLoader
author Charles Allen 2015-05-14 17:04:43 -07:00
parent 3c3db7229c
commit 29ba05c04f
2 changed files with 165 additions and 90 deletions
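
For context, a minimal sketch of how a HadoopTask subclass is expected to combine the two new helpers. The subclass, its `com.example.MyJobRunner` target, and the `spec` field are hypothetical and not part of this commit; only `buildClassLoader` and `invokeForeignLoader` come from the change below.

package com.example;

import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.HadoopTask;

import java.util.List;

// Hypothetical subclass, for illustration only.
public class MyHadoopTask extends HadoopTask
{
  private final String spec; // serialized job spec, assumed for this sketch

  public MyHadoopTask(String id, String dataSource, List<String> hadoopDependencyCoordinates, String spec)
  {
    super(id, dataSource, hadoopDependencyCoordinates);
    this.spec = spec;
  }

  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    // Resolve extension and hadoop dependency coordinates into an isolated class loader.
    final ClassLoader loader = buildClassLoader(toolbox);
    // Reflectively call MyJobRunner.runTask(String[]) with `loader` as the context class loader.
    final String result = invokeForeignLoader("com.example.MyJobRunner", new String[]{spec}, loader);
    return result != null ? TaskStatus.success(getId()) : TaskStatus.failure(getId());
  }

  // Remaining Task methods that AbstractTask leaves abstract (getType(), isReady(...), ...) are omitted from this sketch.
}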

io/druid/indexing/common/task/HadoopIndexTask.java

@@ -21,17 +21,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Injector;
 import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
-import io.druid.guice.ExtensionsConfig;
-import io.druid.guice.GuiceInjectors;
 import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.HadoopDruidIndexerJob;
@@ -44,30 +39,16 @@ import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
 import io.druid.indexing.common.actions.TaskActionClient;
-import io.druid.initialization.Initialization;
 import io.druid.timeline.DataSegment;
-import io.tesla.aether.internal.DefaultTeslaAether;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Arrays;
 import java.util.List;
 import java.util.SortedSet;
 
-public class HadoopIndexTask extends AbstractTask
+public class HadoopIndexTask extends HadoopTask
 {
   private static final Logger log = new Logger(HadoopIndexTask.class);
-  private static final ExtensionsConfig extensionsConfig;
-
-  final static Injector injector = GuiceInjectors.makeStartupInjector();
-
-  static {
-    extensionsConfig = injector.getInstance(ExtensionsConfig.class);
-  }
 
   private static String getTheDataSource(HadoopIngestionSpec spec)
   {
@@ -77,8 +58,6 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopIngestionSpec spec;
   @JsonIgnore
-  private final List<String> hadoopDependencyCoordinates;
-  @JsonIgnore
   private final String classpathPrefix;
 
   /**
@@ -102,7 +81,10 @@ public class HadoopIndexTask extends AbstractTask
   {
     super(
         id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec), new DateTime()),
-        getTheDataSource(spec)
+        getTheDataSource(spec),
+        hadoopDependencyCoordinates == null
+        ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
+        : hadoopDependencyCoordinates
     );
@@ -119,15 +101,6 @@ public class HadoopIndexTask extends AbstractTask
         "metadataUpdateSpec must be absent"
     );
 
-    if (hadoopDependencyCoordinates != null) {
-      this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
-    } else if (hadoopCoordinates != null) {
-      this.hadoopDependencyCoordinates = ImmutableList.of(hadoopCoordinates);
-    } else {
-      // Will be defaulted to something at runtime, based on taskConfig.
-      this.hadoopDependencyCoordinates = null;
-    }
-
     this.classpathPrefix = classpathPrefix;
   }
@@ -162,7 +135,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonProperty
   public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopDependencyCoordinates;
+    return super.getHadoopDependencyCoordinates();
   }
 
   @JsonProperty
@@ -176,58 +149,22 @@ public class HadoopIndexTask extends AbstractTask
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
-                                                          ? hadoopDependencyCoordinates
-                                                          : toolbox.getConfig().getDefaultHadoopCoordinates();
-
-    final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-
-    final List<URL> extensionURLs = Lists.newArrayList();
-    for (String coordinate : extensionsConfig.getCoordinates()) {
-      final ClassLoader coordinateLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, coordinate, extensionsConfig.getDefaultVersion()
-      );
-      extensionURLs.addAll(Arrays.asList(((URLClassLoader) coordinateLoader).getURLs()));
-    }
-
-    final List<URL> nonHadoopURLs = Lists.newArrayList();
-    nonHadoopURLs.addAll(Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs()));
-
-    final List<URL> driverURLs = Lists.newArrayList();
-    driverURLs.addAll(nonHadoopURLs);
-    // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-    for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
-      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, hadoopDependencyCoordinate, extensionsConfig.getDefaultVersion()
-      );
-      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
-    }
-
-    final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
-    Thread.currentThread().setContextClassLoader(loader);
-
-    final List<URL> jobUrls = Lists.newArrayList();
-    jobUrls.addAll(nonHadoopURLs);
-    jobUrls.addAll(extensionURLs);
-
-    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
+    final ClassLoader loader = buildClassLoader(toolbox);
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
 
-    final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
-    final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
-        "runTask",
-        String[].class
+    final String config = invokeForeignLoader(
+        "io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing",
+        new String[]{
+            toolbox.getObjectMapper().writeValueAsString(spec),
+            toolbox.getConfig().getHadoopWorkingPath(),
+            toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+        },
+        loader
     );
 
-    String[] determineConfigArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(spec),
-        toolbox.getConfig().getHadoopWorkingPath(),
-        toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
-    };
-
-    String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
-    HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper()
-                                               .readValue(config, HadoopIngestionSpec.class);
+    final HadoopIngestionSpec indexerSchema = toolbox
+        .getObjectMapper()
+        .readValue(config, HadoopIngestionSpec.class);
 
     // We should have a lock from before we started running only if interval was specified
@@ -245,19 +182,19 @@ public class HadoopIndexTask extends AbstractTask
       final TaskLock myLock = Iterables.getOnlyElement(locks);
       version = myLock.getVersion();
     }
 
     log.info("Setting version to: %s", version);
 
-    final Class<?> indexGeneratorMainClass = loader.loadClass(HadoopIndexGeneratorInnerProcessing.class.getName());
-    final Method indexGeneratorMainMethod = indexGeneratorMainClass.getMethod("runTask", String[].class);
-    String[] indexGeneratorArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(indexerSchema),
-        version
-    };
-    String segments = (String) indexGeneratorMainMethod.invoke(null, new Object[]{indexGeneratorArgs});
+    final String segments = invokeForeignLoader(
+        "io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing",
+        new String[]{
+            toolbox.getObjectMapper().writeValueAsString(indexerSchema),
+            version
+        },
+        loader
+    );
 
     if (segments != null) {
       List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
           segments,
           new TypeReference<List<DataSegment>>()

io/druid/indexing/common/task/HadoopTask.java (new file)

@@ -0,0 +1,138 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;

import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import io.druid.guice.ExtensionsConfig;
import io.druid.guice.GuiceInjectors;
import io.druid.indexing.common.TaskToolbox;
import io.druid.initialization.Initialization;
import io.tesla.aether.internal.DefaultTeslaAether;

import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

public abstract class HadoopTask extends AbstractTask
{
  private static final Logger log = new Logger(HadoopTask.class);
  private static final ExtensionsConfig extensionsConfig;

  final static Injector injector = GuiceInjectors.makeStartupInjector();

  static {
    extensionsConfig = injector.getInstance(ExtensionsConfig.class);
  }

  private final List<String> hadoopDependencyCoordinates;

  protected HadoopTask(String id, String dataSource, List<String> hadoopDependencyCoordinates)
  {
    super(id, dataSource);
    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
  }
  public List<String> getHadoopDependencyCoordinates()
  {
    return hadoopDependencyCoordinates == null ? null : ImmutableList.copyOf(hadoopDependencyCoordinates);
  }

  protected ClassLoader buildClassLoader(final TaskToolbox toolbox) throws Exception
  {
    final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
                                                          ? hadoopDependencyCoordinates
                                                          : toolbox.getConfig().getDefaultHadoopCoordinates();

    final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);

    final List<URL> extensionURLs = Lists.newArrayList();
    for (String coordinate : extensionsConfig.getCoordinates()) {
      final ClassLoader coordinateLoader = Initialization.getClassLoaderForCoordinates(
          aetherClient, coordinate, extensionsConfig.getDefaultVersion()
      );
      extensionURLs.addAll(Arrays.asList(((URLClassLoader) coordinateLoader).getURLs()));
    }

    final List<URL> nonHadoopURLs = Lists.newArrayList();
    nonHadoopURLs.addAll(Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs()));

    final List<URL> driverURLs = Lists.newArrayList();
    driverURLs.addAll(nonHadoopURLs);
    // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
    for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
          aetherClient, hadoopDependencyCoordinate, extensionsConfig.getDefaultVersion()
      );
      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
    }

    final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);

    final List<URL> jobUrls = Lists.newArrayList();
    jobUrls.addAll(nonHadoopURLs);
    jobUrls.addAll(extensionURLs);

    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
    return loader;
  }
  /**
   * This method tries to isolate class loading during a Function call
   *
   * @param clazzName    The Class which has a static method called `runTask`
   * @param input        The input for `runTask`, must have `input.getClass()` be the class of the input for runTask
   * @param loader       The loader to use as the context class loader during invocation
   * @param <InputType>  The input type of the method.
   * @param <OutputType> The output type of the method. The result of runTask must be castable to this type.
   *
   * @return The result of the method invocation
   */
  public static <InputType, OutputType> OutputType invokeForeignLoader(
      final String clazzName,
      final InputType input,
      final ClassLoader loader
  )
  {
    log.debug("Launching [%s] on class loader [%s] with input class [%s]", clazzName, loader, input.getClass());
    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
    try {
      Thread.currentThread().setContextClassLoader(loader);
      final Class<?> clazz = loader.loadClass(clazzName);
      final Method method = clazz.getMethod("runTask", input.getClass());
      return (OutputType) method.invoke(null, input);
    }
    catch (IllegalAccessException | InvocationTargetException | ClassNotFoundException | NoSuchMethodException e) {
      throw Throwables.propagate(e);
    }
    finally {
      Thread.currentThread().setContextClassLoader(oldLoader);
    }
  }
}
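
For reference, the `runTask` contract that `invokeForeignLoader` resolves reflectively: the target class needs a public static method named `runTask` whose single parameter type is exactly `input.getClass()`. The class below is a hypothetical target, for illustration only; the real `HadoopDetermineConfigInnerProcessing` and `HadoopIndexGeneratorInnerProcessing` inner classes of HadoopIndexTask are not shown in this diff.

package com.example;

// Hypothetical runTask target, for illustration only.
public class MyJobRunner
{
  // invokeForeignLoader looks this method up via clazz.getMethod("runTask", input.getClass()),
  // so it must be public static and accept the runtime class of the supplied input
  // (String[] here). The return value is cast to the caller's expected output type.
  public static String runTask(String[] args) throws Exception
  {
    final String serializedSpec = args[0];
    // ... run the job using classes visible to the foreign class loader ...
    return serializedSpec; // e.g. a serialized result handed back across the loader boundary
  }
}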