diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 5f745e2a4f1..08d9c48fa02 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -21,17 +21,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Injector;
 import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
-import io.druid.guice.ExtensionsConfig;
-import io.druid.guice.GuiceInjectors;
 import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.HadoopDruidIndexerJob;
@@ -44,30 +39,16 @@ import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
 import io.druid.indexing.common.actions.TaskActionClient;
-import io.druid.initialization.Initialization;
 import io.druid.timeline.DataSegment;
-import io.tesla.aether.internal.DefaultTeslaAether;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Arrays;
 import java.util.List;
 import java.util.SortedSet;
 
-public class HadoopIndexTask extends AbstractTask
+public class HadoopIndexTask extends HadoopTask
 {
   private static final Logger log = new Logger(HadoopIndexTask.class);
-  private static final ExtensionsConfig extensionsConfig;
-
-  final static Injector injector = GuiceInjectors.makeStartupInjector();
-
-  static {
-    extensionsConfig = injector.getInstance(ExtensionsConfig.class);
-  }
 
   private static String getTheDataSource(HadoopIngestionSpec spec)
   {
@@ -77,8 +58,6 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopIngestionSpec spec;
   @JsonIgnore
-  private final List<String> hadoopDependencyCoordinates;
-  @JsonIgnore
   private final String classpathPrefix;
 
   /**
@@ -102,7 +81,10 @@ public class HadoopIndexTask extends AbstractTask
   {
     super(
         id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec), new DateTime()),
-        getTheDataSource(spec)
+        getTheDataSource(spec),
+        hadoopDependencyCoordinates == null
+        ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
+        : hadoopDependencyCoordinates
     );
 
@@ -119,15 +101,6 @@ public class HadoopIndexTask extends AbstractTask
         "metadataUpdateSpec must be absent"
     );
 
-    if (hadoopDependencyCoordinates != null) {
-      this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
-    } else if (hadoopCoordinates != null) {
-      this.hadoopDependencyCoordinates = ImmutableList.of(hadoopCoordinates);
-    } else {
-      // Will be defaulted to something at runtime, based on taskConfig.
-      this.hadoopDependencyCoordinates = null;
-    }
-
     this.classpathPrefix = classpathPrefix;
   }
 
@@ -162,7 +135,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonProperty
   public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopDependencyCoordinates;
+    return super.getHadoopDependencyCoordinates();
   }
 
   @JsonProperty
@@ -176,58 +149,22 @@ public class HadoopIndexTask extends AbstractTask
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
-                                                          ? hadoopDependencyCoordinates
-                                                          : toolbox.getConfig().getDefaultHadoopCoordinates();
-
-    final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-
-    final List<URL> extensionURLs = Lists.newArrayList();
-    for (String coordinate : extensionsConfig.getCoordinates()) {
-      final ClassLoader coordinateLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, coordinate, extensionsConfig.getDefaultVersion()
-      );
-      extensionURLs.addAll(Arrays.asList(((URLClassLoader) coordinateLoader).getURLs()));
-    }
-
-    final List<URL> nonHadoopURLs = Lists.newArrayList();
-    nonHadoopURLs.addAll(Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs()));
-
-    final List<URL> driverURLs = Lists.newArrayList();
-    driverURLs.addAll(nonHadoopURLs);
-    // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
-    for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
-      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-          aetherClient, hadoopDependencyCoordinate, extensionsConfig.getDefaultVersion()
-      );
-      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
-    }
-
-    final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
-    Thread.currentThread().setContextClassLoader(loader);
-
-    final List<URL> jobUrls = Lists.newArrayList();
-    jobUrls.addAll(nonHadoopURLs);
-    jobUrls.addAll(extensionURLs);
-
-    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
+    final ClassLoader loader = buildClassLoader(toolbox);
 
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
 
-    final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
-    final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
-        "runTask",
-        String[].class
+    final String config = invokeForeignLoader(
+        "io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing",
+        new String[]{
+            toolbox.getObjectMapper().writeValueAsString(spec),
+            toolbox.getConfig().getHadoopWorkingPath(),
+            toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+        },
+        loader
     );
-
-    String[] determineConfigArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(spec),
-        toolbox.getConfig().getHadoopWorkingPath(),
-        toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
-    };
-
-    String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
-
-    HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper()
-                                               .readValue(config, HadoopIngestionSpec.class);
+    final HadoopIngestionSpec indexerSchema = toolbox
+        .getObjectMapper()
+        .readValue(config, HadoopIngestionSpec.class);
 
     // We should have a lock from before we started running only if interval was specified
@@ -245,19 +182,19 @@ public class HadoopIndexTask extends AbstractTask
       final TaskLock myLock = Iterables.getOnlyElement(locks);
       version = myLock.getVersion();
     }
+
     log.info("Setting version to: %s", version);
 
-    final Class<?> indexGeneratorMainClass = loader.loadClass(HadoopIndexGeneratorInnerProcessing.class.getName());
-    final Method indexGeneratorMainMethod = indexGeneratorMainClass.getMethod("runTask", String[].class);
-    String[] indexGeneratorArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(indexerSchema),
-        version
-    };
-    String segments = (String) indexGeneratorMainMethod.invoke(null, new Object[]{indexGeneratorArgs});
-
+    final String segments = invokeForeignLoader(
+        "io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing",
+        new String[]{
+            toolbox.getObjectMapper().writeValueAsString(indexerSchema),
+            version
+        },
+        loader
+    );
 
     if (segments != null) {
-
       List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
           segments,
           new TypeReference<List<DataSegment>>()
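For reference, `invokeForeignLoader` (defined in the new HadoopTask below) requires the named class to expose a public static method called `runTask` whose single parameter type is exactly `input.getClass()` (the lookup goes through `Class.getMethod`), and whose return value must be castable to the caller's expected output type. A minimal sketch of such a holder class, under an illustrative name (the real inner-processing classes in HadoopIndexTask follow this shape but run HadoopDruidDetermineConfigurationJob and HadoopDruidIndexerJob):

    // Illustrative only: the shape of a class that invokeForeignLoader can target.
    public static class ExampleInnerProcessing
    {
      // Must be public static, named "runTask", and take a String[] parameter,
      // because the call sites above pass a String[] as `input`.
      public static String runTask(String[] args) throws Exception
      {
        // args carry JSON-serialized inputs; the result is a JSON string (or
        // null) that the calling task deserializes with its own ObjectMapper.
        return null;
      }
    }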
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java
new file mode 100644
index 00000000000..1a1e31f33bd
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Injector;
+import com.metamx.common.logger.Logger;
+import io.druid.guice.ExtensionsConfig;
+import io.druid.guice.GuiceInjectors;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.initialization.Initialization;
+import io.tesla.aether.internal.DefaultTeslaAether;
+
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+
+public abstract class HadoopTask extends AbstractTask
+{
+  private static final Logger log = new Logger(HadoopTask.class);
+  private static final ExtensionsConfig extensionsConfig;
+
+  final static Injector injector = GuiceInjectors.makeStartupInjector();
+
+  static {
+    extensionsConfig = injector.getInstance(ExtensionsConfig.class);
+  }
+
+  private final List<String> hadoopDependencyCoordinates;
+
+  protected HadoopTask(String id, String dataSource, List<String> hadoopDependencyCoordinates)
+  {
+    super(id, dataSource);
+    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
+  }
+
+  public List<String> getHadoopDependencyCoordinates()
+  {
+    return hadoopDependencyCoordinates == null ? null : ImmutableList.copyOf(hadoopDependencyCoordinates);
+  }
+
+  protected ClassLoader buildClassLoader(final TaskToolbox toolbox) throws Exception
+  {
+    final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
+                                                          ? hadoopDependencyCoordinates
+                                                          : toolbox.getConfig().getDefaultHadoopCoordinates();
+
+    final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
+
+    final List<URL> extensionURLs = Lists.newArrayList();
+    for (String coordinate : extensionsConfig.getCoordinates()) {
+      final ClassLoader coordinateLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, coordinate, extensionsConfig.getDefaultVersion()
+      );
+      extensionURLs.addAll(Arrays.asList(((URLClassLoader) coordinateLoader).getURLs()));
+    }
+
+    final List<URL> nonHadoopURLs = Lists.newArrayList();
+    nonHadoopURLs.addAll(Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs()));
+
+    final List<URL> driverURLs = Lists.newArrayList();
+    driverURLs.addAll(nonHadoopURLs);
+    // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
+    for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
+      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, hadoopDependencyCoordinate, extensionsConfig.getDefaultVersion()
+      );
+      driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    }
+
+    final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
+
+    final List<URL> jobUrls = Lists.newArrayList();
+    jobUrls.addAll(nonHadoopURLs);
+    jobUrls.addAll(extensionURLs);
+
+    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
+    return loader;
+  }
+
+  /**
+   * This method tries to isolate class loading during a `runTask` invocation by
+   * temporarily swapping the thread's context class loader.
+   *
+   * @param clazzName The name of a class which has a static method called `runTask`
+   * @param input     The input for `runTask`; `input.getClass()` must be the parameter type of `runTask`
+   * @param loader    The class loader to use as the
context class loader during the invocation
+   * @param <InputType>  The input type of the method.
+   * @param <OutputType> The output type of the method. The result of runTask must be castable to this type.
+   *
+   * @return The result of the method invocation
+   */
+  public static <InputType, OutputType> OutputType invokeForeignLoader(
+      final String clazzName,
+      final InputType input,
+      final ClassLoader loader
+  )
+  {
+    log.debug("Launching [%s] on class loader [%s] with input class [%s]", clazzName, loader, input.getClass());
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(loader);
+      final Class<?> clazz = loader.loadClass(clazzName);
+      final Method method = clazz.getMethod("runTask", input.getClass());
+      return (OutputType) method.invoke(null, input);
+    }
+    catch (IllegalAccessException | InvocationTargetException | ClassNotFoundException | NoSuchMethodException e) {
+      throw Throwables.propagate(e);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
+  }
+}
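A minimal sketch of how a subclass is expected to combine the two hooks; the task and inner-class names here are hypothetical, not part of the patch:

    import io.druid.indexing.common.TaskStatus;
    import io.druid.indexing.common.TaskToolbox;
    import java.util.List;

    // Hypothetical subclass illustrating the intended call pattern.
    public class MyHadoopTask extends HadoopTask
    {
      protected MyHadoopTask(String id, String dataSource, List<String> hadoopDependencyCoordinates)
      {
        super(id, dataSource, hadoopDependencyCoordinates);
      }

      @Override
      public TaskStatus run(TaskToolbox toolbox) throws Exception
      {
        // Isolated URLClassLoader with the Hadoop jars last on the classpath.
        final ClassLoader loader = buildClassLoader(toolbox);

        // Naming the target as a string literal (instead of
        // MyInnerProcessing.class.getName()) keeps the task's own class loader
        // from defining the class; only the copy loaded by the isolated loader runs.
        final String result = invokeForeignLoader(
            "io.druid.indexing.common.task.MyHadoopTask$MyInnerProcessing",
            new String[]{"some", "args"},
            loader
        );
        return result != null ? TaskStatus.success(getId()) : TaskStatus.failure(getId());
      }

      // getType(), isReady(), and the other AbstractTask methods are omitted for brevity.
    }

Note how invokeForeignLoader restores the previous context class loader in a finally block, so a subclass may call it several times against the same loader (as HadoopIndexTask does for the determine-config and index-generator passes) without leaking loader state between calls.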