add config for build v9 directly and update docs

Kurt Young 2016-01-13 14:06:35 +08:00
parent 1f2168fae5
commit 82ff98c2bf
34 changed files with 185 additions and 374 deletions

View File

@ -104,7 +104,8 @@ The spec\_file is a path to a file that contains JSON and an example looks like:
"ignoreInvalidRows" : false,
"jobProperties" : { },
"combineText" : false,
"rowFlushBoundary" : 300000
"rowFlushBoundary" : 300000,
"buildV9Directly" : false
}
}
```
@ -205,6 +206,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|ignoreInvalidRows|Boolean|Ignore rows found to have problems.|no (default == false)|
|useCombiner|Boolean|Use the Hadoop combiner to merge rows at the mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration.|no (default == null)|
|buildV9Directly|Boolean|Whether to build the v9 index directly instead of first building a v8 index and then converting it to v9 format. See the example below.|no (default == false)|
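For illustration, a minimal `tuningConfig` that opts into the direct v9 build might look like the sketch below; the values other than `buildV9Directly` are placeholders echoing the example above, not recommendations.

```
"tuningConfig" : {
  "ignoreInvalidRows" : false,
  "rowFlushBoundary" : 300000,
  "buildV9Directly" : true
}
```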
### Partitioning specification

View File

@ -142,6 +142,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|rejectionPolicy|Object|Controls the data acceptance policy for creating and handing off segments. More on this below.|no (default=='serverTime')|
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')|
|buildV9Directly|Boolean|Whether to build the v9 index directly instead of first building a v8 index and then converting it to v9 format. See the example below.|no (default == false)|
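As a sketch, a realtime `tuningConfig` that enables the direct v9 build could look like the following; `maxPendingPersists` is shown with its default from the table above and is only included for context.

```
"tuningConfig" : {
  "maxPendingPersists" : 0,
  "buildV9Directly" : true
}
```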
#### Rejection Policy

View File

@ -51,6 +51,7 @@ import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
@ -89,6 +90,7 @@ public class HadoopDruidIndexerConfig
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER;
public static final IndexMergerV9 INDEX_MERGER_V9;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
@ -112,6 +114,7 @@ public class HadoopDruidIndexerConfig
JSON_MAPPER = injector.getInstance(ObjectMapper.class);
INDEX_IO = injector.getInstance(IndexIO.class);
INDEX_MERGER = injector.getInstance(IndexMerger.class);
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
}
public static enum IndexJobCounters
@ -351,6 +354,11 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
}
public boolean isBuildV9Directly()
{
return schema.getTuningConfig().getBuildV9Directly();
}
/**
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may

View File

@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
@ -59,7 +60,8 @@ public class HadoopTuningConfig implements TuningConfig
null,
false,
false,
null
null,
DEFAULT_BUILD_V9_DIRECTLY
);
}
@ -76,6 +78,7 @@ public class HadoopTuningConfig implements TuningConfig
private final Map<String, String> jobProperties;
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;
@JsonCreator
public HadoopTuningConfig(
@ -93,7 +96,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("useCombiner") Boolean useCombiner,
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly
)
{
this.workingPath = workingPath;
@ -111,6 +115,7 @@ public class HadoopTuningConfig implements TuningConfig
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
}
@JsonProperty
@ -191,6 +196,11 @@ public class HadoopTuningConfig implements TuningConfig
return useCombiner;
}
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -207,7 +217,8 @@ public class HadoopTuningConfig implements TuningConfig
jobProperties,
combineText,
useCombiner,
null
null,
buildV9Directly
);
}
@ -227,7 +238,8 @@ public class HadoopTuningConfig implements TuningConfig
jobProperties,
combineText,
useCombiner,
null
null,
buildV9Directly
);
}
@ -247,7 +259,8 @@ public class HadoopTuningConfig implements TuningConfig
jobProperties,
combineText,
useCombiner,
null
null,
buildV9Directly
);
}
}

View File

@ -467,9 +467,15 @@ public class IndexGeneratorJob implements Jobby
final ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
}
}
protected File mergeQueryableIndex(
@ -479,9 +485,15 @@ public class IndexGeneratorJob implements Jobby
ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
}
}
@Override
@ -587,7 +599,7 @@ public class IndexGeneratorJob implements Jobby
indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
);
}
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())

View File

@ -381,6 +381,7 @@ public class BatchDeltaIngestionTest
null,
false,
false,
null,
null
)
)

View File

@ -160,6 +160,7 @@ public class DetermineHashedPartitionsJobTest
null,
false,
false,
null,
null
)
);

View File

@ -264,6 +264,7 @@ public class DeterminePartitionsJobTest
null,
false,
false,
null,
null
)
)

View File

@ -207,6 +207,7 @@ public class HadoopDruidIndexerConfigTest
null,
false,
false,
null,
null
)
);

View File

@ -53,6 +53,7 @@ public class HadoopTuningConfigTest
null,
true,
true,
null,
null
);

View File

@ -395,6 +395,7 @@ public class IndexGeneratorJobTest
ImmutableMap.of(JobContext.NUM_REDUCES, "0"), //verifies that set num reducers is ignored
false,
useCombiner,
null,
null
)
)

View File

@ -115,6 +115,7 @@ public class JobHelperTest
),
false,
false,
null,
null
)
)

View File

@ -120,7 +120,7 @@ public class GranularityPathSpecTest
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null)
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null)
);
granularityPathSpec.setDataGranularity(Granularity.HOUR);

View File

@ -201,6 +201,7 @@ public class HadoopConverterJobTest
null,
false,
false,
null,
null
)
)

View File

@ -37,6 +37,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -80,7 +81,7 @@ public class TaskToolbox
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;
public TaskToolbox(
TaskConfig config,
@ -102,7 +103,8 @@ public class TaskToolbox
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
)
{
this.config = config;
@ -125,6 +127,7 @@ public class TaskToolbox
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
}
public TaskConfig getConfig()
@ -247,4 +250,8 @@ public class TaskToolbox
{
return cacheConfig;
}
public IndexMergerV9 getIndexMergerV9()
{
return indexMergerV9;
}
}

View File

@ -34,6 +34,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -67,6 +68,7 @@ public class TaskToolboxFactory
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;
@Inject
public TaskToolboxFactory(
@ -87,7 +89,8 @@ public class TaskToolboxFactory
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
)
{
this.config = config;
@ -108,6 +111,7 @@ public class TaskToolboxFactory
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = indexMergerV9;
}
public TaskToolbox build(Task task)
@ -133,7 +137,8 @@ public class TaskToolboxFactory
indexMerger,
indexIO,
cache,
cacheConfig
cacheConfig,
indexMergerV9
);
}
}

View File

@ -48,6 +48,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
@ -134,7 +135,8 @@ public class IndexTask extends AbstractFixedIntervalTask
null,
null,
shardSpec,
indexSpec
indexSpec,
null
);
}
@ -353,12 +355,15 @@ public class IndexTask extends AbstractFixedIntervalTask
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser());
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
final IndexMerger indexMerger = ingestionSchema.getTuningConfig().getBuildV9Directly()
? toolbox.getIndexMergerV9()
: toolbox.getIndexMerger();
final Plumber plumber = new YeOldePlumberSchool(
interval,
version,
wrappedDataSegmentPusher,
tmpDir,
toolbox.getIndexMerger(),
indexMerger,
toolbox.getIndexIO()
).findPlumber(
schema,
@ -434,7 +439,7 @@ public class IndexTask extends AbstractFixedIntervalTask
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null, null) : tuningConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null, null, null) : tuningConfig;
}
@Override
@ -485,18 +490,21 @@ public class IndexTask extends AbstractFixedIntervalTask
private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000;
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;
private final int targetPartitionSize;
private final int rowFlushBoundary;
private final int numShards;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
@JsonCreator
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly
)
{
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
@ -508,6 +516,7 @@ public class IndexTask extends AbstractFixedIntervalTask
this.targetPartitionSize == -1 || this.numShards == -1,
"targetPartitionsSize and shardCount both cannot be set"
);
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
}
@JsonProperty
@ -533,5 +542,11 @@ public class IndexTask extends AbstractFixedIntervalTask
{
return indexSpec;
}
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
}
}
}
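For reference, a hypothetical `tuningConfig` for the index task exercising the new property could look like the sketch below; the numeric values mirror the defaults declared above, and the `"type" : "index"` field is an assumption for illustration, not something introduced by this change.

```
"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "rowFlushBoundary" : 500000,
  "buildV9Directly" : true
}
```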

View File

@ -47,6 +47,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -286,6 +287,9 @@ public class RealtimeIndexTask extends AbstractTask
);
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
IndexMerger indexMerger = spec.getTuningConfig().getBuildV9Directly()
? toolbox.getIndexMergerV9()
: toolbox.getIndexMerger();
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and
// NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the
@ -298,7 +302,7 @@ public class RealtimeIndexTask extends AbstractTask
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(),
indexMerger,
toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),

View File

@ -31,6 +31,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -77,6 +78,7 @@ public class TaskToolboxTest
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
private Task task = EasyMock.createMock(Task.class);
private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class);
private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class);
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);
@ -109,7 +111,8 @@ public class TaskToolboxTest
mockIndexMerger,
mockIndexIO,
mockCache,
mockCacheConfig
mockCacheConfig,
mockIndexMergerV9
);
}

View File

@ -28,6 +28,7 @@ import io.druid.guice.ServerModule;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.column.ColumnConfig;
import java.util.List;
@ -39,6 +40,7 @@ public class TestUtils
{
private final ObjectMapper jsonMapper;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
public TestUtils()
@ -56,6 +58,7 @@ public class TestUtils
}
);
indexMerger = new IndexMerger(jsonMapper, indexIO);
indexMergerV9 = new IndexMergerV9(jsonMapper, indexIO);
final List<? extends Module> list = new ServerModule().getJacksonModules();
for (Module module : list) {
@ -80,6 +83,10 @@ public class TestUtils
return indexMerger;
}
public IndexMergerV9 getTestIndexMergerV9()
{
return indexMergerV9;
}
public IndexIO getTestIndexIO()
{
return indexIO;

View File

@ -39,6 +39,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -70,6 +71,7 @@ public class IndexTaskTest
private final IndexSpec indexSpec;
private final ObjectMapper jsonMapper;
private IndexMerger indexMerger;
private IndexMergerV9 indexMergerV9;
private IndexIO indexIO;
public IndexTaskTest()
@ -78,6 +80,7 @@ public class IndexTaskTest
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMerger = testUtils.getTestIndexMerger();
indexMergerV9 = testUtils.getTestIndexMergerV9();
indexIO = testUtils.getTestIndexIO();
}
@ -140,7 +143,8 @@ public class IndexTaskTest
2,
0,
null,
indexSpec
indexSpec,
null
)
),
jsonMapper,
@ -252,7 +256,7 @@ public class IndexTaskTest
return segment;
}
}, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(),
indexMerger, indexIO, null, null
indexMerger, indexIO, null, null, indexMergerV9
)
);
@ -332,7 +336,8 @@ public class IndexTaskTest
100,
1000,
null,
new IndexSpec()
new IndexSpec(),
null
);
RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(
spec,

View File

@ -571,6 +571,7 @@ public class RealtimeIndexTaskTest
null,
null,
null,
null,
null
);
return new RealtimeIndexTask(
@ -715,7 +716,8 @@ public class RealtimeIndexTaskTest
testUtils.getTestIndexMerger(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig()
new CacheConfig(),
testUtils.getTestIndexMergerV9()
);
return toolboxFactory.build(task);

View File

@ -91,7 +91,7 @@ public class TaskSerdeTest
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
),
jsonMapper,
null
@ -132,7 +132,7 @@ public class TaskSerdeTest
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
),
jsonMapper,
null
@ -332,7 +332,8 @@ public class TaskSerdeTest
null,
1,
new NoneShardSpec(),
indexSpec
indexSpec,
null
)
),
null

View File

@ -64,6 +64,7 @@ import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
@ -109,12 +110,14 @@ public class IngestSegmentFirehoseFactoryTest
{
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
INDEX_IO = testUtils.getTestIndexIO();
}
@ -274,7 +277,8 @@ public class IngestSegmentFirehoseFactoryTest
INDEX_MERGER,
INDEX_IO,
null,
null
null,
INDEX_MERGER_V9
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(

View File

@ -55,6 +55,7 @@ import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.NoopDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
@ -111,12 +112,14 @@ public class IngestSegmentFirehoseFactoryTimelineTest
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexIO INDEX_IO;
private static final IndexMergerV9 INDEX_MERGER_V9;
static {
TestUtils testUtils = new TestUtils();
MAPPER = IngestSegmentFirehoseFactoryTest.setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_IO = testUtils.getTestIndexIO();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
}
public IngestSegmentFirehoseFactoryTimelineTest(
@ -334,7 +337,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
INDEX_MERGER,
INDEX_IO,
null,
null
null,
INDEX_MERGER_V9
);
final Injector injector = Guice.createInjector(
new Module()

View File

@ -82,6 +82,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -132,12 +133,14 @@ public class TaskLifecycleTest
{
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
static {
TestUtils testUtils = new TestUtils();
MAPPER = testUtils.getTestObjectMapper();
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
INDEX_IO = testUtils.getTestIndexIO();
}
@ -532,7 +535,8 @@ public class TaskLifecycleTest
INDEX_MERGER,
INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG
FireDepartmentTest.NO_CACHE_CONFIG,
INDEX_MERGER_V9
);
tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
@ -565,7 +569,7 @@ public class TaskLifecycleTest
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
),
mapper,
null
@ -623,7 +627,7 @@ public class TaskLifecycleTest
mapper
),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
),
mapper,
null
@ -969,7 +973,7 @@ public class TaskLifecycleTest
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
),
mapper,
null
@ -1080,6 +1084,7 @@ public class TaskLifecycleTest
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);

View File

@ -41,6 +41,7 @@ import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
@ -80,6 +81,7 @@ public class WorkerTaskMonitorTest
private Worker worker;
private ObjectMapper jsonMapper;
private IndexMerger indexMerger;
private IndexMergerV9 indexMergerV9;
private IndexIO indexIO;
public WorkerTaskMonitorTest()
@ -87,6 +89,7 @@ public class WorkerTaskMonitorTest
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMerger = testUtils.getTestIndexMerger();
indexMergerV9 = testUtils.getTestIndexMergerV9();
indexIO = testUtils.getTestIndexIO();
}
@ -183,7 +186,8 @@ public class WorkerTaskMonitorTest
indexMerger,
indexIO,
null,
null
null,
indexMergerV9
),
taskConfig,
new NoopServiceEmitter()

View File

@ -104,12 +104,12 @@ public class IndexMerger
{
private static final Logger log = new Logger(IndexMerger.class);
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final int INVALID_ROW = -1;
private static final Splitter SPLITTER = Splitter.on(",");
protected static final SerializerUtils serializerUtils = new SerializerUtils();
protected static final int INVALID_ROW = -1;
protected static final Splitter SPLITTER = Splitter.on(",");
private final ObjectMapper mapper;
private final IndexIO indexIO;
protected final ObjectMapper mapper;
protected final IndexIO indexIO;
@Inject
public IndexMerger(
@ -473,7 +473,7 @@ public class IndexMerger
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec);
}
private File makeIndexFiles(
protected File makeIndexFiles(
final List<IndexableAdapter> indexes,
final File outDir,
final ProgressIndicator progress,

View File

@ -100,335 +100,21 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
public class IndexMergerV9
public class IndexMergerV9 extends IndexMerger
{
private static final Logger log = new Logger(IndexMergerV9.class);
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final int INVALID_ROW = -1;
private static final Splitter SPLITTER = Splitter.on(",");
private final ObjectMapper mapper;
private final IndexIO indexIO;
@Inject
public IndexMergerV9(
ObjectMapper mapper,
IndexIO indexIO
)
{
this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper");
this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
super(mapper, indexIO);
}
public File persist(
final IncrementalIndex index,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec);
}
public File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return persist(
index,
dataInterval,
outDir,
segmentMetadata,
indexSpec,
new BaseProgressIndicator()
);
}
public File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
if (index.isEmpty()) {
throw new IAE("Trying to persist an empty index!");
}
final long firstTimestamp = index.getMinTime().getMillis();
final long lastTimestamp = index.getMaxTime().getMillis();
if (!(dataInterval.contains(firstTimestamp) && dataInterval.contains(lastTimestamp))) {
throw new IAE(
"interval[%s] does not encapsulate the full range of timestamps[%s, %s]",
dataInterval,
new DateTime(firstTimestamp),
new DateTime(lastTimestamp)
);
}
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("Can only persist to directories, [%s] wasn't a directory", outDir);
}
log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size());
return merge(
Arrays.<IndexableAdapter>asList(
new IncrementalIndexAdapter(
dataInterval,
index,
indexSpec.getBitmapSerdeFactory().getBitmapFactory()
)
),
index.getMetricAggs(),
outDir,
segmentMetadata,
indexSpec,
progress
);
}
public File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException
{
return convert(inDir, outDir, indexSpec, new BaseProgressIndicator());
}
public File convert(
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
) throws IOException
{
try (QueryableIndex index = indexIO.loadIndex(inDir)) {
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
return makeIndexFiles(
ImmutableList.of(adapter),
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
null,
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@Override
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
{
return input.get(0);
}
},
indexSpec
);
}
}
public File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec
) throws IOException
{
return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
public File mergeQueryableIndex(
List<QueryableIndex> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
// We are materializing the list for performance reasons. Lists.transform
// only creates a "view" of the original list, meaning the function gets
// applied every time you access an element.
List<IndexableAdapter> indexAdapteres = Lists.newArrayList(
Iterables.transform(
indexes,
new Function<QueryableIndex, IndexableAdapter>()
{
@Override
public IndexableAdapter apply(final QueryableIndex input)
{
return new QueryableIndexIndexableAdapter(input);
}
}
)
);
return merge(
indexAdapteres,
metricAggs,
outDir,
null,
indexSpec,
progress
);
}
public File merge(
List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
}
public File merge(
List<IndexableAdapter> adapters,
final AggregatorFactory[] metricAggs,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
if (!outDir.mkdirs()) {
throw new ISE("Couldn't make outdir[%s].", outDir);
}
final List<String> mergedDimensions = IndexMerger.getMergedDimensions(adapters);
final List<String> mergedMetrics = Lists.transform(
IndexMerger.mergeIndexed(
Lists.newArrayList(
FunctionalIterable
.create(adapters)
.transform(
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return input.getMetricNames();
}
}
)
.concat(Arrays.<Iterable<String>>asList(new IndexMerger.AggFactoryStringIndexed(metricAggs)))
)
),
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return input;
}
}
);
if (mergedMetrics.size() != metricAggs.length) {
throw new IAE("Bad number of metrics[%d], expected [%d]", mergedMetrics.size(), metricAggs.length);
}
final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()];
for (int i = 0; i < metricAggs.length; i++) {
AggregatorFactory metricAgg = metricAggs[i];
sortedMetricAggs[mergedMetrics.indexOf(metricAgg.getName())] = metricAgg;
}
for (int i = 0; i < mergedMetrics.size(); i++) {
if (!sortedMetricAggs[i].getName().equals(mergedMetrics.get(i))) {
throw new IAE(
"Metric mismatch, index[%d] [%s] != [%s]",
i,
metricAggs[i].getName(),
mergedMetrics.get(i)
);
}
}
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn =
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Override
public Iterable<Rowboat> apply(
@Nullable ArrayList<Iterable<Rowboat>> boats
)
{
return CombiningIterable.create(
new MergeIterable<>(
Ordering.<Rowboat>natural().nullsFirst(),
boats
),
Ordering.<Rowboat>natural().nullsFirst(),
new IndexMerger.RowboatMergeFunction(sortedMetricAggs)
);
}
};
return makeIndexFiles(
adapters, outDir, progress, mergedDimensions, mergedMetrics, segmentMetadata, rowMergerFn, indexSpec
);
}
public File append(
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec
) throws IOException
{
return append(indexes, outDir, indexSpec, new BaseProgressIndicator());
}
public File append(
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec, ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
if (!outDir.mkdirs()) {
throw new ISE("Couldn't make outdir[%s].", outDir);
}
final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
final List<String> mergedMetrics = IndexMerger.mergeIndexed(
Lists.transform(
indexes,
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return Iterables.transform(
input.getMetricNames(),
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return input;
}
}
);
}
}
)
);
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn = new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Override
public Iterable<Rowboat> apply(
@Nullable final ArrayList<Iterable<Rowboat>> boats
)
{
return new MergeIterable<Rowboat>(
Ordering.<Rowboat>natural().nullsFirst(),
boats
);
}
};
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec);
}
private File makeIndexFiles(
@Override
protected File makeIndexFiles(
final List<IndexableAdapter> adapters,
final File outDir,
final ProgressIndicator progress,
@ -829,7 +515,7 @@ public class IndexMergerV9
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
convertedInverteds.add(
new IndexMerger.ConvertingIndexedInts(
new ConvertingIndexedInts(
bitmapIndexSeeker[j].seek(dimVal), rowNumConversions.get(j)
)
);
@ -1094,7 +780,7 @@ public class IndexMergerV9
}
boats.add(
new IndexMerger.MMappedIndexRowIterable(
new MMappedIndexRowIterable(
Iterables.transform(
adapters.get(i).getRows(),
new Function<Rowboat, Rowboat>()
@ -1177,14 +863,14 @@ public class IndexMergerV9
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(adapters.size());
// each converter converts dim values of this dimension to global dictionary
IndexMerger.DimValueConverter[] converters = new IndexMerger.DimValueConverter[adapters.size()];
DimValueConverter[] converters = new DimValueConverter[adapters.size()];
boolean existNullColumn = false;
for (int i = 0; i < adapters.size(); i++) {
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension);
if (!IndexMerger.isNullColumn(dimValues)) {
if (!isNullColumn(dimValues)) {
dimValueLookups.add(dimValues);
converters[i] = new IndexMerger.DimValueConverter(dimValues);
converters[i] = new DimValueConverter(dimValues);
} else {
existNullColumn = true;
}
@ -1233,7 +919,7 @@ public class IndexMergerV9
writer.write(value);
for (int i = 0; i < adapters.size(); i++) {
IndexMerger.DimValueConverter converter = converters[i];
DimValueConverter converter = converters[i];
if (converter != null) {
converter.convert(value, cardinality);
}
@ -1259,7 +945,7 @@ public class IndexMergerV9
// make the conversion
for (int i = 0; i < adapters.size(); ++i) {
IndexMerger.DimValueConverter converter = converters[i];
DimValueConverter converter = converters[i];
if (converter != null) {
dimConversions.get(i).put(dimension, converters[i].getConversionBuffer());
}

View File

@ -46,6 +46,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final int defaultMaxPendingPersists = 0;
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
private static final IndexSpec defaultIndexSpec = new IndexSpec();
private static final Boolean defaultBuildV9Directly = Boolean.FALSE;
// Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig()
@ -59,7 +60,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultRejectionPolicyFactory,
defaultMaxPendingPersists,
defaultShardSpec,
defaultIndexSpec
defaultIndexSpec,
defaultBuildV9Directly
);
}
@ -72,6 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
@JsonCreator
public RealtimeTuningConfig(
@ -83,7 +86,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("indexSpec") IndexSpec indexSpec
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -99,6 +103,7 @@ public class RealtimeTuningConfig implements TuningConfig
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
}
@JsonProperty
@ -155,6 +160,11 @@ public class RealtimeTuningConfig implements TuningConfig
return indexSpec;
}
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -166,7 +176,8 @@ public class RealtimeTuningConfig implements TuningConfig
rejectionPolicyFactory,
maxPendingPersists,
shardSpec,
indexSpec
indexSpec,
buildV9Directly
);
}
@ -181,7 +192,8 @@ public class RealtimeTuningConfig implements TuningConfig
rejectionPolicyFactory,
maxPendingPersists,
shardSpec,
indexSpec
indexSpec,
buildV9Directly
);
}
}

View File

@ -116,7 +116,7 @@ public class FireDepartmentTest
null
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null
null, null, null, null, null, null, null, null, null, null
)
);

View File

@ -150,6 +150,7 @@ public class RealtimeManagerTest
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -185,6 +185,7 @@ public class RealtimePlumberSchoolTest
rejectionPolicy,
null,
null,
null,
null
);

View File

@ -66,6 +66,7 @@ public class SinkTest
null,
null,
null,
null,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);