Add Hadoop Converter Job and task

* Fixes https://github.com/druid-io/druid/issues/1363
* Add extra utils in JobHelper based on PR feedback
Charles Allen 2015-05-04 11:37:07 -07:00
parent 6ae4ecc7d4
commit 056cab93ed
15 changed files with 2303 additions and 43 deletions
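For orientation, the sketch below shows roughly how the new task type added by this commit could be constructed in code; the task itself (HadoopConverterTask) appears near the end of the diff and delegates the actual work to the new HadoopConverterJob. All concrete values here are illustrative placeholders, not values taken from the commit.

import io.druid.indexing.common.task.HadoopConverterTask;
import io.druid.segment.IndexSpec;
import org.joda.time.Interval;

import java.net.URI;
import java.util.Arrays;

public class ConverterTaskSketch
{
  public static void main(String[] args)
  {
    final HadoopConverterTask task = new HadoopConverterTask(
        null,                                         // id: assumed to be auto-generated when omitted
        "wikipedia",                                  // dataSource whose segments should be rewritten (placeholder)
        Interval.parse("2014-01-01/2015-01-01"),      // interval to scan for segments
        new IndexSpec(),                              // target encoding for the converted segments
        false,                                        // force: also convert segments already at the current binary version
        true,                                         // validate converted segments against the originals
        Arrays.asList("org.apache.hadoop:hadoop-client:2.3.0"),   // hadoopDependencyCoordinates (placeholder)
        URI.create("hdfs://namenode/tmp/druid/converter-cache"),  // distributedSuccessCache (placeholder)
        null,                                         // jobPriority (optional)
        "hdfs://namenode/druid/segments",             // segmentOutputPath (placeholder)
        null                                          // classpathPrefix (optional)
    );
    System.out.println(task.getType());               // prints "hadoop_convert_segment"
  }
}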

View File

@ -104,6 +104,52 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.compile.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.compile.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.compile.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${hadoop.compile.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -102,7 +102,7 @@ public class DetermineHashedPartitionsJob implements Jobby
} else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
}
JobHelper.setupClasspath(config, groupByJob);
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);

View File

@ -135,7 +135,7 @@ public class DeterminePartitionsJob implements Jobby
groupByJob.setOutputKeyClass(BytesWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
JobHelper.setupClasspath(config, groupByJob);
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
@ -186,7 +186,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
JobHelper.setupClasspath(config, dimSelectionJob);
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);
config.addJobProperties(dimSelectionJob);
config.intoConfiguration(dimSelectionJob);

View File

@ -176,7 +176,7 @@ public class IndexGeneratorJob implements Jobby
config.intoConfiguration(job);
JobHelper.setupClasspath(config, job);
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job);
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());

View File

@ -26,6 +26,7 @@ import com.google.common.io.OutputSupplier;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
@ -39,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@ -46,8 +48,10 @@ import org.apache.hadoop.util.Progressable;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
@ -58,6 +62,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
@ -68,12 +73,21 @@ public class JobHelper
private static final Set<Path> existing = Sets.newHashSet();
private static final int NUM_RETRIES = 6;
private static final int SECONDS_BETWEEN_RETRIES = 10;
private static final int NUM_RETRIES = 8;
private static final int SECONDS_BETWEEN_RETRIES = 2;
public static Path distributedClassPath(String path)
{
return distributedClassPath(new Path(path));
}
public static Path distributedClassPath(Path base)
{
return new Path(base, "classpath");
}
public static void setupClasspath(
HadoopDruidIndexerConfig config,
Path distributedClassPath,
Job job
)
throws IOException
@ -86,7 +100,6 @@ public class JobHelper
String[] jarFiles = classpathProperty.split(File.pathSeparator);
final Configuration conf = job.getConfiguration();
final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final FileSystem fs = distributedClassPath.getFileSystem(conf);
if (fs instanceof LocalFileSystem) {
@ -216,7 +229,7 @@ public class JobHelper
DataPusher.class, new DataPusher()
{
@Override
public void push() throws IOException
public long push() throws IOException
{
try (OutputStream outputStream = fileContext.create(
tmpPath,
@ -231,9 +244,10 @@ public class JobHelper
log.error(exception, "Exception in retry loop");
throw exception;
}
return -1;
}
},
RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
zipPusher.push();
log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());
@ -294,7 +308,7 @@ public class JobHelper
DataPusher.class, new DataPusher()
{
@Override
public void push() throws IOException
public long push() throws IOException
{
try {
progressable.progress();
@ -317,9 +331,10 @@ public class JobHelper
log.info(ex, "Error in retry loop");
throw ex;
}
return -1;
}
},
RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
descriptorPusher.push();
}
@ -329,7 +344,7 @@ public class JobHelper
*/
public interface DataPusher
{
void push() throws IOException;
long push() throws IOException;
}
public static long zipAndCopyDir(
@ -425,4 +440,108 @@ public class JobHelper
}
return path;
}
// TODO: Replace this whenever hadoop gets their act together and stops breaking with more recent versions of Guava
public static long unzipNoGuava(
final Path zip,
final Configuration configuration,
final File outDir,
final Progressable progressable
) throws IOException
{
final DataPusher zipPusher = (DataPusher) RetryProxy.create(
DataPusher.class, new DataPusher()
{
@Override
public long push() throws IOException
{
final FileContext context = FileContext.getFileContext(zip.toUri(), configuration);
long size = 0L;
final byte[] buffer = new byte[1 << 13];
progressable.progress();
try (ZipInputStream in = new ZipInputStream(context.open(zip, 1 << 13))) {
for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
final String fileName = entry.getName();
try (final OutputStream out = new BufferedOutputStream(
new FileOutputStream(
outDir.getAbsolutePath()
+ File.separator
+ fileName
), 1 << 13
)) {
for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
progressable.progress();
if (len == 0) {
continue;
}
size += len;
out.write(buffer, 0, len);
}
out.flush();
}
}
}
catch (IOException | RuntimeException exception) {
log.error(exception, "Exception in retry loop");
throw exception;
}
progressable.progress();
return size;
}
},
RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
return zipPusher.push();
}
public static ProgressIndicator progressIndicatorForContext(
final TaskAttemptContext context
)
{
return new ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
@Override
public void start()
{
context.progress();
context.setStatus("STARTED");
}
@Override
public void stop()
{
context.progress();
context.setStatus("STOPPED");
}
@Override
public void startSection(String section)
{
context.progress();
context.setStatus(String.format("STARTED [%s]", section));
}
@Override
public void progressSection(String section, String message)
{
log.info("Progress message for section [%s] : [%s]", section, message);
context.progress();
context.setStatus(String.format("PROGRESS [%s]", section));
}
@Override
public void stopSection(String section)
{
context.progress();
context.setStatus(String.format("STOPPED [%s]", section));
}
};
}
}
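The new JobHelper additions above are consumed by the converter mapper later in this diff: unzipNoGuava pulls a zipped segment down from deep storage inside the same retry machinery used for pushes (DataPusher.push now returns the byte count), and progressIndicatorForContext adapts Druid's ProgressIndicator to Hadoop task heartbeats. A standalone sketch of the unzip path, assuming a placeholder zip location and a no-op Progressable since there is no task context here:

import io.druid.indexer.JobHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Progressable;

import java.io.File;

public class UnzipSketch
{
  public static void main(String[] args) throws Exception
  {
    final Configuration conf = new Configuration();
    // Placeholder path to a zipped segment; in the mapper this comes from the segment's loadSpec.
    final Path segmentZip = new Path("file:///tmp/druid/old-segment/index.zip");
    final File outDir = new File("/tmp/druid/segment-to-convert");
    outDir.mkdirs();
    // Outside a Hadoop task there is nothing to heartbeat, so report progress to a no-op.
    final Progressable noop = new Progressable()
    {
      @Override
      public void progress() {}
    };
    final long bytes = JobHelper.unzipNoGuava(segmentZip, conf, outDir, noop);
    System.out.println("Unzipped " + bytes + " bytes into " + outDir);
  }
}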

View File

@ -0,0 +1,723 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.updater;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexer.JobHelper;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progressable;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HadoopConverterJob
{
private static final Logger log = new Logger(HadoopConverterJob.class);
private static final String COUNTER_GROUP = "Hadoop Druid Converter";
private static final String COUNTER_LOADED = "Loaded Bytes";
private static final String COUNTER_WRITTEN = "Written Bytes";
private static void setJobName(JobConf jobConf, List<DataSegment> segments)
{
if (segments.size() == 1) {
final DataSegment segment = segments.get(0);
jobConf.setJobName(
String.format(
"druid-convert-%s-%s-%s",
segment.getDataSource(),
segment.getInterval(),
segment.getVersion()
)
);
} else {
final Set<String> dataSources = Sets.newHashSet(
Iterables.transform(
segments,
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment input)
{
return input.getDataSource();
}
}
)
);
final Set<String> versions = Sets.newHashSet(
Iterables.transform(
segments,
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment input)
{
return input.getVersion();
}
}
)
);
jobConf.setJobName(
String.format(
"druid-convert-%s-%s",
Arrays.toString(dataSources.toArray()),
Arrays.toString(versions.toArray())
)
);
}
}
public static Path getJobPath(JobID jobID, Path workingDirectory)
{
return new Path(workingDirectory, jobID.toString());
}
public static Path getTaskPath(JobID jobID, TaskAttemptID taskAttemptID, Path workingDirectory)
{
return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString());
}
public static void cleanup(Job job) throws IOException
{
final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
fs.delete(jobDir, true);
}
public static HadoopDruidConverterConfig converterConfigFromConfiguration(Configuration configuration)
throws IOException
{
final String property = Preconditions.checkNotNull(
configuration.get(HadoopDruidConverterConfig.CONFIG_PROPERTY),
HadoopDruidConverterConfig.CONFIG_PROPERTY
);
return HadoopDruidConverterConfig.fromString(property);
}
public static void converterConfigIntoConfiguration(
HadoopDruidConverterConfig priorConfig,
List<DataSegment> segments,
Configuration configuration
)
{
final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
priorConfig.getDataSource(),
priorConfig.getInterval(),
priorConfig.getIndexSpec(),
segments,
priorConfig.isValidate(),
priorConfig.getDistributedSuccessCache(),
priorConfig.getHadoopProperties(),
priorConfig.getJobPriority(),
priorConfig.getSegmentOutputPath()
);
try {
configuration.set(
HadoopDruidConverterConfig.CONFIG_PROPERTY,
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(config)
);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}
private final HadoopDruidConverterConfig converterConfig;
private long loadedBytes = 0L;
private long writtenBytes = 0L;
public HadoopConverterJob(
HadoopDruidConverterConfig converterConfig
)
{
this.converterConfig = converterConfig;
}
public List<DataSegment> run() throws IOException
{
final JobConf jobConf = new JobConf();
jobConf.setKeepFailedTaskFiles(false);
for (Map.Entry<String, String> entry : converterConfig.getHadoopProperties().entrySet()) {
jobConf.set(entry.getKey(), entry.getValue(), "converterConfig.getHadoopProperties()");
}
final List<DataSegment> segments = converterConfig.getSegments();
if (segments.isEmpty()) {
throw new IAE(
"No segments found for datasource [%s]",
converterConfig.getDataSource()
);
}
converterConfigIntoConfiguration(converterConfig, segments, jobConf);
jobConf.setNumReduceTasks(0);// Map only. Number of map tasks determined by input format
jobConf.setWorkingDirectory(new Path(converterConfig.getDistributedSuccessCache()));
setJobName(jobConf, segments);
if (converterConfig.getJobPriority() != null) {
jobConf.setJobPriority(JobPriority.valueOf(converterConfig.getJobPriority()));
}
final Job job = Job.getInstance(jobConf);
job.setInputFormatClass(ConfigInputFormat.class);
job.setMapperClass(ConvertingMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapSpeculativeExecution(false);
job.setOutputFormatClass(ConvertingOutputFormat.class);
JobHelper.setupClasspath(JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), job);
Throwable throwable = null;
try {
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
final boolean success = job.waitForCompletion(true);
if (!success) {
final TaskReport[] reports = job.getTaskReports(TaskType.MAP);
if (reports != null) {
for (final TaskReport report : reports) {
log.error("Error in task [%s] : %s", report.getTaskId(), Arrays.toString(report.getDiagnostics()));
}
}
return null;
}
try {
loadedBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_LOADED).getValue();
writtenBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_WRITTEN).getValue();
}
catch (IOException ex) {
log.error(ex, "Could not fetch counters");
}
final JobID jobID = job.getJobID();
final Path jobDir = getJobPath(jobID, job.getWorkingDirectory());
final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
final RemoteIterator<LocatedFileStatus> it = fs.listFiles(jobDir, true);
final List<Path> goodPaths = new ArrayList<>();
while (it.hasNext()) {
final LocatedFileStatus locatedFileStatus = it.next();
if (locatedFileStatus.isFile()) {
final Path myPath = locatedFileStatus.getPath();
if (ConvertingOutputFormat.DATA_SUCCESS_KEY.equals(myPath.getName())) {
goodPaths.add(new Path(myPath.getParent(), ConvertingOutputFormat.DATA_FILE_KEY));
}
}
}
if (goodPaths.isEmpty()) {
log.warn("No good data found at [%s]", jobDir);
return null;
}
final List<DataSegment> returnList = ImmutableList.copyOf(
Lists.transform(
goodPaths, new Function<Path, DataSegment>()
{
@Nullable
@Override
public DataSegment apply(final Path input)
{
try {
if (!fs.exists(input)) {
throw new ISE(
"Somehow [%s] was found but [%s] is missing at [%s]",
ConvertingOutputFormat.DATA_SUCCESS_KEY,
ConvertingOutputFormat.DATA_FILE_KEY,
jobDir
);
}
}
catch (final IOException e) {
throw Throwables.propagate(e);
}
try (final InputStream stream = fs.open(input)) {
return HadoopDruidConverterConfig.jsonMapper.readValue(stream, DataSegment.class);
}
catch (final IOException e) {
throw Throwables.propagate(e);
}
}
}
)
);
if (returnList.size() == segments.size()) {
return returnList;
} else {
throw new ISE(
"Tasks reported success but result length did not match! Expected %d found %d at path [%s]",
segments.size(),
returnList.size(),
jobDir
);
}
}
catch (InterruptedException | ClassNotFoundException e) {
RuntimeException exception = Throwables.propagate(e);
throwable = exception;
throw exception;
}
catch (Throwable t) {
throwable = t;
throw t;
}
finally {
try {
cleanup(job);
}
catch (IOException e) {
if (throwable != null) {
throwable.addSuppressed(e);
} else {
log.error(e, "Could not clean up job [%s]", job.getJobID());
}
}
}
}
public long getLoadedBytes()
{
return loadedBytes;
}
public long getWrittenBytes()
{
return writtenBytes;
}
public static class ConvertingOutputFormat extends OutputFormat<Text, Text>
{
protected static final String DATA_FILE_KEY = "result";
protected static final String DATA_SUCCESS_KEY = "_SUCCESS";
protected static final String PUBLISHED_SEGMENT_KEY = "io.druid.indexer.updater.converter.publishedSegment";
private static final Logger log = new Logger(ConvertingOutputFormat.class);
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
{
return new RecordWriter<Text, Text>()
{
@Override
public void write(Text key, Text value) throws IOException, InterruptedException
{
// NOOP
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
// NOOP
}
};
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
{
// NOOP
}
@Override
public OutputCommitter getOutputCommitter(final TaskAttemptContext context)
throws IOException, InterruptedException
{
return new OutputCommitter()
{
@Override
public void setupJob(JobContext jobContext) throws IOException
{
// NOOP
}
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException
{
// NOOP
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException
{
return taskContext.getConfiguration().get(PUBLISHED_SEGMENT_KEY) != null;
}
@Override
public void commitTask(final TaskAttemptContext taskContext) throws IOException
{
final Progressable commitProgressable = new Progressable()
{
@Override
public void progress()
{
taskContext.progress();
}
};
final String finalSegmentString = taskContext.getConfiguration().get(PUBLISHED_SEGMENT_KEY);
if (finalSegmentString == null) {
throw new IOException("Could not read final segment");
}
final DataSegment newSegment = HadoopDruidConverterConfig.jsonMapper.readValue(
finalSegmentString,
DataSegment.class
);
log.info("Committing new segment [%s]", newSegment);
taskContext.progress();
final FileSystem fs = taskContext.getWorkingDirectory().getFileSystem(taskContext.getConfiguration());
final Path taskAttemptDir = getTaskPath(
context.getJobID(),
context.getTaskAttemptID(),
taskContext.getWorkingDirectory()
);
final Path taskAttemptFile = new Path(taskAttemptDir, DATA_FILE_KEY);
final Path taskAttemptSuccess = new Path(taskAttemptDir, DATA_SUCCESS_KEY);
try (final OutputStream outputStream = fs.create(taskAttemptFile, false, 1 << 10, commitProgressable)) {
outputStream.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsBytes(newSegment));
}
fs.create(taskAttemptSuccess, false).close();
taskContext.progress();
taskContext.setStatus("Committed");
}
@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException
{
log.warn("Aborting task. Nothing to clean up.");
}
};
}
}
public static class ConvertingMapper extends Mapper<String, String, Text, Text>
{
private static final Logger log = new Logger(ConvertingMapper.class);
private static final String TMP_FILE_LOC_KEY = "io.druid.indexer.updater.converter.reducer.tmpDir";
@Override
protected void map(
String key, String value,
final Context context
) throws IOException, InterruptedException
{
final InputSplit split = context.getInputSplit();
if (!(split instanceof DataSegmentSplit)) {
throw new IAE(
"Unexpected split type. Expected [%s] was [%s]",
DataSegmentSplit.class.getCanonicalName(),
split.getClass().getCanonicalName()
);
}
final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
final File tmpDir = Paths.get(tmpDirLoc).toFile();
final DataSegment segment = ((DataSegmentSplit) split).getDataSegment();
final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
context.setStatus("DOWNLOADING");
context.progress();
final Path inPath = new Path(getURIFromSegment(segment));
final File inDir = new File(tmpDir, "in");
if (inDir.exists() && !inDir.delete()) {
log.warn("Could not delete [%s]", inDir);
}
if (!inDir.mkdir() && (!inDir.exists() || !inDir.isDirectory())) {
log.warn("Unable to make directory");
}
final long inSize = JobHelper.unzipNoGuava(inPath, context.getConfiguration(), inDir, context);
log.debug("Loaded %d bytes into [%s] for converting", inSize, inDir.getAbsolutePath());
context.getCounter(COUNTER_GROUP, COUNTER_LOADED).increment(inSize);
context.setStatus("CONVERTING");
context.progress();
final File outDir = new File(tmpDir, "out");
if (!outDir.mkdir() && (!outDir.exists() || !outDir.isDirectory())) {
throw new IOException(String.format("Could not create output directory [%s]", outDir));
}
IndexMaker.convert(
inDir,
outDir,
config.getIndexSpec(),
JobHelper.progressIndicatorForContext(context)
);
if (config.isValidate()) {
context.setStatus("Validating");
IndexIO.DefaultIndexIOHandler.validateTwoSegments(inDir, outDir);
}
context.progress();
context.setStatus("Starting PUSH");
final Path baseOutputPath = new Path(config.getSegmentOutputPath());
final FileSystem outputFS = baseOutputPath.getFileSystem(context.getConfiguration());
final DataSegment finalSegmentTemplate = segment.withVersion(
segment.getVersion()
+ "_converted"
);
final DataSegment finalSegment = JobHelper.serializeOutIndex(
finalSegmentTemplate,
context.getConfiguration(),
context,
context.getTaskAttemptID(),
outDir,
JobHelper.makeSegmentOutputPath(
baseOutputPath,
outputFS,
finalSegmentTemplate.getDataSource(),
finalSegmentTemplate.getVersion(),
finalSegmentTemplate.getInterval(),
finalSegmentTemplate.getShardSpec().getPartitionNum()
)
);
context.progress();
context.setStatus("Finished PUSH");
final String finalSegmentString = HadoopDruidConverterConfig.jsonMapper.writeValueAsString(finalSegment);
context.getConfiguration().set(ConvertingOutputFormat.PUBLISHED_SEGMENT_KEY, finalSegmentString);
context.write(new Text("dataSegment"), new Text(finalSegmentString));
context.getCounter(COUNTER_GROUP, COUNTER_WRITTEN).increment(finalSegment.getSize());
context.progress();
context.setStatus("Ready To Commit");
}
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
final File tmpFile = Files.createTempDir();
context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath());
}
private static URI getURIFromSegment(DataSegment dataSegment)
{
// There is no good way around this...
// TODO: add getURI() to URIDataPuller
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
}
catch (URISyntaxException e) {
throw new ISE(e, "Unable to form simple file uri");
}
} else {
try {
throw new IAE(
"Cannot figure out loadSpec %s",
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
);
}
catch (JsonProcessingException e) {
throw new ISE("Cannot write Map with json mapper");
}
}
return segmentLocURI;
}
@Override
protected void cleanup(
Context context
) throws IOException, InterruptedException
{
final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
final File tmpDir = Paths.get(tmpDirLoc).toFile();
FileUtils.deleteDirectory(tmpDir);
context.progress();
context.setStatus("Clean");
}
}
public static class DataSegmentSplit extends InputSplit implements Writable
{
private DataSegment dataSegment = null;
public DataSegmentSplit()
{
// For serialization purposes
}
public DataSegmentSplit(@NotNull DataSegment dataSegment)
{
this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment");
}
@Override
public long getLength() throws IOException, InterruptedException
{
return dataSegment.getSize();
}
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
}
protected DataSegment getDataSegment()
{
return dataSegment;
}
@Override
public void write(DataOutput out) throws IOException
{
out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes());
}
@Override
public void readFields(DataInput in) throws IOException
{
dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class);
}
}
public static class ConfigInputFormat extends InputFormat<String, String>
{
@Override
public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException
{
final HadoopDruidConverterConfig config = converterConfigFromConfiguration(jobContext.getConfiguration());
final List<DataSegment> segments = config.getSegments();
if (segments == null) {
throw new IOException("Bad config, missing segments");
}
return Lists.transform(
segments, new Function<DataSegment, InputSplit>()
{
@Nullable
@Override
public InputSplit apply(DataSegment input)
{
return new DataSegmentSplit(input);
}
}
);
}
@Override
public RecordReader<String, String> createRecordReader(
final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext
) throws IOException, InterruptedException
{
return new RecordReader<String, String>()
{
boolean readAnything = false;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException
{
// NOOP
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
return !readAnything;
}
@Override
public String getCurrentKey() throws IOException, InterruptedException
{
return "key";
}
@Override
public String getCurrentValue() throws IOException, InterruptedException
{
readAnything = true;
return "fakeValue";
}
@Override
public float getProgress() throws IOException, InterruptedException
{
return readAnything ? 1.0F : 0.0F;
}
@Override
public void close() throws IOException
{
// NOOP
}
};
}
}
}
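Mirroring what HadoopConverterJobTest does at the bottom of this diff, the job is driven entirely by a HadoopDruidConverterConfig (next file) and hands back the converted segments, or null if the MapReduce job failed. A minimal driver sketch with placeholder values:

import io.druid.indexer.updater.HadoopConverterJob;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.net.URI;
import java.util.List;

public class ConverterJobSketch
{
  public static List<DataSegment> convert(final List<DataSegment> segments) throws Exception
  {
    final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
        "wikipedia",                                   // dataSource (placeholder)
        Interval.parse("2014-01-01/2015-01-01"),       // interval covering the segments
        new IndexSpec(),                               // how the rewritten segments should be encoded
        segments,                                      // segments to convert, e.g. pulled from metadata storage
        true,                                          // validate each converted segment against the original
        URI.create("hdfs://namenode/tmp/druid/converter-cache"), // distributedSuccessCache (placeholder)
        null,                                          // extra hadoopProperties (defaults to an empty map)
        null,                                          // jobPriority (optional)
        "hdfs://namenode/druid/segments"               // segmentOutputPath (placeholder)
    );
    // run() submits the map-only job and returns the converted segments, or null on failure.
    return new HadoopConverterJob(config).run();
  }
}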

View File

@ -0,0 +1,190 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.initialization.Initialization;
import io.druid.segment.IndexSpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
public class HadoopDruidConverterConfig
{
public static final String CONFIG_PROPERTY = "io.druid.indexer.updater.converter";
public static final ObjectMapper jsonMapper;
private static final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-converter", null, null)
);
}
}
)
);
static {
jsonMapper = injector.getInstance(ObjectMapper.class);
jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class);
}
private static final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>()
{
};
public static HadoopDruidConverterConfig fromString(final String string) throws IOException
{
return fromMap(jsonMapper.<Map<String, Object>>readValue(string, mapTypeReference));
}
public static HadoopDruidConverterConfig fromFile(final File file) throws IOException
{
return fromMap(jsonMapper.<Map<String, Object>>readValue(file, mapTypeReference));
}
public static HadoopDruidConverterConfig fromMap(final Map<String, Object> map)
{
return jsonMapper.convertValue(map, HadoopDruidConverterConfig.class);
}
@JsonProperty
private final String dataSource;
@JsonProperty
private final Interval interval;
@JsonProperty
private final IndexSpec indexSpec;
@JsonProperty
private final List<DataSegment> segments;
@JsonProperty
private final boolean validate;
@JsonProperty
private final URI distributedSuccessCache;
@JsonProperty
private final Map<String, String> hadoopProperties;
@JsonProperty
private final String jobPriority;
@JsonProperty
private final String segmentOutputPath;
@JsonCreator
public HadoopDruidConverterConfig(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") final Interval interval,
@JsonProperty("indexSpec") final IndexSpec indexSpec,
@JsonProperty("segments") final List<DataSegment> segments,
@JsonProperty("validate") final Boolean validate,
@JsonProperty("distributedSuccessCache") URI distributedSuccessCache,
@JsonProperty("hadoopProperties") Map<String, String> hadoopProperties,
@JsonProperty("jobPriority") String jobPriority,
@JsonProperty("segmentOutputPath") String segmentOutputPath
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
this.indexSpec = Preconditions.checkNotNull(indexSpec, "indexSpec");
this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache");
this.segments = segments;
this.validate = validate == null ? false : validate;
this.hadoopProperties = hadoopProperties == null
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(hadoopProperties);
this.jobPriority = jobPriority;
this.segmentOutputPath = Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
}
@JsonProperty
public boolean isValidate()
{
return validate;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public URI getDistributedSuccessCache()
{
return distributedSuccessCache;
}
@JsonProperty
public Map<String, String> getHadoopProperties()
{
return hadoopProperties;
}
@JsonProperty
public String getJobPriority()
{
return jobPriority;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
}
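The config reaches the mappers through the Hadoop Configuration: the driver serializes it as JSON under CONFIG_PROPERTY via converterConfigIntoConfiguration, and ConvertingMapper rebuilds it with converterConfigFromConfiguration. A condensed sketch of that round trip, assuming a config built as in the driver sketch above:

import io.druid.indexer.updater.HadoopConverterJob;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class ConfigRoundTripSketch
{
  public static HadoopDruidConverterConfig roundTrip(final HadoopDruidConverterConfig config) throws IOException
  {
    final Configuration conf = new Configuration();
    // Driver side: HadoopConverterJob.run() writes the config (with its segment list) into the job Configuration.
    HadoopConverterJob.converterConfigIntoConfiguration(config, config.getSegments(), conf);
    // Task side: this is the first thing ConvertingMapper.map() does with the task's Configuration.
    return HadoopConverterJob.converterConfigFromConfiguration(conf);
  }
}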

View File

@ -0,0 +1,497 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.metamx.common.FileUtils;
import com.metamx.common.Granularity;
import io.druid.client.DruidDataSource;
import io.druid.data.input.impl.DelimitedParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.JobHelper;
import io.druid.indexer.Jobby;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.metadata.TestDerbyConnector;
import io.druid.metadata.storage.derby.DerbyConnector;
import io.druid.query.Query;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.segment.TestIndex;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class HadoopConverterJobTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private String storageLocProperty = null;
private File tmpSegmentDir = null;
private static final String DATASOURCE = "testDatasource";
private static final String STORAGE_PROPERTY_KEY = "druid.storage.storageDirectory";
private final MetadataStorageUpdaterJobSpec metadataStorageUpdaterJobSpec = new MetadataStorageUpdaterJobSpec()
{
@Override
@JsonProperty
public String getSegmentTable()
{
return "druid_segments";
}
@Override
@JsonProperty
public String getType()
{
return "derby";
}
@JsonProperty
public String getConnectURI()
{
return "jdbc:derby:memory:druidTest;create=true";
}
@JsonProperty
public String getUser()
{
return "sb";
}
@JsonProperty
public String getPassword()
{
return "sb";
}
@Override
public MetadataStorageConnectorConfig get()
{
return new MetadataStorageConnectorConfig()
{
public boolean isCreateTables()
{
return true;
}
public String getHost()
{
return "localhost";
}
public int getPort()
{
return -1;
}
public String getConnectURI()
{
return "jdbc:derby:memory:druidTest;create=true";
}
public String getUser()
{
return "sb";
}
public String getPassword()
{
return "sb";
}
@Override
public String toString()
{
return "DbConnectorConfig{" +
"createTables=" + isCreateTables() +
", connectURI='" + getConnectURI() + '\'' +
", user='" + getUser() + '\'' +
", passwordProvider=" + getPassword() +
'}';
}
};
}
};
private Supplier<MetadataStorageTablesConfig> metadataStorageTablesConfigSupplier;
private DerbyConnector connector;
private final Interval interval = Interval.parse("2011-01-01T00:00:00.000Z/2011-05-01T00:00:00.000Z");
@After
public void tearDown()
{
if (storageLocProperty == null) {
System.clearProperty(STORAGE_PROPERTY_KEY);
} else {
System.setProperty(STORAGE_PROPERTY_KEY, storageLocProperty);
}
tmpSegmentDir = null;
}
@Before
public void setUp() throws Exception
{
final File scratchFileDir = temporaryFolder.newFolder();
storageLocProperty = System.getProperty(STORAGE_PROPERTY_KEY);
tmpSegmentDir = temporaryFolder.newFolder();
System.setProperty(STORAGE_PROPERTY_KEY, tmpSegmentDir.getAbsolutePath());
final URL url = Preconditions.checkNotNull(Query.class.getClassLoader().getResource("druid.sample.tsv"));
final File tmpInputFile = temporaryFolder.newFile();
FileUtils.retryCopy(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return url.openStream();
}
},
tmpInputFile,
FileUtils.IS_EXCEPTION,
3
);
final HadoopDruidIndexerConfig hadoopDruidIndexerConfig = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
DATASOURCE,
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0]),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
},
new UniformGranularitySpec(
Granularity.MONTH,
QueryGranularity.DAY,
ImmutableList.<Interval>of(interval)
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"type", "static",
"paths", tmpInputFile.getAbsolutePath()
),
metadataStorageUpdaterJobSpec,
tmpSegmentDir.getAbsolutePath()
),
new HadoopTuningConfig(
scratchFileDir.getAbsolutePath(),
null,
null,
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
false,
null,
null
)
)
);
metadataStorageTablesConfigSupplier =
new Supplier<MetadataStorageTablesConfig>()
{
@Override
public MetadataStorageTablesConfig get()
{
return MetadataStorageTablesConfig.fromBase("druid");
}
};
connector = new TestDerbyConnector(
new Supplier<MetadataStorageConnectorConfig>()
{
@Override
public MetadataStorageConnectorConfig get()
{
return metadataStorageUpdaterJobSpec.get();
}
},
new Supplier<MetadataStorageTablesConfig>()
{
@Override
public MetadataStorageTablesConfig get()
{
return new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null);
}
}
);
try {
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.execute("DROP TABLE druid_segments");
return null;
}
}
);
} catch (CallbackFailedException e){
// Who cares
}
List<Jobby> jobs = ImmutableList.of(
new Jobby()
{
@Override
public boolean run()
{
connector.createSegmentTable(connector.getDBI(), "druid_segments");
return true;
}
},
new HadoopDruidDetermineConfigurationJob(hadoopDruidIndexerConfig),
new HadoopDruidIndexerJob(
hadoopDruidIndexerConfig,
new SQLMetadataStorageUpdaterJobHandler(connector)
)
);
JobHelper.runJobs(jobs, hadoopDruidIndexerConfig);
}
private List<DataSegment> getDataSegments(
SQLMetadataSegmentManager manager
) throws InterruptedException
{
manager.start();
while (!manager.isStarted()) {
Thread.sleep(10);
}
manager.poll();
final DruidDataSource druidDataSource = manager.getInventoryValue(DATASOURCE);
manager.stop();
return Lists.newArrayList(druidDataSource.getSegments());
}
@Test
public void testSimpleJob() throws IOException, InterruptedException
{
final SQLMetadataSegmentManager manager = new SQLMetadataSegmentManager(
HadoopDruidConverterConfig.jsonMapper,
new Supplier<MetadataSegmentManagerConfig>()
{
@Override
public MetadataSegmentManagerConfig get()
{
return new MetadataSegmentManagerConfig();
}
},
metadataStorageTablesConfigSupplier,
connector
);
final List<DataSegment> oldSegments = getDataSegments(manager);
final File tmpDir = temporaryFolder.newFolder();
final HadoopConverterJob converterJob = new HadoopConverterJob(
new HadoopDruidConverterConfig(
DATASOURCE,
interval,
new IndexSpec(new RoaringBitmapSerdeFactory(), "uncompressed", "uncompressed"),
oldSegments,
true,
tmpDir.toURI(),
ImmutableMap.<String, String>of(),
null,
tmpSegmentDir.toURI().toString()
)
);
final List<DataSegment> segments = Lists.newArrayList(converterJob.run());
Assert.assertNotNull("bad result", segments);
Assert.assertEquals("wrong segment count", 4, segments.size());
Assert.assertTrue(converterJob.getLoadedBytes() > 0);
Assert.assertTrue(converterJob.getWrittenBytes() > 0);
Assert.assertTrue(converterJob.getWrittenBytes() > converterJob.getLoadedBytes());
Assert.assertEquals(oldSegments.size(), segments.size());
final DataSegment segment = segments.get(0);
Assert.assertTrue(interval.contains(segment.getInterval()));
Assert.assertTrue(segment.getVersion().endsWith("_converted"));
Assert.assertTrue(segment.getLoadSpec().get("path").toString().contains("_converted"));
for (File file : tmpDir.listFiles()) {
Assert.assertFalse(file.isDirectory());
Assert.assertTrue(file.isFile());
}
final Comparator<DataSegment> segmentComparator = new Comparator<DataSegment>()
{
@Override
public int compare(DataSegment o1, DataSegment o2)
{
return o1.getIdentifier().compareTo(o2.getIdentifier());
}
};
Collections.sort(
oldSegments,
segmentComparator
);
Collections.sort(
segments,
segmentComparator
);
for (int i = 0; i < oldSegments.size(); ++i) {
final DataSegment oldSegment = oldSegments.get(i);
final DataSegment newSegment = segments.get(i);
Assert.assertEquals(oldSegment.getDataSource(), newSegment.getDataSource());
Assert.assertEquals(oldSegment.getInterval(), newSegment.getInterval());
Assert.assertEquals(
Sets.<String>newHashSet(oldSegment.getMetrics()),
Sets.<String>newHashSet(newSegment.getMetrics())
);
Assert.assertEquals(
Sets.<String>newHashSet(oldSegment.getDimensions()),
Sets.<String>newHashSet(newSegment.getDimensions())
);
Assert.assertEquals(oldSegment.getVersion() + "_converted", newSegment.getVersion());
Assert.assertTrue(oldSegment.getSize() < newSegment.getSize());
Assert.assertEquals(oldSegment.getBinaryVersion(), newSegment.getBinaryVersion());
}
}
private static void corrupt(
DataSegment segment
) throws IOException
{
final Map<String, Object> localLoadSpec = segment.getLoadSpec();
final Path segmentPath = Paths.get(localLoadSpec.get("path").toString());
final MappedByteBuffer buffer = Files.map(segmentPath.toFile(), FileChannel.MapMode.READ_WRITE);
while (buffer.hasRemaining()) {
buffer.put((byte) 0xFF);
}
}
@Test
public void testHadoopFailure() throws IOException, InterruptedException
{
final SQLMetadataSegmentManager manager = new SQLMetadataSegmentManager(
HadoopDruidConverterConfig.jsonMapper,
new Supplier<MetadataSegmentManagerConfig>()
{
@Override
public MetadataSegmentManagerConfig get()
{
return new MetadataSegmentManagerConfig();
}
},
metadataStorageTablesConfigSupplier,
connector
);
final List<DataSegment> oldSegments = getDataSegments(manager);
final File tmpDir = temporaryFolder.newFolder();
final HadoopConverterJob converterJob = new HadoopConverterJob(
new HadoopDruidConverterConfig(
DATASOURCE,
interval,
new IndexSpec(new RoaringBitmapSerdeFactory(), "uncompressed", "uncompressed"),
oldSegments,
true,
tmpDir.toURI(),
ImmutableMap.<String, String>of(),
null,
tmpSegmentDir.toURI().toString()
)
);
corrupt(oldSegments.get(0));
final List<DataSegment> result = converterJob.run();
Assert.assertNull("result should be null", result);
final List<DataSegment> segments = getDataSegments(manager);
Assert.assertEquals(oldSegments.size(), segments.size());
Assert.assertEquals(oldSegments, segments);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.updater;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.net.URI;
public class HadoopDruidConverterConfigTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void simpleSerDe() throws IOException
{
final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
"datasource",
Interval.parse("2000/2010"),
new IndexSpec(),
ImmutableList.<DataSegment>of(),
true,
URI.create("file:/dev/null"),
ImmutableMap.<String, String>of(),
"HIGH",
temporaryFolder.newFolder().getAbsolutePath()
);
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerSubtypes(HadoopDruidConverterConfig.class);
final byte[] value = mapper.writeValueAsBytes(config);
final HadoopDruidConverterConfig config2 = mapper.readValue(
value,
HadoopDruidConverterConfig.class
);
Assert.assertEquals(mapper.writeValueAsString(config), mapper.writeValueAsString(config2));
}
}

View File

@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
@ -37,6 +39,7 @@ import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@ -45,7 +48,7 @@ import java.util.Map;
/**
* This task takes a segment and attempts to reindex it in the latest version with the specified indexSpec.
*
* <p/>
* Only the datasource must be specified. `indexSpec` and `force` are strongly suggested but optional. The rest are
* auto-configured and should only be modified with great care.
*/
@ -82,7 +85,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
)
{
final String id = makeId(dataSource, interval);
return new ConvertSegmentTask(id, id, dataSource, interval, null, indexSpec, force, validate);
return new ConvertSegmentTask(id, dataSource, interval, null, indexSpec, force, validate);
}
/**
@ -100,7 +103,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
final Interval interval = segment.getInterval();
final String dataSource = segment.getDataSource();
final String id = makeId(dataSource, interval);
return new ConvertSegmentTask(id, id, dataSource, interval, segment, indexSpec, force, validate);
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate);
}
private static String makeId(String dataSource, Interval interval)
@ -113,7 +116,6 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@JsonCreator
private static ConvertSegmentTask createFromJson(
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("segment") DataSegment segment,
@ -131,12 +133,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
return create(segment, indexSpec, isForce, isValidate);
}
}
return new ConvertSegmentTask(id, groupId, dataSource, interval, segment, indexSpec, isForce, isValidate);
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, isForce, isValidate);
}
private ConvertSegmentTask(
protected ConvertSegmentTask(
String id,
String groupId,
String dataSource,
Interval interval,
DataSegment segment,
@ -145,7 +146,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
boolean validate
)
{
super(id, groupId, dataSource, interval);
super(id, dataSource, interval);
this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.force = force;
@ -185,6 +186,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Iterable<DataSegment> segmentsToUpdate;
if (segment == null) {
final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
new SegmentListUsedAction(
@ -192,46 +194,66 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
getInterval()
)
);
final FunctionalIterable<Task> tasks = FunctionalIterable
segmentsToUpdate = FunctionalIterable
.create(segments)
.keep(
new Function<DataSegment, Task>()
.filter(
new Predicate<DataSegment>()
{
@Override
public Task apply(DataSegment segment)
public boolean apply(DataSegment segment)
{
final Integer segmentVersion = segment.getBinaryVersion();
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
return new SubTask(getGroupId(), segment, indexSpec, force, validate);
return true;
} else if (force) {
log.info(
"Segment[%s] already at version[%s], forcing conversion",
segment.getIdentifier(),
segmentVersion
);
return new SubTask(getGroupId(), segment, indexSpec, force, validate);
return true;
} else {
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
return null;
return false;
}
}
}
);
// Vestigial from a past time when this task spawned subtasks.
for (final Task subTask : tasks) {
final TaskStatus status = subTask.run(toolbox);
if (!status.isSuccess()) {
return status;
}
}
} else {
log.info("I'm in a subless mood.");
convertSegment(toolbox, segment, indexSpec, force, validate);
segmentsToUpdate = Collections.singleton(segment);
}
// Vestigial from a past time when this task spawned subtasks.
for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate)) {
final TaskStatus status = subTask.run(toolbox);
if (!status.isSuccess()) {
return TaskStatus.fromCode(getId(), status.getStatusCode());
}
}
return success();
}
protected Iterable<Task> generateSubTasks(
final String groupId,
final Iterable<DataSegment> segments,
final IndexSpec indexSpec,
final boolean force,
final boolean validate
)
{
return Iterables.transform(
segments,
new Function<DataSegment, Task>()
{
@Override
public Task apply(DataSegment input)
{
return new SubTask(groupId, input, indexSpec, force, validate);
}
}
);
}
@Override
public boolean equals(Object o)
{

View File

@ -0,0 +1,277 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.UOE;
import com.metamx.common.logger.Logger;
import io.druid.indexer.updater.HadoopConverterJob;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
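/**
 * ConvertSegmentTask variant that performs the conversion as a Hadoop job (HadoopConverterJob) instead of
 * converting each segment locally. generateSubTasks() emits a single ConverterSubTask covering every matching
 * segment, and that sub task pushes the converted segments once the job has finished.
 *
 * Illustrative task spec; field names come from the @JsonCreator constructor below, all values are placeholders:
 *
 * <pre>
 * {
 *   "type": "hadoop_convert_segment",
 *   "dataSource": "wikipedia",
 *   "interval": "2014-01-01/2014-02-01",
 *   "indexSpec": { "bitmap": { "type": "concise" } },
 *   "force": false,
 *   "validate": true,
 *   "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.3.0"],
 *   "distributedSuccessCache": "hdfs://namenode/tmp/druid/converter-success-cache",
 *   "jobPriority": "VERY_LOW",
 *   "segmentOutputPath": "hdfs://namenode/druid/segments"
 * }
 * </pre>
 */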
public class HadoopConverterTask extends ConvertSegmentTask
{
private static final String TYPE = "hadoop_convert_segment";
private static final Logger log = new Logger(HadoopConverterTask.class);
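// dataSource, interval, distributedSuccessCache and segmentOutputPath are required; validate defaults to true when
// omitted. No explicit segment may be supplied: the segment argument passed to super is always null so the
// interval-based code path in ConvertSegmentTask is taken (see the "Always call subtask codepath" comment below).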
@JsonCreator
public HadoopConverterTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") boolean force,
@JsonProperty("validate") Boolean validate,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("distributedSuccessCache") URI distributedSuccessCache,
@JsonProperty("jobPriority") String jobPriority,
@JsonProperty("segmentOutputPath") String segmentOutputPath,
@JsonProperty("classpathPrefix") String classpathPrefix
)
{
super(
makeId(
id,
TYPE,
Preconditions.checkNotNull(dataSource, "dataSource"),
Preconditions.checkNotNull(interval, "interval")
),
dataSource,
interval,
null, // Always call subtask codepath
indexSpec,
force,
validate == null ? true : validate
);
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache");
this.segmentOutputPath = Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
this.jobPriority = jobPriority;
this.classpathPrefix = classpathPrefix;
}
private final List<String> hadoopDependencyCoordinates;
private final URI distributedSuccessCache;
private final String jobPriority;
private final String segmentOutputPath;
private final String classpathPrefix;
@JsonProperty
public List<String> getHadoopDependencyCoordinates()
{
return hadoopDependencyCoordinates;
}
@JsonProperty
public URI getDistributedSuccessCache()
{
return distributedSuccessCache;
}
@JsonProperty
public String getJobPriority()
{
return jobPriority;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
@Override
@JsonProperty
public String getClasspathPrefix()
{
return classpathPrefix;
}
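// Unlike the parent class, which creates one sub task per segment, the whole segment list is handed to a single
// ConverterSubTask so the conversion runs as one Hadoop job.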
@Override
protected Iterable<Task> generateSubTasks(
final String groupId,
final Iterable<DataSegment> segments,
final IndexSpec indexSpec,
final boolean force,
final boolean validate
)
{
return Collections.<Task>singleton(
new ConverterSubTask(
ImmutableList.copyOf(segments),
this
)
);
}
@Override
@JsonIgnore
public DataSegment getSegment()
{
throw new UOE(
"Sub-less data segment not supported for hadoop converter task. Specify interval and datasource instead"
);
}
@Override
public String getType()
{
return TYPE;
}
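/**
 * Runs the actual HadoopConverterJob. Extends HadoopTask so the job can execute inside a classloader built for the
 * parent's hadoopDependencyCoordinates (see buildClassLoader below). The parent task is serialized as part of this
 * sub task so all of its configuration travels with it.
 */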
public static class ConverterSubTask extends HadoopTask
{
private final List<DataSegment> segments;
private final HadoopConverterTask parent;
@JsonCreator
public ConverterSubTask(
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("parent") HadoopConverterTask parent
)
{
super(
joinId(
Preconditions.checkNotNull(parent, "parent").getGroupId(),
"sub",
parent.getInterval().getStart(),
parent.getInterval().getEnd()
),
parent.getDataSource(),
parent.getHadoopDependencyCoordinates()
);
this.segments = segments;
this.parent = parent;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public HadoopConverterTask getParent()
{
return parent;
}
@Override
public String getType()
{
return TYPE + "_sub";
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
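// run() copies every runtime property that starts with "hadoop." (minus that prefix) into the job configuration,
// serializes a HadoopDruidConverterConfig, and invokes JobInvoker.runTask() through the Hadoop-specific
// classloader returned by buildClassLoader(). A null result means the job failed; otherwise the converted
// segments are deserialized and pushed via the toolbox.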
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Map<String, String> hadoopProperties = new HashMap<>();
final Properties properties = injector.getInstance(Properties.class);
for (String name : properties.stringPropertyNames()) {
if (name.startsWith("hadoop.")) {
hadoopProperties.put(name.substring("hadoop.".length()), properties.getProperty(name));
}
}
final ClassLoader loader = buildClassLoader(toolbox);
final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
getDataSource(),
parent.getInterval(),
parent.getIndexSpec(),
segments,
parent.isValidate(),
parent.getDistributedSuccessCache(),
hadoopProperties,
parent.getJobPriority(),
parent.getSegmentOutputPath()
);
final String finishedSegmentString = invokeForeignLoader(
"io.druid.indexing.common.task.HadoopConverterTask$JobInvoker",
new String[]{HadoopDruidConverterConfig.jsonMapper.writeValueAsString(config)},
loader
);
if (finishedSegmentString == null) {
return TaskStatus.failure(getId());
}
final List<DataSegment> finishedSegments = HadoopDruidConverterConfig.jsonMapper.readValue(
finishedSegmentString,
new TypeReference<List<DataSegment>>()
{
}
);
log.debug("Found new segments %s", Arrays.toString(finishedSegments.toArray()));
toolbox.pushSegments(finishedSegments);
return success();
}
}
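/**
 * Entry point invoked reflectively (by class name) inside the foreign classloader. It receives the JSON-serialized
 * HadoopDruidConverterConfig as the single array element and returns the JSON-serialized list of converted
 * segments, or null if the Hadoop job produced nothing.
 */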
public static class JobInvoker
{
public static String runTask(String[] input)
{
final HadoopDruidConverterConfig config;
try {
config = HadoopDruidConverterConfig.jsonMapper.readValue(
input[0],
HadoopDruidConverterConfig.class
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
final HadoopConverterJob hadoopConverterJob = new HadoopConverterJob(config);
try {
final List<DataSegment> result = hadoopConverterJob.run();
return result == null
? null
: HadoopDruidConverterConfig.jsonMapper.writeValueAsString(result);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
}

View File

@ -47,6 +47,8 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "hadoop_convert_segment", value = HadoopConverterTask.class),
@JsonSubTypes.Type(name = "hadoop_convert_segment_sub", value = HadoopConverterTask.ConverterSubTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentTask.class), // Backwards compat - Deprecated

View File

@ -0,0 +1,315 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
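/**
 * Serde-focused tests: round-trips HadoopConverterTask and ConverterSubTask through Jackson, checks the defaults
 * (validate falls back to true, an id is generated when none is given) and verifies that the required constructor
 * arguments are null-checked.
 */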
public class HadoopConverterTaskSerDeTest
{
private static ObjectMapper objectMapper;
private static final String TASK_ID = "task id";
private static final String DATA_SOURCE = "datasource";
private static final Interval INTERVAL = Interval.parse("2010/2011");
private static final String SEGMENT_VERSION = "some version";
private static final Map<String, Object> LOAD_SPEC = ImmutableMap.<String, Object>of("someKey", "someVal");
private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
private static final List<String> METRICS = ImmutableList.of("metric1", "metric2");
private static final ShardSpec SHARD_SPEC = new NoneShardSpec();
private static final int BINARY_VERSION = 34718;
private static final long SEGMENT_SIZE = 7483901348790L;
private static final IndexSpec INDEX_SPEC = new IndexSpec(new ConciseBitmapSerdeFactory(), "lz4", "lzf");
private static final DataSegment DATA_SEGMENT = new DataSegment(
DATA_SOURCE,
INTERVAL,
SEGMENT_VERSION,
LOAD_SPEC,
DIMENSIONS,
METRICS,
SHARD_SPEC,
BINARY_VERSION,
SEGMENT_SIZE
);
private static final List<String> HADOOP_DEPENDENCY = ImmutableList.of("dependency1");
private static final URI DISTRIBUTED_CACHE = URI.create("http://your.momma"); // Should have plenty of space
private static final String PRIORITY = "0";
private static final String OUTPUT_PATH = "/dev/null";
private static final String CLASSPATH_PREFIX = "something:where:I:need:stuff";
@BeforeClass
public static void setUpStatic()
{
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(
HadoopConverterTask.class,
HadoopConverterTask.ConverterSubTask.class,
ShardSpec.class
);
}
@Test
public void testSimpleConverterTaskSerDe() throws IOException
{
HadoopConverterTask orig = new HadoopConverterTask(
TASK_ID,
DATA_SOURCE,
INTERVAL,
INDEX_SPEC,
true,
true,
HADOOP_DEPENDENCY,
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
);
final String strOrig = objectMapper.writeValueAsString(orig);
HadoopConverterTask other = objectMapper.readValue(strOrig, HadoopConverterTask.class);
Assert.assertEquals(strOrig, objectMapper.writeValueAsString(other));
Assert.assertFalse(orig == other);
Assert.assertEquals(orig, other);
assertExpectedTask(other);
}
@Test
public void testSimpleSubTaskSerDe() throws IOException
{
HadoopConverterTask parent = new HadoopConverterTask(
TASK_ID,
DATA_SOURCE,
INTERVAL,
INDEX_SPEC,
true,
true,
HADOOP_DEPENDENCY,
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
);
HadoopConverterTask.ConverterSubTask subTask = new HadoopConverterTask.ConverterSubTask(
ImmutableList.of(
DATA_SEGMENT
), parent
);
final String origString = objectMapper.writeValueAsString(subTask);
final HadoopConverterTask.ConverterSubTask otherSub = objectMapper.readValue(
origString,
HadoopConverterTask.ConverterSubTask.class
);
Assert.assertEquals(subTask, otherSub);
Assert.assertEquals(origString, objectMapper.writeValueAsString(otherSub));
Assert.assertEquals(ImmutableList.of(DATA_SEGMENT), otherSub.getSegments());
Assert.assertFalse(parent == otherSub.getParent());
Assert.assertEquals(parent, otherSub.getParent());
assertExpectedTask(otherSub.getParent());
}
private static void assertExpectedTask(HadoopConverterTask other)
{
Assert.assertEquals(TASK_ID, other.getId());
Assert.assertEquals(DATA_SOURCE, other.getDataSource());
Assert.assertEquals(INTERVAL, other.getInterval());
Assert.assertEquals(INDEX_SPEC, other.getIndexSpec());
Assert.assertTrue(other.isForce());
Assert.assertTrue(other.isValidate());
Assert.assertEquals(HADOOP_DEPENDENCY, other.getHadoopDependencyCoordinates());
Assert.assertEquals(DISTRIBUTED_CACHE, other.getDistributedSuccessCache());
Assert.assertEquals(PRIORITY, other.getJobPriority());
Assert.assertEquals(OUTPUT_PATH, other.getSegmentOutputPath());
Assert.assertEquals(CLASSPATH_PREFIX, other.getClasspathPrefix());
}
@Test
public void testSubTask()
{
HadoopConverterTask parent = new HadoopConverterTask(
TASK_ID,
DATA_SOURCE,
INTERVAL,
INDEX_SPEC,
true,
true,
HADOOP_DEPENDENCY,
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
);
HadoopConverterTask.ConverterSubTask subTask = new HadoopConverterTask.ConverterSubTask(
ImmutableList.of(
DATA_SEGMENT
), parent
);
Assert.assertEquals(parent.getType(), "hadoop_convert_segment");
Assert.assertEquals(parent.getType() + "_sub", subTask.getType());
}
@Test
public void testNullValidate()
{
HadoopConverterTask orig = new HadoopConverterTask(
TASK_ID,
DATA_SOURCE,
INTERVAL,
INDEX_SPEC,
true,
null,
HADOOP_DEPENDENCY,
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
);
Assert.assertTrue(orig.isValidate());
}
@Test
public void testMinimal()
{
HadoopConverterTask parent = new HadoopConverterTask(
null,
DATA_SOURCE,
INTERVAL,
null,
true,
null,
null,
DISTRIBUTED_CACHE,
null,
OUTPUT_PATH,
null
);
Assert.assertEquals(DATA_SOURCE, parent.getDataSource());
Assert.assertEquals(INTERVAL, parent.getInterval());
Assert.assertEquals(DISTRIBUTED_CACHE, parent.getDistributedSuccessCache());
Assert.assertEquals(OUTPUT_PATH, parent.getSegmentOutputPath());
Assert.assertNotNull(parent.getId());
Assert.assertFalse(parent.getId().isEmpty());
}
@Test(expected = UnsupportedOperationException.class)
public void testGetDataSegment()
{
HadoopConverterTask orig = new HadoopConverterTask(
TASK_ID,
DATA_SOURCE,
INTERVAL,
INDEX_SPEC,
true,
null,
HADOOP_DEPENDENCY,
DISTRIBUTED_CACHE,
PRIORITY,
OUTPUT_PATH,
CLASSPATH_PREFIX
);
orig.getSegment();
}
@Test(expected = NullPointerException.class)
public void testNull1()
{
HadoopConverterTask parent = new HadoopConverterTask(
null,
null,
INTERVAL,
null,
true,
null,
null,
DISTRIBUTED_CACHE,
null,
OUTPUT_PATH,
null
);
}
@Test(expected = NullPointerException.class)
public void testNull2()
{
HadoopConverterTask parent = new HadoopConverterTask(
null,
DATA_SOURCE,
null,
null,
true,
null,
null,
DISTRIBUTED_CACHE,
null,
OUTPUT_PATH,
null
);
}
@Test(expected = NullPointerException.class)
public void testNull3()
{
HadoopConverterTask parent = new HadoopConverterTask(
null,
DATA_SOURCE,
INTERVAL,
null,
true,
null,
null,
null,
null,
OUTPUT_PATH,
null
);
}
@Test(expected = NullPointerException.class)
public void testNull4()
{
HadoopConverterTask parent = new HadoopConverterTask(
null,
DATA_SOURCE,
INTERVAL,
null,
true,
null,
null,
DISTRIBUTED_CACHE,
null,
null,
null
);
}
}

View File

@ -72,6 +72,7 @@
<jackson.version>2.4.4</jackson.version>
<log4j.version>2.2</log4j.version>
<slf4j.version>1.7.10</slf4j.version>
<hadoop.compile.version>2.3.0</hadoop.compile.version>
</properties>
<modules>
@ -497,7 +498,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
-<version>2.3.0</version>
+<version>${hadoop.compile.version}</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -62,9 +62,9 @@ public class Main
.withCommands(ConvertProperties.class, DruidJsonValidator.class, PullDependencies.class, CreateTables.class);
builder.withGroup("index")
.withDescription("Run indexing for druid")
.withDefaultCommand(Help.class)
.withCommands(CliHadoopIndexer.class);
.withDescription("Run indexing for druid")
.withDefaultCommand(Help.class)
.withCommands(CliHadoopIndexer.class);
builder.withGroup("internal")
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
@ -73,7 +73,10 @@ public class Main
final Injector injector = GuiceInjectors.makeStartupInjector();
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
-final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);
+final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(
+config,
+CliCommandCreator.class
+);
for (CliCommandCreator creator : extensionCommands) {
creator.addCommands(builder);
@ -82,7 +85,7 @@ public class Main
final Cli<Runnable> cli = builder.build();
try {
final Runnable command = cli.parse(args);
-if (! (command instanceof Help)) { // Hack to work around Help not liking being injected
+if (!(command instanceof Help)) { // Hack to work around Help not liking being injected
injector.injectMembers(command);
}
command.run();