mirror of https://github.com/apache/druid.git
make making v9 segments something completely configurable
This commit is contained in:
parent 351afb8be7
commit fec7b43fcb
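The change in a nutshell: writing v9 segments via IndexMaker is no longer hard-wired. A persistInHeap flag is threaded through HadoopTuningConfig and RealtimeTuningConfig (defaulting to false), IndexGeneratorJob gains overridable persist/mergeQueryableIndex/makeProgressIndicator hooks, and LegacyIndexGeneratorJob plus IndexMerger fallbacks in RealtimePlumber keep the previous behaviour as the default. A minimal sketch of the resulting dispatch, using only names that appear in the hunks below (the helper itself is illustrative, not part of the commit; imports omitted):

// Illustrative only: mirrors the persistInHeap branch added in RealtimePlumber.
static File mergeSegments(
    RealtimeTuningConfig config,
    List<QueryableIndex> indexes,
    AggregatorFactory[] aggs,
    File mergedTarget
) throws IOException
{
  if (config.isPersistInHeap()) {
    // v9 path, now opt-in: in-heap persist/merge via IndexMaker
    return IndexMaker.mergeQueryableIndex(indexes, aggs, mergedTarget);
  }
  // default path: legacy IndexMerger, unchanged behaviour
  return IndexMerger.mergeQueryableIndex(indexes, aggs, mergedTarget);
}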
@@ -47,7 +47,6 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
|
||||
import io.druid.indexer.path.PathSpec;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
@@ -170,7 +169,6 @@ public class HadoopDruidIndexerConfig
|
||||
private volatile HadoopIngestionSpec schema;
|
||||
private volatile PathSpec pathSpec;
|
||||
private volatile ColumnConfig columnConfig;
|
||||
private volatile Map<DateTime,ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
|
||||
private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
|
||||
|
||||
|
@@ -179,7 +177,6 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("schema") HadoopIngestionSpec schema
|
||||
)
|
||||
{
|
||||
this.columnConfig = columnConfig;
|
||||
this.schema = schema;
|
||||
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
|
||||
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : schema.getTuningConfig().getShardSpecs().entrySet()) {
|
||||
|
@@ -207,23 +204,12 @@ public class HadoopDruidIndexerConfig
}
|
||||
}
|
||||
|
||||
// TODO: remove this
|
||||
public boolean isLegacy()
|
||||
{
|
||||
return schema.isLegacy();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public HadoopIngestionSpec getSchema()
|
||||
{
|
||||
return schema;
|
||||
}
|
||||
|
||||
public ColumnConfig getColumnConfig()
|
||||
{
|
||||
return columnConfig;
|
||||
}
|
||||
|
||||
public String getDataSource()
|
||||
{
|
||||
return schema.getDataSchema().getDataSource();
|
||||
|
@@ -404,6 +390,11 @@ public class HadoopDruidIndexerConfig
}
|
||||
}
|
||||
|
||||
public boolean isPersistInHeap()
|
||||
{
|
||||
return schema.getTuningConfig().isPersistInHeap();
|
||||
}
|
||||
|
||||
/******************************************
|
||||
Path helper logic
|
||||
******************************************/
|
||||
|
|
|
@@ -57,10 +57,10 @@ public class HadoopDruidIndexerJob implements Jobby
List<Jobby> jobs = Lists.newArrayList();
|
||||
JobHelper.ensurePaths(config);
|
||||
|
||||
if (config.isLegacy()) {
|
||||
indexJob = new LegacyIndexGeneratorJob(config);
|
||||
} else {
|
||||
if (config.isPersistInHeap()) {
|
||||
indexJob = new IndexGeneratorJob(config);
|
||||
} else {
|
||||
indexJob = new LegacyIndexGeneratorJob(config);
|
||||
}
|
||||
jobs.add(indexJob);
|
||||
|
||||
|
|
|
@@ -50,14 +50,11 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
private final HadoopIOConfig ioConfig;
|
||||
private final HadoopTuningConfig tuningConfig;
|
||||
|
||||
private final boolean legacy; // TODO: remove
|
||||
|
||||
@JsonCreator
|
||||
public HadoopIngestionSpec(
|
||||
@JsonProperty("dataSchema") DataSchema dataSchema,
|
||||
@JsonProperty("ioConfig") HadoopIOConfig ioConfig,
|
||||
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig,
|
||||
@JsonProperty("legacy") boolean legacy, // TODO: remove
|
||||
// All deprecated
|
||||
final @JsonProperty("dataSource") String dataSource,
|
||||
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
|
||||
|
@@ -87,7 +84,6 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
)
|
||||
{
|
||||
super(dataSchema, ioConfig, tuningConfig);
|
||||
this.legacy = legacy;
|
||||
|
||||
if (dataSchema != null) {
|
||||
this.dataSchema = dataSchema;
|
||||
|
@@ -169,7 +165,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties,
|
||||
combineText
|
||||
combineText,
|
||||
false
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@@ -201,7 +198,6 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
schema,
|
||||
ioConfig,
|
||||
tuningConfig,
|
||||
legacy,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@@ -235,7 +231,6 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
dataSchema,
|
||||
config,
|
||||
tuningConfig,
|
||||
legacy,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@@ -269,7 +264,6 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
dataSchema,
|
||||
ioConfig,
|
||||
config,
|
||||
legacy,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@@ -296,11 +290,4 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
null
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: remove
|
||||
@JsonProperty
|
||||
public boolean isLegacy()
|
||||
{
|
||||
return legacy;
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -53,6 +53,7 @@ public class HadoopTuningConfig implements TuningConfig
false,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
@@ -68,6 +69,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean ignoreInvalidRows;
|
||||
private final Map<String, String> jobProperties;
|
||||
private final boolean combineText;
|
||||
private final boolean persistInHeap;
|
||||
|
||||
@JsonCreator
|
||||
public HadoopTuningConfig(
|
||||
|
@@ -81,7 +83,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
|
||||
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
|
||||
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
|
||||
final @JsonProperty("combineText") boolean combineText
|
||||
final @JsonProperty("combineText") boolean combineText,
|
||||
final @JsonProperty("persistInHeap") boolean persistInHeap
|
||||
)
|
||||
{
|
||||
this.workingPath = workingPath == null ? null : workingPath;
|
||||
|
@@ -97,6 +100,7 @@ public class HadoopTuningConfig implements TuningConfig
? ImmutableMap.<String, String>of()
|
||||
: ImmutableMap.copyOf(jobProperties));
|
||||
this.combineText = combineText;
|
||||
this.persistInHeap = persistInHeap;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@@ -165,6 +169,12 @@ public class HadoopTuningConfig implements TuningConfig
return combineText;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isPersistInHeap()
|
||||
{
|
||||
return persistInHeap;
|
||||
}
|
||||
|
||||
public HadoopTuningConfig withWorkingPath(String path)
|
||||
{
|
||||
return new HadoopTuningConfig(
|
||||
|
@@ -178,7 +188,8 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties,
|
||||
combineText
|
||||
combineText,
|
||||
persistInHeap
|
||||
);
|
||||
}
|
||||
|
||||
|
@@ -195,7 +206,8 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties,
|
||||
combineText
|
||||
combineText,
|
||||
persistInHeap
|
||||
);
|
||||
}
|
||||
|
||||
|
@@ -212,7 +224,8 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties,
|
||||
combineText
|
||||
combineText,
|
||||
persistInHeap
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -37,7 +37,6 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
|
||||
import io.druid.offheap.OffheapBufferPool;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.BaseProgressIndicator;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.IndexMaker;
|
||||
import io.druid.segment.LoggingProgressIndicator;
|
||||
|
@@ -90,16 +89,6 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby
|
||||
{
|
||||
private static final Logger log = new Logger(IndexGeneratorJob.class);
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
private IndexGeneratorStats jobStats;
|
||||
|
||||
public IndexGeneratorJob(
|
||||
HadoopDruidIndexerConfig config
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.jobStats = new IndexGeneratorStats();
|
||||
}
|
||||
|
||||
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
|
||||
{
|
||||
|
@@ -133,6 +122,22 @@ public class IndexGeneratorJob implements Jobby
return publishedSegments;
|
||||
}
|
||||
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
private IndexGeneratorStats jobStats;
|
||||
|
||||
public IndexGeneratorJob(
|
||||
HadoopDruidIndexerConfig config
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.jobStats = new IndexGeneratorStats();
|
||||
}
|
||||
|
||||
protected void setReducerClass(final Job job)
|
||||
{
|
||||
job.setReducerClass(IndexGeneratorReducer.class);
|
||||
}
|
||||
|
||||
public IndexGeneratorStats getJobStats()
|
||||
{
|
||||
return jobStats;
|
||||
|
@@ -164,7 +169,7 @@ public class IndexGeneratorJob implements Jobby
job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
|
||||
job.setPartitionerClass(IndexGeneratorPartitioner.class);
|
||||
|
||||
job.setReducerClass(IndexGeneratorReducer.class);
|
||||
setReducerClass(job);
|
||||
job.setOutputKeyClass(BytesWritable.class);
|
||||
job.setOutputValueClass(Text.class);
|
||||
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
|
||||
|
@@ -193,7 +198,6 @@ public class IndexGeneratorJob implements Jobby
}
|
||||
|
||||
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
|
||||
|
||||
{
|
||||
@Override
|
||||
protected void innerMap(
|
||||
|
@@ -259,6 +263,42 @@ public class IndexGeneratorJob implements Jobby
private List<String> metricNames = Lists.newArrayList();
|
||||
private StringInputRowParser parser;
|
||||
|
||||
protected ProgressIndicator makeProgressIndicator(final Context context)
|
||||
{
|
||||
return new LoggingProgressIndicator("IndexGeneratorJob")
|
||||
{
|
||||
@Override
|
||||
public void progress()
|
||||
{
|
||||
context.progress();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected File persist(
|
||||
final IncrementalIndex index,
|
||||
final Interval interval,
|
||||
final File file,
|
||||
final ProgressIndicator progressIndicator
|
||||
) throws IOException
|
||||
{
|
||||
return IndexMaker.persist(
|
||||
index, interval, file, progressIndicator
|
||||
);
|
||||
}
|
||||
|
||||
protected File mergeQueryableIndex(
|
||||
final List<QueryableIndex> indexes,
|
||||
final AggregatorFactory[] aggs,
|
||||
final File file,
|
||||
ProgressIndicator progressIndicator
|
||||
) throws IOException
|
||||
{
|
||||
return IndexMaker.mergeQueryableIndex(
|
||||
indexes, aggs, file, progressIndicator
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(Context context)
|
||||
throws IOException, InterruptedException
|
||||
|
@@ -297,14 +337,7 @@ public class IndexGeneratorJob implements Jobby
long startTime = System.currentTimeMillis();
|
||||
|
||||
Set<String> allDimensionNames = Sets.newHashSet();
|
||||
final ProgressIndicator progressIndicator = new LoggingProgressIndicator("IndexGeneratorJob")
|
||||
{
|
||||
@Override
|
||||
public void progress()
|
||||
{
|
||||
context.progress();
|
||||
}
|
||||
};
|
||||
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
|
||||
|
||||
for (final Text value : values) {
|
||||
context.progress();
|
||||
|
@@ -327,9 +360,7 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(file);
|
||||
|
||||
context.progress();
|
||||
IndexMaker.persist(
|
||||
index, interval, file, progressIndicator
|
||||
);
|
||||
persist(index, interval, file, progressIndicator);
|
||||
// close this index and make a new one
|
||||
index.close();
|
||||
index = makeIncrementalIndex(bucket, aggs);
|
||||
|
@@ -350,22 +381,18 @@ public class IndexGeneratorJob implements Jobby
}
|
||||
|
||||
mergedBase = new File(baseFlushFile, "merged");
|
||||
IndexMaker.persist(
|
||||
index, interval, mergedBase, progressIndicator
|
||||
);
|
||||
persist(index, interval, mergedBase, progressIndicator);
|
||||
} else {
|
||||
if (!index.isEmpty()) {
|
||||
final File finalFile = new File(baseFlushFile, "final");
|
||||
IndexMaker.persist(
|
||||
index, interval, finalFile, progressIndicator
|
||||
);
|
||||
persist(index, interval, finalFile, progressIndicator);
|
||||
toMerge.add(finalFile);
|
||||
}
|
||||
|
||||
for (File file : toMerge) {
|
||||
indexes.add(IndexIO.loadIndex(file));
|
||||
}
|
||||
mergedBase = IndexMaker.mergeQueryableIndex(
|
||||
mergedBase = mergeQueryableIndex(
|
||||
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
|
||||
);
|
||||
}
|
||||
|
|
|
@@ -19,244 +19,42 @@
|
||||
package io.druid.indexer;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.Closeables;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.StringInputRowParser;
|
||||
import io.druid.offheap.OffheapBufferPool;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.BaseProgressIndicator;
|
||||
import io.druid.segment.IndexMerger;
|
||||
import io.druid.segment.ProgressIndicator;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.SegmentUtils;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.Partitioner;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class LegacyIndexGeneratorJob extends IndexGeneratorJob
|
||||
{
|
||||
private static final Logger log = new Logger(IndexGeneratorJob.class);
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
private IndexGeneratorStats jobStats;
|
||||
|
||||
public LegacyIndexGeneratorJob(
|
||||
HadoopDruidIndexerConfig config
|
||||
)
|
||||
{
|
||||
super(config);
|
||||
this.config = config;
|
||||
this.jobStats = new IndexGeneratorStats();
|
||||
}
|
||||
|
||||
public boolean run()
|
||||
{
|
||||
try {
|
||||
Job job = new Job(
|
||||
new Configuration(),
|
||||
String.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals())
|
||||
);
|
||||
|
||||
job.getConfiguration().set("io.sort.record.percent", "0.23");
|
||||
|
||||
JobHelper.injectSystemProperties(job);
|
||||
|
||||
if (config.isCombineText()) {
|
||||
job.setInputFormatClass(CombineTextInputFormat.class);
|
||||
} else {
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
}
|
||||
|
||||
job.setMapperClass(IndexGeneratorMapper.class);
|
||||
job.setMapOutputValueClass(Text.class);
|
||||
|
||||
SortableBytes.useSortableBytesAsMapOutputKey(job);
|
||||
|
||||
job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
|
||||
job.setPartitionerClass(IndexGeneratorPartitioner.class);
|
||||
|
||||
job.setReducerClass(IndexGeneratorReducer.class);
|
||||
job.setOutputKeyClass(BytesWritable.class);
|
||||
job.setOutputValueClass(Text.class);
|
||||
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
|
||||
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
|
||||
|
||||
config.addInputPaths(job);
|
||||
config.addJobProperties(job);
|
||||
config.intoConfiguration(job);
|
||||
|
||||
JobHelper.setupClasspath(config, job);
|
||||
|
||||
job.submit();
|
||||
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
|
||||
|
||||
boolean success = job.waitForCompletion(true);
|
||||
|
||||
Counter invalidRowCount = job.getCounters()
|
||||
.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
|
||||
jobStats.setInvalidRowCount(invalidRowCount.getValue());
|
||||
|
||||
return success;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
|
||||
|
||||
{
|
||||
@Override
|
||||
protected void innerMap(
|
||||
InputRow inputRow,
|
||||
Text text,
|
||||
Context context
|
||||
) throws IOException, InterruptedException
|
||||
{
|
||||
// Group by bucket, sort by timestamp
|
||||
final Optional<Bucket> bucket = getConfig().getBucket(inputRow);
|
||||
|
||||
if (!bucket.isPresent()) {
|
||||
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
|
||||
}
|
||||
|
||||
context.write(
|
||||
new SortableBytes(
|
||||
bucket.get().toGroupKey(),
|
||||
Longs.toByteArray(inputRow.getTimestampFromEpoch())
|
||||
).toBytesWritable(),
|
||||
text
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text> implements Configurable
|
||||
{
|
||||
private Configuration config;
|
||||
|
||||
@Override
|
||||
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
|
||||
{
|
||||
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
|
||||
bytes.position(4); // Skip length added by SortableBytes
|
||||
int shardNum = bytes.getInt();
|
||||
if (config.get("mapred.job.tracker").equals("local")) {
|
||||
return shardNum % numPartitions;
|
||||
} else {
|
||||
if (shardNum >= numPartitions) {
|
||||
throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
|
||||
}
|
||||
return shardNum;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf()
|
||||
protected void setReducerClass(Job job)
|
||||
{
|
||||
return config;
|
||||
job.setReducerClass(LegacyIndexGeneratorReducer.class);
|
||||
}
|
||||
|
||||
public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer
|
||||
{
|
||||
@Override
|
||||
public void setConf(Configuration config)
|
||||
protected ProgressIndicator makeProgressIndicator(final Context context)
|
||||
{
|
||||
this.config = config;
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
|
||||
{
|
||||
private HadoopDruidIndexerConfig config;
|
||||
private List<String> metricNames = Lists.newArrayList();
|
||||
private StringInputRowParser parser;
|
||||
|
||||
@Override
|
||||
protected void setup(Context context)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||
|
||||
for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) {
|
||||
metricNames.add(factory.getName().toLowerCase());
|
||||
}
|
||||
|
||||
parser = config.getParser();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reduce(
|
||||
BytesWritable key, Iterable<Text> values, final Context context
|
||||
) throws IOException, InterruptedException
|
||||
{
|
||||
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
|
||||
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
|
||||
|
||||
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
|
||||
//final DataRollupSpec rollupSpec = config.getRollupSpec();
|
||||
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
|
||||
|
||||
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
|
||||
try {
|
||||
File baseFlushFile = File.createTempFile("base", "flush");
|
||||
baseFlushFile.delete();
|
||||
baseFlushFile.mkdirs();
|
||||
|
||||
Set<File> toMerge = Sets.newTreeSet();
|
||||
int indexCount = 0;
|
||||
int lineCount = 0;
|
||||
int runningTotalLineCount = 0;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
Set<String> allDimensionNames = Sets.newHashSet();
|
||||
final IndexMerger.ProgressIndicator progressIndicator = new IndexMerger.ProgressIndicator()
|
||||
return new BaseProgressIndicator()
|
||||
{
|
||||
@Override
|
||||
public void progress()
|
||||
|
@@ -264,346 +62,25 @@ public class LegacyIndexGeneratorJob extends IndexGeneratorJob
context.progress();
|
||||
}
|
||||
};
|
||||
|
||||
for (final Text value : values) {
|
||||
context.progress();
|
||||
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
|
||||
allDimensionNames.addAll(inputRow.getDimensions());
|
||||
|
||||
int numRows = index.add(inputRow);
|
||||
++lineCount;
|
||||
|
||||
if (index.isFull()) {
|
||||
log.info(
|
||||
"%,d lines to %,d rows in %,d millis",
|
||||
lineCount - runningTotalLineCount,
|
||||
numRows,
|
||||
System.currentTimeMillis() - startTime
|
||||
);
|
||||
runningTotalLineCount = lineCount;
|
||||
|
||||
final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
|
||||
toMerge.add(file);
|
||||
|
||||
context.progress();
|
||||
IndexMerger.persist(
|
||||
index, interval, file, progressIndicator
|
||||
);
|
||||
// close this index and make a new one
|
||||
index.close();
|
||||
index = makeIncrementalIndex(bucket, aggs);
|
||||
|
||||
startTime = System.currentTimeMillis();
|
||||
++indexCount;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("%,d lines completed.", lineCount);
|
||||
|
||||
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
|
||||
final File mergedBase;
|
||||
|
||||
if (toMerge.size() == 0) {
|
||||
if (index.isEmpty()) {
|
||||
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
|
||||
}
|
||||
|
||||
mergedBase = new File(baseFlushFile, "merged");
|
||||
IndexMerger.persist(
|
||||
index, interval, mergedBase, progressIndicator
|
||||
);
|
||||
} else {
|
||||
if (!index.isEmpty()) {
|
||||
final File finalFile = new File(baseFlushFile, "final");
|
||||
IndexMerger.persist(
|
||||
index, interval, finalFile, progressIndicator
|
||||
);
|
||||
toMerge.add(finalFile);
|
||||
}
|
||||
|
||||
for (File file : toMerge) {
|
||||
indexes.add(IndexIO.loadIndex(file));
|
||||
}
|
||||
mergedBase = IndexMerger.mergeQueryableIndex(
|
||||
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
|
||||
);
|
||||
}
|
||||
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
|
||||
for (File file : toMerge) {
|
||||
FileUtils.deleteDirectory(file);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
index.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List<String> dimensionNames)
|
||||
throws IOException
|
||||
{
|
||||
Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
|
||||
|
||||
int attemptNumber = context.getTaskAttemptID().getId();
|
||||
|
||||
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
|
||||
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
|
||||
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
|
||||
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
|
||||
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
|
||||
|
||||
outputFS.mkdirs(indexBasePath);
|
||||
|
||||
Exception caughtException = null;
|
||||
ZipOutputStream out = null;
|
||||
long size = 0;
|
||||
try {
|
||||
out = new ZipOutputStream(new BufferedOutputStream(outputFS.create(indexZipFilePath), 256 * 1024));
|
||||
|
||||
List<String> filesToCopy = Arrays.asList(mergedBase.list());
|
||||
|
||||
for (String file : filesToCopy) {
|
||||
size += copyFile(context, out, mergedBase, file);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
caughtException = e;
|
||||
}
|
||||
finally {
|
||||
if (caughtException == null) {
|
||||
Closeables.close(out, false);
|
||||
} else {
|
||||
CloseQuietly.close(out);
|
||||
throw Throwables.propagate(caughtException);
|
||||
}
|
||||
}
|
||||
|
||||
Path finalIndexZipFilePath = new Path(indexBasePath, "index.zip");
|
||||
final URI indexOutURI = finalIndexZipFilePath.toUri();
|
||||
ImmutableMap<String, Object> loadSpec;
|
||||
if (outputFS instanceof NativeS3FileSystem) {
|
||||
loadSpec = ImmutableMap.<String, Object>of(
|
||||
"type", "s3_zip",
|
||||
"bucket", indexOutURI.getHost(),
|
||||
"key", indexOutURI.getPath().substring(1) // remove the leading "/"
|
||||
);
|
||||
} else if (outputFS instanceof LocalFileSystem) {
|
||||
loadSpec = ImmutableMap.<String, Object>of(
|
||||
"type", "local",
|
||||
"path", indexOutURI.getPath()
|
||||
);
|
||||
} else if (outputFS instanceof DistributedFileSystem) {
|
||||
loadSpec = ImmutableMap.<String, Object>of(
|
||||
"type", "hdfs",
|
||||
"path", indexOutURI.getPath()
|
||||
);
|
||||
} else {
|
||||
throw new ISE("Unknown file system[%s]", outputFS.getClass());
|
||||
}
|
||||
|
||||
DataSegment segment = new DataSegment(
|
||||
config.getDataSource(),
|
||||
interval,
|
||||
config.getSchema().getTuningConfig().getVersion(),
|
||||
loadSpec,
|
||||
dimensionNames,
|
||||
metricNames,
|
||||
config.getShardSpec(bucket).getActualSpec(),
|
||||
SegmentUtils.getVersionFromDir(mergedBase),
|
||||
size
|
||||
);
|
||||
|
||||
// retry 1 minute
|
||||
boolean success = false;
|
||||
for (int i = 0; i < 6; i++) {
|
||||
if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
|
||||
log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
|
||||
success = true;
|
||||
break;
|
||||
} else {
|
||||
log.info("Failed to rename [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
context.progress();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new ISE(
|
||||
"Thread error in retry loop for renaming [%s] to [%s]",
|
||||
indexZipFilePath.toUri().getPath(),
|
||||
finalIndexZipFilePath.toUri().getPath()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
if (!outputFS.exists(indexZipFilePath)) {
|
||||
throw new ISE("File [%s] does not exist after retry loop.", indexZipFilePath.toUri().getPath());
|
||||
}
|
||||
|
||||
if (outputFS.getFileStatus(indexZipFilePath).getLen() == outputFS.getFileStatus(finalIndexZipFilePath)
|
||||
.getLen()) {
|
||||
outputFS.delete(indexZipFilePath, true);
|
||||
} else {
|
||||
outputFS.delete(finalIndexZipFilePath, true);
|
||||
if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
|
||||
throw new ISE(
|
||||
"Files [%s] and [%s] are different, but still cannot rename after retry loop",
|
||||
indexZipFilePath.toUri().getPath(),
|
||||
finalIndexZipFilePath.toUri().getPath()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean renameIndexFiles(
|
||||
FileSystem intermediateFS,
|
||||
FileSystem outputFS,
|
||||
Path indexBasePath,
|
||||
Path indexZipFilePath,
|
||||
Path finalIndexZipFilePath,
|
||||
DataSegment segment
|
||||
)
|
||||
throws IOException
|
||||
{
|
||||
final boolean needRename;
|
||||
|
||||
if (outputFS.exists(finalIndexZipFilePath)) {
|
||||
// NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
|
||||
final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
|
||||
final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
|
||||
|
||||
if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
|
||||
|| zipFile.getLen() != finalIndexZipFile.getLen()) {
|
||||
log.info(
|
||||
"File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
|
||||
finalIndexZipFile.getPath(),
|
||||
new DateTime(finalIndexZipFile.getModificationTime()),
|
||||
finalIndexZipFile.getLen(),
|
||||
zipFile.getPath(),
|
||||
new DateTime(zipFile.getModificationTime()),
|
||||
zipFile.getLen()
|
||||
);
|
||||
outputFS.delete(finalIndexZipFilePath, false);
|
||||
needRename = true;
|
||||
} else {
|
||||
log.info(
|
||||
"File[%s / %s / %sB] existed and will be kept",
|
||||
finalIndexZipFile.getPath(),
|
||||
new DateTime(finalIndexZipFile.getModificationTime()),
|
||||
finalIndexZipFile.getLen()
|
||||
);
|
||||
needRename = false;
|
||||
}
|
||||
} else {
|
||||
needRename = true;
|
||||
}
|
||||
|
||||
if (needRename && !outputFS.rename(indexZipFilePath, finalIndexZipFilePath)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json"));
|
||||
final Path descriptorPath = config.makeDescriptorInfoPath(segment);
|
||||
log.info("Writing descriptor to path[%s]", descriptorPath);
|
||||
intermediateFS.mkdirs(descriptorPath.getParent());
|
||||
writeSegmentDescriptor(intermediateFS, segment, descriptorPath);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath)
|
||||
throws IOException
|
||||
{
|
||||
if (outputFS.exists(descriptorPath)) {
|
||||
outputFS.delete(descriptorPath, false);
|
||||
}
|
||||
|
||||
final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath);
|
||||
try {
|
||||
HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
|
||||
}
|
||||
finally {
|
||||
descriptorOut.close();
|
||||
}
|
||||
}
|
||||
|
||||
private long copyFile(
|
||||
Context context, ZipOutputStream out, File mergedBase, final String filename
|
||||
@Override
|
||||
protected File persist(
|
||||
IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
|
||||
) throws IOException
|
||||
{
|
||||
createNewZipEntry(out, filename);
|
||||
long numRead = 0;
|
||||
|
||||
InputStream in = null;
|
||||
try {
|
||||
in = new FileInputStream(new File(mergedBase, filename));
|
||||
byte[] buf = new byte[0x10000];
|
||||
int read;
|
||||
while (true) {
|
||||
read = in.read(buf);
|
||||
if (read == -1) {
|
||||
break;
|
||||
return IndexMerger.persist(index, interval, file, progressIndicator);
|
||||
}
|
||||
|
||||
out.write(buf, 0, read);
|
||||
numRead += read;
|
||||
context.progress();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
CloseQuietly.close(in);
|
||||
}
|
||||
out.closeEntry();
|
||||
context.progress();
|
||||
|
||||
return numRead;
|
||||
}
|
||||
|
||||
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
|
||||
{
|
||||
return new IncrementalIndex(
|
||||
new IncrementalIndexSchema.Builder()
|
||||
.withMinTimestamp(theBucket.time.getMillis())
|
||||
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
|
||||
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
|
||||
.withMetrics(aggs)
|
||||
.build(),
|
||||
new OffheapBufferPool(config.getSchema().getTuningConfig().getBufferSize())
|
||||
);
|
||||
}
|
||||
|
||||
private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
|
||||
{
|
||||
log.info("Creating new ZipEntry[%s]", name);
|
||||
out.putNextEntry(new ZipEntry(name));
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexGeneratorOutputFormat extends TextOutputFormat
|
||||
{
|
||||
@Override
|
||||
public void checkOutputSpecs(JobContext job) throws IOException
|
||||
protected File mergeQueryableIndex(
|
||||
List<QueryableIndex> indexes,
|
||||
AggregatorFactory[] aggs,
|
||||
File file,
|
||||
ProgressIndicator progressIndicator
|
||||
) throws IOException
|
||||
{
|
||||
Path outDir = getOutputPath(job);
|
||||
if (outDir == null) {
|
||||
throw new InvalidJobConfException("Output directory not set.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexGeneratorStats
|
||||
{
|
||||
private long invalidRowCount = 0;
|
||||
|
||||
public long getInvalidRowCount()
|
||||
{
|
||||
return invalidRowCount;
|
||||
}
|
||||
|
||||
public void setInvalidRowCount(long invalidRowCount)
|
||||
{
|
||||
this.invalidRowCount = invalidRowCount;
|
||||
return IndexMerger.mergeQueryableIndex(indexes, aggs, file, progressIndicator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -403,7 +403,17 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir
|
||||
).findPlumber(
|
||||
schema,
|
||||
new RealtimeTuningConfig(ingestionSchema.getTuningConfig().getBufferSize(), null, null, null, null, null, null, shardSpec),
|
||||
new RealtimeTuningConfig(
|
||||
ingestionSchema.getTuningConfig().getBufferSize(),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
shardSpec,
|
||||
null
|
||||
),
|
||||
metrics
|
||||
);
|
||||
|
||||
|
|
|
@@ -118,7 +118,8 @@ public class RealtimeIndexTask extends AbstractTask
null,
|
||||
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
|
||||
maxPendingPersists,
|
||||
spec.getShardSpec()
|
||||
spec.getShardSpec(),
|
||||
false
|
||||
),
|
||||
null, null, null, null
|
||||
);
|
||||
|
|
|
@@ -350,7 +350,7 @@ public class TaskSerdeTest
null,
|
||||
null,
|
||||
new HadoopIngestionSpec(
|
||||
null, null, null, false, // TODO
|
||||
null, null, null,
|
||||
"foo",
|
||||
new TimestampSpec("timestamp", "auto"),
|
||||
new JSONDataSpec(ImmutableList.of("foo"), null),
|
||||
|
|
|
@@ -122,7 +122,7 @@ public class IndexMerger
*/
|
||||
public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir) throws IOException
|
||||
{
|
||||
return persist(index, dataInterval, outDir, new NoopProgressIndicator());
|
||||
return persist(index, dataInterval, outDir, new BaseProgressIndicator());
|
||||
}
|
||||
|
||||
public static File persist(
|
||||
|
@@ -164,7 +164,7 @@ public class IndexMerger
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir
|
||||
) throws IOException
|
||||
{
|
||||
return mergeQueryableIndex(indexes, metricAggs, outDir, new NoopProgressIndicator());
|
||||
return mergeQueryableIndex(indexes, metricAggs, outDir, new BaseProgressIndicator());
|
||||
}
|
||||
|
||||
public static File mergeQueryableIndex(
|
||||
|
@@ -193,7 +193,7 @@ public class IndexMerger
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir
|
||||
) throws IOException
|
||||
{
|
||||
return merge(indexes, metricAggs, outDir, new NoopProgressIndicator());
|
||||
return merge(indexes, metricAggs, outDir, new BaseProgressIndicator());
|
||||
}
|
||||
|
||||
public static File merge(
|
||||
|
@@ -316,7 +316,7 @@ public class IndexMerger
List<IndexableAdapter> indexes, File outDir
|
||||
) throws IOException
|
||||
{
|
||||
return append(indexes, outDir, new NoopProgressIndicator());
|
||||
return append(indexes, outDir, new BaseProgressIndicator());
|
||||
}
|
||||
|
||||
public static File append(
|
||||
|
@@ -1190,15 +1190,4 @@ public class IndexMerger
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
public static interface ProgressIndicator
|
||||
{
|
||||
public void progress();
|
||||
}
|
||||
|
||||
private static class NoopProgressIndicator implements ProgressIndicator
|
||||
{
|
||||
@Override
|
||||
public void progress() {}
|
||||
}
|
||||
}
|
|
@@ -44,6 +44,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final RejectionPolicyFactory defaultRejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
|
||||
private static final int defaultMaxPendingPersists = 0;
|
||||
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
|
||||
private static final boolean defaultPersistInHeap = false;
|
||||
|
||||
// Might make sense for this to be a builder
|
||||
public static RealtimeTuningConfig makeDefaultTuningConfig()
|
||||
|
@@ -56,7 +57,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultVersioningPolicy,
|
||||
defaultRejectionPolicyFactory,
|
||||
defaultMaxPendingPersists,
|
||||
defaultShardSpec
|
||||
defaultShardSpec,
|
||||
defaultPersistInHeap
|
||||
);
|
||||
}
|
||||
|
||||
|
@@ -68,6 +70,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final RejectionPolicyFactory rejectionPolicyFactory;
|
||||
private final int maxPendingPersists;
|
||||
private final ShardSpec shardSpec;
|
||||
private final boolean persistInHeap;
|
||||
|
||||
@JsonCreator
|
||||
public RealtimeTuningConfig(
|
||||
|
@@ -78,7 +81,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec,
|
||||
@JsonProperty("persistInHeap") Boolean persistInHeap
|
||||
)
|
||||
{
|
||||
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
|
||||
|
@@ -93,6 +97,7 @@ public class RealtimeTuningConfig implements TuningConfig
: rejectionPolicyFactory;
|
||||
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
|
||||
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
|
||||
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@@ -143,6 +148,12 @@ public class RealtimeTuningConfig implements TuningConfig
return shardSpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isPersistInHeap()
|
||||
{
|
||||
return persistInHeap;
|
||||
}
|
||||
|
||||
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
|
||||
{
|
||||
return new RealtimeTuningConfig(
|
||||
|
@@ -153,7 +164,8 @@ public class RealtimeTuningConfig implements TuningConfig
policy,
|
||||
rejectionPolicyFactory,
|
||||
maxPendingPersists,
|
||||
shardSpec
|
||||
shardSpec,
|
||||
persistInHeap
|
||||
);
|
||||
}
|
||||
|
||||
|
@@ -167,7 +179,8 @@ public class RealtimeTuningConfig implements TuningConfig
versioningPolicy,
|
||||
rejectionPolicyFactory,
|
||||
maxPendingPersists,
|
||||
shardSpec
|
||||
shardSpec,
|
||||
persistInHeap
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -96,7 +96,8 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
((RealtimePlumberSchool) plumberSchool).getVersioningPolicy(),
|
||||
((RealtimePlumberSchool) plumberSchool).getRejectionPolicyFactory(),
|
||||
((RealtimePlumberSchool) plumberSchool).getMaxPendingPersists(),
|
||||
schema.getShardSpec()
|
||||
schema.getShardSpec(),
|
||||
false
|
||||
);
|
||||
} else {
|
||||
Preconditions.checkNotNull(dataSchema, "dataSchema");
|
||||
|
|
|
@@ -34,10 +34,10 @@ import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.IndexMaker;
|
||||
import io.druid.segment.IndexMerger;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import io.druid.segment.loading.DataSegmentPusher;
|
||||
|
@@ -350,11 +350,20 @@ public class RealtimePlumber implements Plumber
indexes.add(queryableIndex);
|
||||
}
|
||||
|
||||
final File mergedFile = IndexMaker.mergeQueryableIndex(
|
||||
final File mergedFile;
|
||||
if (config.isPersistInHeap()) {
|
||||
mergedFile = IndexMaker.mergeQueryableIndex(
|
||||
indexes,
|
||||
schema.getAggregators(),
|
||||
mergedTarget
|
||||
);
|
||||
} else {
|
||||
mergedFile = IndexMerger.mergeQueryableIndex(
|
||||
indexes,
|
||||
schema.getAggregators(),
|
||||
mergedTarget
|
||||
);
|
||||
}
|
||||
|
||||
QueryableIndex index = IndexIO.loadIndex(mergedFile);
|
||||
|
||||
|
@@ -720,10 +729,19 @@ public class RealtimePlumber implements Plumber
try {
|
||||
int numRows = indexToPersist.getIndex().size();
|
||||
|
||||
File persistedFile = IndexMaker.persist(
|
||||
final File persistedFile;
|
||||
if (config.isPersistInHeap()) {
|
||||
persistedFile = IndexMaker.persist(
|
||||
indexToPersist.getIndex(),
|
||||
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
|
||||
);
|
||||
} else {
|
||||
persistedFile = IndexMerger.persist(
|
||||
indexToPersist.getIndex(),
|
||||
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
|
||||
);
|
||||
}
|
||||
|
||||
indexToPersist.swapSegment(
|
||||
new QueryableIndexSegment(
|
||||
indexToPersist.getSegment().getIdentifier(),
|
||||
|
|
|
@@ -77,7 +77,7 @@ public class FireDepartmentTest
)
|
||||
),
|
||||
new RealtimeTuningConfig(
|
||||
null, null, null, null, null, null, null, null
|
||||
null, null, null, null, null, null, null, null, false
|
||||
),
|
||||
null, null, null, null
|
||||
);
|
||||
|
|
|
@@ -121,6 +121,7 @@ public class RealtimeManagerTest
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
|
||||
|
|
|
@@ -133,6 +133,7 @@ public class RealtimePlumberSchoolTest
new IntervalStartVersioningPolicy(),
|
||||
new NoopRejectionPolicyFactory(),
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
|
@@ -63,7 +63,8 @@ public class SinkTest
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
final Sink sink = new Sink(interval, schema, tuningConfig, version);