Remove the ability to create segments in v8 format (#4420)

* Remove ability to create segments in v8 format

* Fix IndexGeneratorJobTest

* Fix parameterized test name in IndexMergerTest

* Remove extra legacy merging stuff

* Remove legacy serializer builders

* Remove ConciseBitmapIndexMergerTest and RoaringBitmapIndexMergerTest
Roman Leventov authored on 2017-06-26 15:21:39 -05:00; committed by Jonathan Wei
parent 5fec619284
commit 05d58689ad
80 changed files with 750 additions and 3049 deletions
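
In practice, every call site that used to choose between the legacy (v8) `IndexMerger` and `IndexMergerV9` via the `buildV9Directly` flag now uses the v9 merger unconditionally; the flag itself survives only so that existing specs and task JSONs keep deserializing. A minimal sketch of the pattern this commit applies across the tasks and plumbers below (`MergerSelection`/`selectMerger` are illustrative names, not Druid classes):

```java
import io.druid.indexing.common.TaskToolbox;
import io.druid.segment.IndexMerger;

class MergerSelection
{
  // Before this commit, tasks picked a merger from the buildV9Directly tuning flag:
  //   IndexMerger merger = tuningConfig.getBuildV9Directly()
  //       ? toolbox.getIndexMergerV9()   // build v9 segments directly
  //       : toolbox.getIndexMerger();    // build v8 segments, then convert
  //
  // After this commit only the v9 merger exists, so the selection collapses to:
  static IndexMerger selectMerger(TaskToolbox toolbox)
  {
    return toolbox.getIndexMergerV9();
  }
}
```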

View File

@ -30,7 +30,6 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
@ -79,7 +78,6 @@ public class IndexMergeBenchmark
private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@ -101,7 +99,6 @@ public class IndexMergeBenchmark
}
}
);
INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
@ -170,33 +167,6 @@ public class IndexMergeBenchmark
.buildOnheap();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void merge(Blackhole blackhole) throws Exception
{
File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-" + System.currentTimeMillis(), ".TEMPFILE");
tmpFile.delete();
tmpFile.mkdirs();
try {
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
File mergedFile = INDEX_MERGER.mergeQueryableIndex(
indexesToMerge,
rollup,
schemaInfo.getAggsArray(),
tmpFile,
new IndexSpec()
);
blackhole.consume(mergedFile);
}
finally {
tmpFile.delete();
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
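
Only the legacy merge benchmark is removed here; the v9 counterpart, whose opening annotations are visible just above, presumably keeps the same shape against `INDEX_MERGER_V9`. A hedged sketch of that remaining benchmark (the method name `mergeV9` is assumed, not shown in this hunk):

```java
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void mergeV9(Blackhole blackhole) throws Exception
{
  File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-V9-" + System.currentTimeMillis(), ".TEMPFILE");
  tmpFile.delete();
  tmpFile.mkdirs();
  try {
    // Same arguments as the removed benchmark, but routed to the v9 merger.
    File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
        indexesToMerge,
        rollup,
        schemaInfo.getAggsArray(),
        tmpFile,
        new IndexSpec()
    );
    blackhole.consume(mergedFile);
  }
  finally {
    tmpFile.delete();
  }
}
```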

View File

@ -30,7 +30,6 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.column.ColumnConfig;
@ -81,7 +80,6 @@ public class IndexPersistBenchmark
private BenchmarkSchemaInfo schemaInfo;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@ -99,7 +97,6 @@ public class IndexPersistBenchmark
}
}
);
INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
@ -164,28 +161,6 @@ public class IndexPersistBenchmark
.buildOnheap();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void persist(Blackhole blackhole) throws Exception
{
File tmpDir = Files.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());
try {
File indexFile = INDEX_MERGER.persist(
incIndex,
tmpDir,
new IndexSpec()
);
blackhole.consume(indexFile);
}
finally {
FileUtils.deleteDirectory(tmpDir);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
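
As with the merge benchmark above, only the legacy persist path is deleted; a hedged sketch of the remaining v9 persist benchmark (method name `persistV9` assumed):

```java
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void persistV9(Blackhole blackhole) throws Exception
{
  File tmpDir = Files.createTempDir();
  log.info("Using temp dir: " + tmpDir.getAbsolutePath());
  try {
    // Same arguments as the removed benchmark, but routed to the v9 merger.
    File indexFile = INDEX_MERGER_V9.persist(incIndex, tmpDir, new IndexSpec());
    blackhole.consume(indexFile);
  }
  finally {
    FileUtils.deleteDirectory(tmpDir);
  }
}
```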

View File

@ -120,7 +120,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
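
The `buildV9Directly` row is dropped from this table (and from the Hadoop, realtime, and native index-task docs below) because v9 is now the only segment format Druid can write. Specs that still set the property keep working: as the `KafkaTuningConfig`, `HadoopTuningConfig`, and `IndexTuningConfig` diffs further down show, the value is accepted, discarded, and reported back as `true`. An illustrative sketch of that compatibility pattern (the class is hypothetical, not part of Druid):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

class TuningConfigCompatSketch
{
  @JsonCreator
  public TuningConfigCompatSketch(
      // Accepted so existing specs still deserialize; to be removed in Druid 0.12.
      @JsonProperty("buildV9Directly") Boolean buildV9Directly
  )
  {
    // Intentionally not stored: v9 is the only format that can be built.
  }

  /** Always returns true, doesn't affect the version being built. */
  @Deprecated
  @JsonProperty
  public boolean getBuildV9Directly()
  {
    return true;
  }
}
```

Feeding `{"buildV9Directly": false}` through a plain Jackson `ObjectMapper` into this sketch binds and then ignores the property, and the getter still reports `true`, which is the same behaviour the updated tuning configs exhibit.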

View File

@ -163,7 +163,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
|buildV9Directly|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|no (default = false)|
|useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)|

View File

@ -150,7 +150,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|rejectionPolicy|Object|Controls how data sets the data acceptance policy for creating and handing off segments. More on this below.|no (default == 'serverTime')|
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')|
|buildV9Directly|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|no (default == false)|

View File

@ -117,7 +117,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no|
@ -162,7 +161,6 @@ Append tasks append a list of segments together into a single segment (one after
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>,
"buildV9Directly": <true or false, default true>,
"aggregations": <optional list of aggregators>
}
```
@ -181,7 +179,6 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"buildV9Directly": <true or false, default true>,
"segments": <JSON list of DataSegment objects to merge>
}
```
@ -199,7 +196,6 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"buildV9Directly": <true or false, default true>,
"interval": <DataSegment objects in this interval are going to be merged>
}
```

View File

@ -869,7 +869,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
toolbox.getIndexIO(),
tuningConfig.getBuildV9Directly() ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger(),
toolbox.getIndexMergerV9(),
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),

View File

@ -40,7 +40,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final File basePersistDirectory;
private final int maxPendingPersists;
private final IndexSpec indexSpec;
private final boolean buildV9Directly;
private final boolean reportParseExceptions;
@Deprecated
private final long handoffConditionTimeout;
@ -54,6 +53,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@ -71,7 +71,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
this.basePersistDirectory = defaults.getBasePersistDirectory();
this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaults.getBuildV9Directly() : buildV9Directly;
this.reportParseExceptions = reportParseExceptions == null
? defaults.isReportParseExceptions()
: reportParseExceptions;
@ -92,7 +91,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
config.basePersistDirectory,
config.maxPendingPersists,
config.indexSpec,
config.buildV9Directly,
true,
config.reportParseExceptions,
config.handoffConditionTimeout,
config.resetOffsetAutomatically
@ -140,10 +139,14 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return indexSpec;
}
/**
* Always returns true, doesn't affect the version being built.
*/
@Deprecated
@JsonProperty
public boolean getBuildV9Directly()
{
return buildV9Directly;
return true;
}
@Override
@ -175,7 +178,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
dir,
maxPendingPersists,
indexSpec,
buildV9Directly,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
@ -191,7 +194,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
basePersistDirectory,
maxPendingPersists,
indexSpec,
buildV9Directly,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
@ -219,9 +222,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
if (maxPendingPersists != that.maxPendingPersists) {
return false;
}
if (buildV9Directly != that.buildV9Directly) {
return false;
}
if (reportParseExceptions != that.reportParseExceptions) {
return false;
}
@ -254,7 +254,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
result = 31 * result + (basePersistDirectory != null ? basePersistDirectory.hashCode() : 0);
result = 31 * result + maxPendingPersists;
result = 31 * result + (indexSpec != null ? indexSpec.hashCode() : 0);
result = 31 * result + (buildV9Directly ? 1 : 0);
result = 31 * result + (reportParseExceptions ? 1 : 0);
result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
result = 31 * result + (resetOffsetAutomatically ? 1 : 0);
@ -271,7 +270,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +
", indexSpec=" + indexSpec +
", buildV9Directly=" + buildV9Directly +
", reportParseExceptions=" + reportParseExceptions +
", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +

View File

@ -43,6 +43,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility
@ -62,7 +63,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
basePersistDirectory,
maxPendingPersists,
indexSpec,
buildV9Directly,
true,
reportParseExceptions,
// Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of
// handoffConditionTimeout
@ -124,7 +125,6 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", buildV9Directly=" + getBuildV9Directly() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +

View File

@ -44,7 +44,6 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.java.util.common.StringUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@ -75,6 +74,7 @@ import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.metadata.EntryExistsException;
@ -131,8 +131,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
@ -146,7 +144,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@ -160,7 +157,6 @@ public class KafkaIndexTaskTest
private static int topicPostfix;
private final List<Task> runningTasks = Lists.newArrayList();
private final boolean buildV9Directly;
private long handoffConditionTimeout = 0;
private boolean reportParseExceptions = false;
@ -221,17 +217,6 @@ public class KafkaIndexTaskTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Parameterized.Parameters(name = "buildV9Directly = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{true}, new Object[]{false});
}
public KafkaIndexTaskTest(boolean buildV9Directly)
{
this.buildV9Directly = buildV9Directly;
}
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
@ -1401,7 +1386,7 @@ public class KafkaIndexTaskTest
null,
null,
null,
buildV9Directly,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
@ -1566,7 +1551,6 @@ public class KafkaIndexTaskTest
)
),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexMerger(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),

View File

@ -61,7 +61,6 @@ public class KafkaTuningConfigTest
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
}
@ -76,7 +75,6 @@ public class KafkaTuningConfigTest
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": true,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ "}";
@ -96,7 +94,6 @@ public class KafkaTuningConfigTest
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
}
@ -124,7 +121,6 @@ public class KafkaTuningConfigTest
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertEquals(true, copy.getBuildV9Directly());
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
}

View File

@ -63,7 +63,6 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
@ -84,7 +83,6 @@ public class KafkaSupervisorTuningConfigTest
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": false,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"workerThreads\": 12,\n"
@ -110,7 +108,6 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(12, (int) config.getWorkerThreads());

View File

@ -50,7 +50,6 @@ import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
@ -91,7 +90,6 @@ public class HadoopDruidIndexerConfig
public static final Joiner TAB_JOINER = Joiner.on("\t");
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER;
public static final IndexMergerV9 INDEX_MERGER_V9;
public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
@ -119,7 +117,6 @@ public class HadoopDruidIndexerConfig
);
JSON_MAPPER = injector.getInstance(ObjectMapper.class);
INDEX_IO = injector.getInstance(IndexIO.class);
INDEX_MERGER = injector.getInstance(IndexMerger.class);
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
HADOOP_KERBEROS_CONFIG = injector.getInstance(HadoopKerberosConfig.class);
DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
@ -378,11 +375,6 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
}
public boolean isBuildV9Directly()
{
return schema.getTuningConfig().getBuildV9Directly();
}
/**
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may

View File

@ -44,7 +44,6 @@ public class HadoopTuningConfig implements TuningConfig
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.TRUE;
private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0;
public static HadoopTuningConfig makeDefaultTuningConfig()
@ -64,7 +63,7 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
null,
DEFAULT_BUILD_V9_DIRECTLY,
true,
DEFAULT_NUM_BACKGROUND_PERSIST_THREADS,
false,
false,
@ -85,7 +84,6 @@ public class HadoopTuningConfig implements TuningConfig
private final Map<String, String> jobProperties;
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;
private final int numBackgroundPersistThreads;
private final boolean forceExtendableShardSpecs;
private final boolean useExplicitVersion;
@ -108,6 +106,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("useCombiner") Boolean useCombiner,
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads,
final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs,
@ -132,7 +131,6 @@ public class HadoopTuningConfig implements TuningConfig
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
this.numBackgroundPersistThreads = numBackgroundPersistThreads == null
? DEFAULT_NUM_BACKGROUND_PERSIST_THREADS
: numBackgroundPersistThreads;
@ -222,10 +220,14 @@ public class HadoopTuningConfig implements TuningConfig
return useCombiner;
}
/**
* Always returns true, doesn't affect the version being built.
*/
@Deprecated
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
return true;
}
@JsonProperty
@ -263,7 +265,7 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly,
true,
numBackgroundPersistThreads,
forceExtendableShardSpecs,
useExplicitVersion,
@ -288,7 +290,7 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly,
true,
numBackgroundPersistThreads,
forceExtendableShardSpecs,
useExplicitVersion,
@ -313,7 +315,7 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly,
true,
numBackgroundPersistThreads,
forceExtendableShardSpecs,
useExplicitVersion,

View File

@ -504,15 +504,9 @@ public class IndexGeneratorJob implements Jobby
final ProgressIndicator progressIndicator
) throws IOException
{
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(
index, interval, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, config.getIndexSpec(), progressIndicator
);
}
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(
index, interval, file, config.getIndexSpec(), progressIndicator
);
}
protected File mergeQueryableIndex(
@ -523,15 +517,9 @@ public class IndexGeneratorJob implements Jobby
) throws IOException
{
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
}
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
}
@Override

View File

@ -529,12 +529,18 @@ public class HadoopConverterJob
context.progress();
final File outDir = new File(tmpDir, "out");
FileUtils.forceMkdir(outDir);
HadoopDruidConverterConfig.INDEX_MERGER.convert(
inDir,
outDir,
config.getIndexSpec(),
JobHelper.progressIndicatorForContext(context)
);
try {
HadoopDruidConverterConfig.INDEX_MERGER.convert(
inDir,
outDir,
config.getIndexSpec(),
JobHelper.progressIndicatorForContext(context)
);
}
catch (Exception e) {
log.error(e, "Conversion failed.");
throw e;
}
if (config.isValidate()) {
context.setStatus("Validating");
HadoopDruidConverterConfig.INDEX_IO.validateTwoSegments(inDir, outDir);

View File

@ -89,7 +89,7 @@ public class IndexGeneratorJobTest
@Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " +
"data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " +
"aggs={8}, datasourceName={9}, forceExtendableShardSpecs={10}, buildV9Directly={11}")
"aggs={8}, datasourceName={9}, forceExtendableShardSpecs={10}")
public static Collection<Object[]> constructFeed()
{
final List<Object[]> baseConstructors = Arrays.asList(
@ -371,17 +371,14 @@ public class IndexGeneratorJobTest
}
);
// Run each baseConstructor with/without buildV9Directly and forceExtendableShardSpecs.
// Run each baseConstructor with/without forceExtendableShardSpecs.
final List<Object[]> constructors = Lists.newArrayList();
for (Object[] baseConstructor : baseConstructors) {
for (int buildV9Directly = 0; buildV9Directly < 2; buildV9Directly++) {
for (int forceExtendableShardSpecs = 0; forceExtendableShardSpecs < 2 ; forceExtendableShardSpecs++) {
final Object[] fullConstructor = new Object[baseConstructor.length + 2];
System.arraycopy(baseConstructor, 0, fullConstructor, 0, baseConstructor.length);
fullConstructor[baseConstructor.length] = forceExtendableShardSpecs == 0;
fullConstructor[baseConstructor.length + 1] = buildV9Directly == 0;
constructors.add(fullConstructor);
}
for (int forceExtendableShardSpecs = 0; forceExtendableShardSpecs < 2 ; forceExtendableShardSpecs++) {
final Object[] fullConstructor = new Object[baseConstructor.length + 1];
System.arraycopy(baseConstructor, 0, fullConstructor, 0, baseConstructor.length);
fullConstructor[baseConstructor.length] = forceExtendableShardSpecs == 0;
constructors.add(fullConstructor);
}
}
@ -402,7 +399,6 @@ public class IndexGeneratorJobTest
private final AggregatorFactory[] aggs;
private final String datasourceName;
private final boolean forceExtendableShardSpecs;
private final boolean buildV9Directly;
private ObjectMapper mapper;
private HadoopDruidIndexerConfig config;
@ -420,8 +416,7 @@ public class IndexGeneratorJobTest
Integer maxRowsInMemory,
AggregatorFactory[] aggs,
String datasourceName,
boolean forceExtendableShardSpecs,
boolean buildV9Directly
boolean forceExtendableShardSpecs
) throws IOException
{
this.useCombiner = useCombiner;
@ -435,7 +430,6 @@ public class IndexGeneratorJobTest
this.aggs = aggs;
this.datasourceName = datasourceName;
this.forceExtendableShardSpecs = forceExtendableShardSpecs;
this.buildV9Directly = buildV9Directly;
}
private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
@ -522,7 +516,7 @@ public class IndexGeneratorJobTest
false,
useCombiner,
null,
buildV9Directly,
true,
null,
forceExtendableShardSpecs,
false,

View File

@ -36,7 +36,6 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@ -79,7 +78,6 @@ public class TaskToolbox
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
private final File taskWorkDir;
private final IndexMerger indexMerger;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
@ -103,7 +101,6 @@ public class TaskToolbox
SegmentLoader segmentLoader,
ObjectMapper objectMapper,
File taskWorkDir,
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
@ -127,7 +124,6 @@ public class TaskToolbox
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
this.taskWorkDir = taskWorkDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
@ -244,11 +240,6 @@ public class TaskToolbox
return indexIO;
}
public IndexMerger getIndexMerger()
{
return indexMerger;
}
public Cache getCache()
{
return cache;

View File

@ -32,7 +32,6 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@ -65,7 +64,6 @@ public class TaskToolboxFactory
private final MonitorScheduler monitorScheduler;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper objectMapper;
private final IndexMerger indexMerger;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
@ -88,7 +86,6 @@ public class TaskToolboxFactory
MonitorScheduler monitorScheduler,
SegmentLoaderFactory segmentLoaderFactory,
ObjectMapper objectMapper,
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
@ -110,7 +107,6 @@ public class TaskToolboxFactory
this.monitorScheduler = monitorScheduler;
this.segmentLoaderFactory = segmentLoaderFactory;
this.objectMapper = objectMapper;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
@ -138,7 +134,6 @@ public class TaskToolboxFactory
segmentLoaderFactory.manufacturate(taskWorkDir),
objectMapper,
taskWorkDir,
indexMerger,
indexIO,
cache,
cacheConfig,

View File

@ -30,14 +30,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
@ -69,7 +67,6 @@ public class YeOldePlumberSchool implements PlumberSchool
private final String version;
private final DataSegmentPusher dataSegmentPusher;
private final File tmpSegmentDir;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@ -81,7 +78,6 @@ public class YeOldePlumberSchool implements PlumberSchool
@JsonProperty("version") String version,
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO
)
@ -90,7 +86,6 @@ public class YeOldePlumberSchool implements PlumberSchool
this.version = version;
this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
}
@ -118,9 +113,6 @@ public class YeOldePlumberSchool implements PlumberSchool
// Set of spilled segments. Will be merged at the end.
final Set<File> spilled = Sets.newHashSet();
// IndexMerger implementation.
final IndexMerger theIndexMerger = config.getBuildV9Directly() ? indexMergerV9 : indexMerger;
return new Plumber()
{
@Override
@ -189,7 +181,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
indexMergerV9.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
}
// Map merged segment so we can extract dimensions
@ -234,7 +226,7 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
theIndexMerger.persist(
indexMergerV9.persist(
indexToPersist.getIndex(),
dirToPersist,
config.getIndexSpec()

View File

@ -50,10 +50,8 @@ import java.util.Map;
*/
public class AppendTask extends MergeTaskBase
{
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
private final IndexSpec indexSpec;
private final List<AggregatorFactory> aggregators;
private final Boolean buildV9Directly;
@JsonCreator
public AppendTask(
@ -62,6 +60,7 @@ public class AppendTask extends MergeTaskBase
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("indexSpec") IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("context") Map<String, Object> context
)
@ -69,7 +68,6 @@ public class AppendTask extends MergeTaskBase
super(id, dataSource, segments, context);
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.aggregators = aggregators;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
}
@Override
@ -135,7 +133,7 @@ public class AppendTask extends MergeTaskBase
);
}
IndexMerger indexMerger = buildV9Directly ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger();
IndexMerger indexMerger = toolbox.getIndexMergerV9();
return indexMerger.append(
adapters,
aggregators == null ? null : aggregators.toArray(new AggregatorFactory[aggregators.size()]),

View File

@ -351,7 +351,13 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
log.info("Subs are good! Italian BMT and Meatball are probably my favorite.");
convertSegment(toolbox, segment, indexSpec, force, validate);
try {
convertSegment(toolbox, segment, indexSpec, force, validate);
}
catch (Exception e) {
log.error(e, "Conversion failed.");
throw e;
}
return success();
}
}

View File

@ -539,7 +539,7 @@ public class IndexTask extends AbstractTask
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
toolbox.getIndexIO(),
ingestionSchema.getTuningConfig().isBuildV9Directly() ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger()
toolbox.getIndexMergerV9()
);
}
@ -579,7 +579,7 @@ public class IndexTask extends AbstractTask
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null
?
new IndexTuningConfig(null, null, null, null, null, null, null, null, null, (File) null)
new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null)
: tuningConfig;
}
@ -655,7 +655,6 @@ public class IndexTask extends AbstractTask
private final IndexSpec indexSpec;
private final File basePersistDirectory;
private final int maxPendingPersists;
private final boolean buildV9Directly;
private final boolean forceExtendableShardSpecs;
private final boolean reportParseExceptions;
private final long publishTimeout;
@ -668,6 +667,7 @@ public class IndexTask extends AbstractTask
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs,
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@ -680,7 +680,6 @@ public class IndexTask extends AbstractTask
numShards,
indexSpec,
maxPendingPersists,
buildV9Directly,
forceExtendableShardSpecs,
reportParseExceptions,
publishTimeout,
@ -694,7 +693,6 @@ public class IndexTask extends AbstractTask
@Nullable Integer numShards,
@Nullable IndexSpec indexSpec,
@Nullable Integer maxPendingPersists,
@Nullable Boolean buildV9Directly,
@Nullable Boolean forceExtendableShardSpecs,
@Nullable Boolean reportParseExceptions,
@Nullable Long publishTimeout,
@ -715,7 +713,6 @@ public class IndexTask extends AbstractTask
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
? DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS
: forceExtendableShardSpecs;
@ -734,7 +731,6 @@ public class IndexTask extends AbstractTask
numShards,
indexSpec,
maxPendingPersists,
buildV9Directly,
forceExtendableShardSpecs,
reportParseExceptions,
publishTimeout,
@ -781,10 +777,14 @@ public class IndexTask extends AbstractTask
return maxPendingPersists;
}
/**
* Always returns true, doesn't affect the version being built.
*/
@Deprecated
@JsonProperty
public boolean isBuildV9Directly()
{
return buildV9Directly;
return true;
}
@JsonProperty

View File

@ -43,12 +43,10 @@ import java.util.Map;
*/
public class MergeTask extends MergeTaskBase
{
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final Boolean rollup;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
@JsonCreator
public MergeTask(
@ -58,6 +56,7 @@ public class MergeTask extends MergeTaskBase
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("context") Map<String, Object> context
)
@ -66,14 +65,13 @@ public class MergeTask extends MergeTaskBase
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
}
@Override
public File merge(final TaskToolbox toolbox, final Map<DataSegment, File> segments, final File outDir)
throws Exception
{
IndexMerger indexMerger = buildV9Directly ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger();
IndexMerger indexMerger = toolbox.getIndexMergerV9();
return indexMerger.mergeQueryableIndex(
Lists.transform(
ImmutableList.copyOf(segments.values()),

View File

@ -308,7 +308,6 @@ public class RealtimeIndexTask extends AbstractTask
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),
toolbox.getCache(),

View File

@ -38,13 +38,11 @@ import java.util.Map;
*/
public class SameIntervalMergeTask extends AbstractFixedIntervalTask
{
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
private static final String TYPE = "same_interval_merge";
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final Boolean rollup;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
public SameIntervalMergeTask(
@JsonProperty("id") String id,
@ -53,6 +51,7 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("context") Map<String, Object> context
)
@ -66,7 +65,6 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
}
@JsonProperty("aggregations")
@ -87,10 +85,14 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask
return indexSpec;
}
/**
* Always returns true, doesn't affect the version being built.
*/
@Deprecated
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
return true;
}
public static String makeId(String id, final String typeName, String dataSource, Interval interval)
@ -127,7 +129,6 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask
aggregators,
rollup,
indexSpec,
buildV9Directly,
getContext()
);
final TaskStatus status = mergeTask.run(toolbox);
@ -146,7 +147,6 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask
List<AggregatorFactory> aggregators,
Boolean rollup,
IndexSpec indexSpec,
Boolean buildV9Directly,
Map<String, Object> context
)
{
@ -157,7 +157,7 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask
aggregators,
rollup,
indexSpec,
buildV9Directly,
true,
context
);
}

View File

@ -30,7 +30,6 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@ -78,7 +77,6 @@ public class TaskToolboxTest
private ObjectMapper ObjectMapper = new ObjectMapper();
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
private Task task = EasyMock.createMock(Task.class);
private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class);
private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class);
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
@ -110,7 +108,6 @@ public class TaskToolboxTest
mockMonitorScheduler,
new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
ObjectMapper,
mockIndexMerger,
mockIndexIO,
mockCache,
mockCacheConfig,

View File

@ -29,7 +29,6 @@ import io.druid.java.util.common.ISE;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.expression.TestExprMacroTable;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -43,7 +42,6 @@ import java.util.concurrent.TimeUnit;
public class TestUtils
{
private final ObjectMapper jsonMapper;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@ -61,7 +59,6 @@ public class TestUtils
}
}
);
indexMerger = new IndexMerger(jsonMapper, indexIO);
indexMergerV9 = new IndexMergerV9(jsonMapper, indexIO);
final List<? extends Module> list = new ServerModule().getJacksonModules();
@ -73,7 +70,6 @@ public class TestUtils
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(IndexIO.class, indexIO)
.addValue(IndexMerger.class, indexMerger)
.addValue(ObjectMapper.class, jsonMapper)
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
);
@ -84,11 +80,6 @@ public class TestUtils
return jsonMapper;
}
public IndexMerger getTestIndexMerger()
{
return indexMerger;
}
public IndexMergerV9 getTestIndexMergerV9()
{
return indexMergerV9;

View File

@ -42,7 +42,6 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
@ -98,7 +97,6 @@ public class IndexTaskTest
private final IndexSpec indexSpec;
private final ObjectMapper jsonMapper;
private IndexMerger indexMerger;
private IndexMergerV9 indexMergerV9;
private IndexIO indexIO;
private volatile int segmentAllocatePartitionCounter;
@ -108,7 +106,6 @@ public class IndexTaskTest
indexSpec = new IndexSpec();
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMerger = testUtils.getTestIndexMerger();
indexMergerV9 = testUtils.getTestIndexMergerV9();
indexIO = testUtils.getTestIndexIO();
}
@ -559,7 +556,7 @@ public class IndexTaskTest
throw new UnsupportedOperationException();
}
}, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexMerger, indexIO, null, null, indexMergerV9
indexIO, null, null, indexMergerV9
)
);

View File

@ -116,22 +116,18 @@ import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class RealtimeIndexTaskTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
@ -232,26 +228,10 @@ public class RealtimeIndexTaskTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private final boolean buildV9Directly;
private DateTime now;
private ListeningExecutorService taskExec;
private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
@Parameterized.Parameters(name = "buildV9Directly = {0}")
public static Collection<?> constructorFeeder() throws IOException
{
return ImmutableList.of(
new Object[]{true},
new Object[]{false}
);
}
public RealtimeIndexTaskTest(boolean buildV9Directly)
{
this.buildV9Directly = buildV9Directly;
}
@Before
public void setUp()
{
@ -907,7 +887,7 @@ public class RealtimeIndexTaskTest
null,
null,
null,
buildV9Directly,
true,
0,
0,
reportParseExceptions,
@ -1062,7 +1042,6 @@ public class RealtimeIndexTaskTest
)
),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexMerger(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),

View File

@ -34,7 +34,6 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.Segment;
@ -234,7 +233,7 @@ public class SameIntervalMergeTaskTest
{
}
}, jsonMapper, temporaryFolder.newFolder(),
EasyMock.createMock(IndexMerger.class), indexIO, null, null, EasyMock.createMock(IndexMergerV9.class)
indexIO, null, null, EasyMock.createMock(IndexMergerV9.class)
)
);

View File

@ -98,7 +98,6 @@ public class TaskSerdeTest
IndexTask.IndexTuningConfig.class
);
Assert.assertEquals(true, tuningConfig.isBuildV9Directly());
Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs());
Assert.assertEquals(false, tuningConfig.isReportParseExceptions());
Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
@ -223,7 +222,6 @@ public class TaskSerdeTest
Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory());
Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards());
Assert.assertEquals(taskTuningConfig.getTargetPartitionSize(), task2TuningConfig.getTargetPartitionSize());
Assert.assertEquals(taskTuningConfig.isBuildV9Directly(), task2TuningConfig.isBuildV9Directly());
Assert.assertEquals(
taskTuningConfig.isForceExtendableShardSpecs(),
task2TuningConfig.isForceExtendableShardSpecs()
@ -362,7 +360,6 @@ public class TaskSerdeTest
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertEquals(task.getRollup(), task2.getRollup());
Assert.assertEquals(task.getBuildV9Directly(), task2.getBuildV9Directly());
Assert.assertEquals(task.getIndexSpec(), task2.getIndexSpec());
Assert.assertEquals(
task.getAggregators().get(0).getName(),

View File

@ -60,7 +60,6 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndex;
@ -108,14 +107,12 @@ import java.util.Set;
public class IngestSegmentFirehoseFactoryTest
{
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
INDEX_IO = testUtils.getTestIndexIO();
}
@ -150,7 +147,7 @@ public class IngestSegmentFirehoseFactoryTest
if (!persistDir.mkdirs() && !persistDir.exists()) {
throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
}
INDEX_MERGER.persist(index, persistDir, indexSpec);
INDEX_MERGER_V9.persist(index, persistDir, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
@ -290,7 +287,6 @@ public class IngestSegmentFirehoseFactoryTest
)
),
MAPPER,
INDEX_MERGER,
INDEX_IO,
null,
null,

View File

@ -52,7 +52,6 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.NoopDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndex;
@ -110,14 +109,12 @@ public class IngestSegmentFirehoseFactoryTimelineTest
private final long expectedSum;
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexIO INDEX_IO;
private static final IndexMergerV9 INDEX_MERGER_V9;
static {
TestUtils testUtils = new TestUtils();
MAPPER = IngestSegmentFirehoseFactoryTest.setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_IO = testUtils.getTestIndexIO();
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
}
@ -228,7 +225,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
}
try {
INDEX_MERGER.persist(index, persistDir, new IndexSpec());
INDEX_MERGER_V9.persist(index, persistDir, new IndexSpec());
}
catch (IOException e) {
throw Throwables.propagate(e);
@ -333,7 +330,6 @@ public class IngestSegmentFirehoseFactoryTimelineTest
)
),
MAPPER,
INDEX_MERGER,
INDEX_IO,
null,
null,

View File

@ -80,7 +80,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
@ -136,7 +135,6 @@ import java.util.concurrent.Executor;
public class TaskLifecycleTest
{
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
private static final TestUtils TEST_UTILS;
@ -144,7 +142,6 @@ public class TaskLifecycleTest
static {
TEST_UTILS = new TestUtils();
MAPPER = TEST_UTILS.getTestObjectMapper();
INDEX_MERGER = TEST_UTILS.getTestIndexMerger();
INDEX_MERGER_V9 = TEST_UTILS.getTestIndexMergerV9();
INDEX_IO = TEST_UTILS.getTestIndexIO();
}
@ -594,7 +591,6 @@ public class TaskLifecycleTest
)
),
MAPPER,
INDEX_MERGER,
INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,

View File

@ -39,7 +39,6 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
@ -81,7 +80,6 @@ public class WorkerTaskMonitorTest
private Worker worker;
private ObjectMapper jsonMapper;
private IndexMerger indexMerger;
private IndexMergerV9 indexMergerV9;
private IndexIO indexIO;
@ -89,7 +87,6 @@ public class WorkerTaskMonitorTest
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMerger = testUtils.getTestIndexMerger();
indexMergerV9 = testUtils.getTestIndexMergerV9();
indexIO = testUtils.getTestIndexIO();
}
@ -184,7 +181,6 @@ public class WorkerTaskMonitorTest
)
),
jsonMapper,
indexMerger,
indexIO,
null,
null,

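Each of the test classes above follows the same pattern: the legacy IndexMerger field disappears and any persistence goes through IndexMergerV9. A minimal sketch of the updated setup, assuming the TestUtils helper shown in these diffs and a hypothetical in-memory index named incIndex (imports as in the surrounding test classes):

// Sketch only: incIndex stands in for whatever index the individual test builds.
TestUtils testUtils = new TestUtils();
IndexMergerV9 indexMergerV9 = testUtils.getTestIndexMergerV9();
IndexIO indexIO = testUtils.getTestIndexIO();

// The old INDEX_MERGER.persist(...) call is gone; only the V9 merger writes segments now.
File persistDir = Files.createTempDir();
indexMergerV9.persist(incIndex, persistDir, new IndexSpec());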
View File

@ -102,28 +102,6 @@ public interface DimensionHandler
) throws IOException;
/**
* Creates a new DimensionMergerLegacy, a per-dimension object responsible for merging indexes/row data across
* segments and building the on-disk representation of a dimension. For use with IndexMerger only.
*
* See {@link DimensionMergerLegacy} interface for more information.
*
* @param indexSpec Specification object for the index merge
* @param outDir Location to store files generated by the merging process
* @param ioPeon ioPeon object passed in by IndexMerger, manages files created by the merging process
* @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process
* @return A new DimensionMergerLegacy object.
*/
DimensionMergerLegacy<EncodedKeyComponentType> makeLegacyMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
) throws IOException;
/**
* Given a key component representing a single set of row value(s) for this dimension as an Object,
* return the length of the key component after appropriate type-casting.

View File

@ -1,69 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.ByteSink;
import com.google.common.io.OutputSupplier;
import io.druid.common.guava.FileOutputSupplier;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
/**
* Processing related interface
*
* DimensionMerger subclass to be used with the legacy IndexMerger.
*
* NOTE: Remove this class when the legacy IndexMerger is deprecated and removed.
*/
public interface DimensionMergerLegacy<EncodedKeyComponentType> extends DimensionMergerV9<EncodedKeyComponentType>
{
/**
* Write this dimension's value metadata to a file.
*
* @param valueEncodingFile Destination file
* @throws IOException
*/
void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException;
/**
* Write this dimension's sequence of row values to a file.
* @param rowValueOut Destination file
* @throws IOException
*/
void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException;
/**
* Write this dimension's bitmap and spatial indexes to a file.
* @param invertedOut Destination file for bitmap indexes
* @param spatialOut Destination file for spatial indexes
* @throws IOException
*/
void writeIndexesToFiles(
ByteSink invertedOut,
OutputSupplier<FileOutputStream> spatialOut
) throws IOException;
File makeDimFile() throws IOException;
}

View File

@ -66,21 +66,6 @@ public class FloatDimensionHandler implements DimensionHandler<Float, Float, Flo
);
}
@Override
public DimensionMergerLegacy<Float> makeLegacyMerger(
IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress
) throws IOException
{
return new FloatDimensionMergerLegacy(
dimensionName,
indexSpec,
outDir,
ioPeon,
capabilities,
progress
);
}
@Override
public int getLengthOfEncodedKeyComponent(Float dimVals)
{

View File

@ -1,89 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.ByteSink;
import com.google.common.io.OutputSupplier;
import io.druid.common.guava.FileOutputSupplier;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
public class FloatDimensionMergerLegacy extends FloatDimensionMergerV9 implements DimensionMergerLegacy<Float>
{
private FloatMetricColumnSerializer serializerV8;
public FloatDimensionMergerLegacy(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
super(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
@Override
protected void setupEncodedValueWriter() throws IOException
{
final CompressedObjectStrategy.CompressionStrategy metCompression = indexSpec.getMetricCompression();
serializerV8 = new FloatMetricColumnSerializer(dimensionName, outDir, ioPeon, metCompression);
serializerV8.open();
}
@Override
public void processMergedRow(Float rowValues) throws IOException
{
serializerV8.serialize(rowValues);
}
@Override
public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException
{
// floats have no metadata to write
}
@Override
public void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException
{
// closing the serializer writes its data to the file
serializerV8.closeFile(rowValueOut.getFile());
}
@Override
public void writeIndexesToFiles(
ByteSink invertedOut, OutputSupplier<FileOutputStream> spatialOut
) throws IOException
{
// floats have no indices to write
}
@Override
public File makeDimFile() throws IOException
{
return IndexIO.makeNumericDimFile(outDir, dimensionName, IndexIO.BYTE_ORDER);
}
}

View File

@ -1,91 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.FloatSupplierSerializer;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.IOException;
/**
*/
public class FloatMetricColumnSerializer implements MetricColumnSerializer
{
private final String metricName;
private final IOPeon ioPeon;
private final File outDir;
private final CompressedObjectStrategy.CompressionStrategy compression;
private FloatSupplierSerializer writer;
public FloatMetricColumnSerializer(
String metricName,
File outDir,
IOPeon ioPeon,
CompressedObjectStrategy.CompressionStrategy compression
)
{
this.metricName = metricName;
this.ioPeon = ioPeon;
this.outDir = outDir;
this.compression = compression;
}
@Override
public void open() throws IOException
{
writer = CompressionFactory.getFloatSerializer(
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER, compression
);
writer.open();
}
@Override
public void serialize(Object obj) throws IOException
{
float val = (obj == null) ? 0 : ((Number) obj).floatValue();
writer.add(val);
}
@Override
public void close() throws IOException
{
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
closeFile(outFile);
}
@Override
public void closeFile(final File outFile) throws IOException
{
outFile.delete();
MetricHolder.writeFloatMetric(
Files.asByteSink(outFile, FileWriteMode.APPEND), metricName, writer
);
IndexIO.checkFileSize(outFile);
writer = null;
}
}

View File

@ -23,33 +23,22 @@ import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.ConciseBitmapFactory;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.spatial.ImmutableRTree;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.io.Closer;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.java.util.common.io.smoosh.Smoosh;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.java.util.common.io.smoosh.SmooshedWriter;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnBuilder;
@ -62,8 +51,6 @@ import io.druid.segment.data.BitmapSerde;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
@ -71,33 +58,22 @@ import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.serde.BitmapIndexColumnPartSupplier;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import io.druid.segment.serde.DictionaryEncodedColumnSupplier;
import io.druid.segment.serde.FloatGenericColumnPartSerde;
import io.druid.segment.serde.FloatGenericColumnSupplier;
import io.druid.segment.serde.LongGenericColumnPartSerde;
import io.druid.segment.serde.LongGenericColumnSupplier;
import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.AbstractList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -115,26 +91,19 @@ public class IndexIO
private static final SerializerUtils serializerUtils = new SerializerUtils();
private final ObjectMapper mapper;
private final DefaultIndexIOHandler defaultIndexIOHandler;
@Inject
public IndexIO(ObjectMapper mapper, ColumnConfig columnConfig)
{
this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper");
Preconditions.checkNotNull(columnConfig, "null ColumnConfig");
defaultIndexIOHandler = new DefaultIndexIOHandler(mapper);
indexLoaders = ImmutableMap.<Integer, IndexLoader>builder()
.put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(9, new V9IndexLoader(columnConfig))
.build();
ImmutableMap.Builder<Integer, IndexLoader> indexLoadersBuilder = ImmutableMap.builder();
LegacyIndexLoader legacyIndexLoader = new LegacyIndexLoader(new DefaultIndexIOHandler(), columnConfig);
for (int i = 0; i <= V8_VERSION; i++) {
indexLoadersBuilder.put(i, legacyIndexLoader);
}
indexLoadersBuilder.put((int) V9_VERSION, new V9IndexLoader(columnConfig));
indexLoaders = indexLoadersBuilder.build();
}
public void validateTwoSegments(File dir1, File dir2) throws IOException
@ -246,11 +215,6 @@ public class IndexIO
}
}
public boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException
{
return convertSegment(toConvert, converted, indexSpec, false, true);
}
public boolean convertSegment(
File toConvert,
File converted,
@ -260,50 +224,19 @@ public class IndexIO
) throws IOException
{
final int version = SegmentUtils.getVersionFromDir(toConvert);
switch (version) {
case 1:
case 2:
case 3:
log.makeAlert("Attempt to load segment of version <= 3.")
.addData("version", version)
.emit();
return false;
case 4:
case 5:
case 6:
case 7:
log.info("Old version, re-persisting.");
try (QueryableIndex segmentToConvert = loadIndex(toConvert)) {
new IndexMerger(mapper, this).append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(segmentToConvert)),
null,
converted,
indexSpec
);
}
return true;
case 8:
defaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
return true;
default:
if (forceIfCurrent) {
new IndexMerger(mapper, this).convert(toConvert, converted, indexSpec);
if (validate) {
validateTwoSegments(toConvert, converted);
}
return true;
} else {
log.info("Version[%s], skipping.", version);
return false;
}
boolean current = version == CURRENT_VERSION_ID;
if (!current || forceIfCurrent) {
new IndexMergerV9(mapper, this).convert(toConvert, converted, indexSpec);
if (validate) {
validateTwoSegments(toConvert, converted);
}
return true;
} else {
log.info("Current version[%d], skipping.", version);
return false;
}
}
public DefaultIndexIOHandler getDefaultIndexIOHandler()
{
return defaultIndexIOHandler;
}
static interface IndexIOHandler
{
public MMappedIndex mapDir(File inDir) throws IOException;
@ -366,12 +299,6 @@ public class IndexIO
public static class DefaultIndexIOHandler implements IndexIOHandler
{
private static final Logger log = new Logger(DefaultIndexIOHandler.class);
private final ObjectMapper mapper;
public DefaultIndexIOHandler(ObjectMapper mapper)
{
this.mapper = mapper;
}
@Override
public MMappedIndex mapDir(File inDir) throws IOException
@ -481,362 +408,6 @@ public class IndexIO
return retVal;
}
public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)
throws IOException
{
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
InputStream indexIn = null;
try {
indexIn = new FileInputStream(new File(v8Dir, "index.drd"));
byte theVersion = (byte) indexIn.read();
if (theVersion != V8_VERSION) {
throw new IAE("Unknown version[%s]", theVersion);
}
}
finally {
Closeables.close(indexIn, false);
}
Closer closer = Closer.create();
try {
SmooshedFileMapper v8SmooshedFiles = closer.register(Smoosh.map(v8Dir));
FileUtils.forceMkdir(v9Dir);
final FileSmoosher v9Smoosher = closer.register(new FileSmoosher(v9Dir));
ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin")));
Map<String, GenericIndexed<ImmutableBitmap>> bitmapIndexes = Maps.newHashMap();
final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd");
BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
while (invertedBuffer.hasRemaining()) {
final String dimName = serializerUtils.readString(invertedBuffer);
bitmapIndexes.put(
dimName,
GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy(), v8SmooshedFiles)
);
}
Map<String, ImmutableRTree> spatialIndexes = Maps.newHashMap();
final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd");
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexes.put(
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(
spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy(
bitmapSerdeFactory.getBitmapFactory()
)
)
);
}
final LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
final Set<String> skippedDimensions = Sets.newLinkedHashSet();
for (String filename : v8SmooshedFiles.getInternalFilenames()) {
log.info("Processing file[%s]", filename);
if (filename.startsWith("dim_")) {
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.STRING);
final List<ByteBuffer> outParts = Lists.newArrayList();
ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename);
String dimension = serializerUtils.readString(dimBuffer);
if (!filename.equals(String.format("dim_%s.drd", dimension))) {
throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename);
}
ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream();
serializerUtils.writeString(nameBAOS, dimension);
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
GenericIndexed<String> dictionary = GenericIndexed.read(
dimBuffer, GenericIndexed.STRING_STRATEGY
);
if (dictionary.size() == 0) {
log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension);
skippedDimensions.add(dimension);
continue;
}
int emptyStrIdx = dictionary.indexOf("");
List<Integer> singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);
final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
boolean onlyOneValue = true;
MutableBitmap nullsSet = null;
for (int i = 0; i < multiValCol.size(); ++i) {
VSizeIndexedInts rowValue = multiValCol.get(i);
if (!onlyOneValue) {
break;
}
if (rowValue.size() > 1) {
onlyOneValue = false;
}
if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) {
if (nullsSet == null) {
nullsSet = bitmapFactory.makeEmptyMutableBitmap();
}
nullsSet.add(i);
}
}
if (onlyOneValue) {
log.info("Dimension[%s] is single value, converting...", dimension);
final boolean bumpedDictionary;
if (nullsSet != null) {
log.info("Dimension[%s] has null rows.", dimension);
final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet);
if (dictionary.get(0) != null) {
log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
bumpedDictionary = true;
final List<String> nullList = Lists.newArrayList();
nullList.add(null);
dictionary = GenericIndexed.fromIterable(
Iterables.concat(nullList, dictionary),
GenericIndexed.STRING_STRATEGY
);
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(Collections.singletonList(theNullSet), bitmaps),
bitmapSerdeFactory.getObjectStrategy()
);
} else {
bumpedDictionary = false;
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(
Collections.singletonList(
bitmapFactory
.union(Arrays.asList(theNullSet, bitmaps.get(0)))
),
Iterables.skip(bitmaps, 1)
),
bitmapSerdeFactory.getObjectStrategy()
);
}
} else {
bumpedDictionary = false;
}
final VSizeIndexed finalMultiValCol = multiValCol;
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
final VSizeIndexedInts ints = finalMultiValCol.get(index);
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}
@Override
public int size()
{
return finalMultiValCol.size();
}
};
multiValCol = null;
} else {
builder.setHasMultipleValues(true);
}
final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression();
final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde
.legacySerializerBuilder()
.withDictionary(dictionary)
.withBitmapSerdeFactory(bitmapSerdeFactory)
.withBitmaps(bitmaps)
.withSpatialIndex(spatialIndex)
.withByteOrder(BYTE_ORDER);
if (singleValCol != null) {
if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) {
columnPartBuilder.withSingleValuedColumn(
CompressedVSizeIntsIndexedSupplier.fromList(
singleValCol,
dictionary.size(),
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()),
BYTE_ORDER,
compressionStrategy
)
);
} else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
}
} else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) {
columnPartBuilder.withMultiValuedColumn(
CompressedVSizeIndexedSupplier.fromIterable(
multiValCol,
dictionary.size(),
BYTE_ORDER,
compressionStrategy
)
);
} else {
columnPartBuilder.withMultiValuedColumn(multiValCol);
}
final ColumnDescriptor serdeficator = builder
.addSerde(columnPartBuilder.build())
.build();
makeColumn(v9Smoosher, dimension, serdeficator);
} else if (filename.startsWith("met_") || filename.startsWith("numeric_dim_")) {
// NOTE: identifying numeric dimensions by using a different filename pattern is meant to allow the
// legacy merger (which will be deprecated) to support long/float dims. Going forward, the V9 merger
// should be used instead if any dimension types beyond String are needed.
if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) {
skippedFiles.add(filename);
continue;
}
MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename), v8SmooshedFiles);
final String metric = holder.getName();
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
switch (holder.getType()) {
case LONG:
builder.setValueType(ValueType.LONG);
builder.addSerde(
LongGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
.withDelegate(holder.longType)
.build()
);
break;
case FLOAT:
builder.setValueType(ValueType.FLOAT);
builder.addSerde(
FloatGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
.withDelegate(holder.floatType)
.build()
);
break;
case COMPLEX:
if (!(holder.complexType instanceof GenericIndexed)) {
throw new ISE("Serialized complex types must be GenericIndexed objects.");
}
final GenericIndexed column = (GenericIndexed) holder.complexType;
final String complexType = holder.getTypeName();
builder.setValueType(ValueType.COMPLEX);
builder.addSerde(
ComplexColumnPartSerde.legacySerializerBuilder()
.withTypeName(complexType)
.withDelegate(column).build()
);
break;
default:
throw new ISE("Unknown type[%s]", holder.getType());
}
final ColumnDescriptor serdeficator = builder.build();
makeColumn(v9Smoosher, metric, serdeficator);
} else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) {
CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
v8SmooshedFiles.mapFile(filename),
BYTE_ORDER,
v8SmooshedFiles
);
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.LONG);
builder.addSerde(
LongGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
.withDelegate(timestamps)
.build()
);
final ColumnDescriptor serdeficator = builder.build();
makeColumn(v9Smoosher, "__time", serdeficator);
} else {
skippedFiles.add(filename);
}
}
final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd");
indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims8 = GenericIndexed.read(
indexBuffer, GenericIndexed.STRING_STRATEGY
);
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
Iterables.filter(
dims8, new Predicate<String>()
{
@Override
public boolean apply(String s)
{
return !skippedDimensions.contains(s);
}
}
),
GenericIndexed.STRING_STRATEGY
);
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.STRING_STRATEGY
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue(
serializerUtils.readString(indexBuffer),
BitmapSerdeFactory.class
);
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16
+ serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims9.writeToChannel(writer);
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString);
writer.close();
final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd");
if (metadataBuffer != null) {
v9Smoosher.add("metadata.drd", metadataBuffer);
}
log.info("Skipped files[%s]", skippedFiles);
}
catch (Throwable t) {
throw closer.rethrow(t);
}
finally {
closer.close();
}
}
private void makeColumn(FileSmoosher v9Smoosher, String dimension, ColumnDescriptor serdeficator)
throws IOException
{
ZeroCopyByteArrayOutputStream specBytes = new ZeroCopyByteArrayOutputStream();
serializerUtils.writeString(specBytes, mapper.writeValueAsString(serdeficator));
try (SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
dimension, serdeficator.numBytes() + specBytes.size()
)) {
specBytes.writeTo(channel);
serdeficator.write(channel, v9Smoosher);
}
}
}
static interface IndexLoader

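With the v8 conversion path removed, convertSegment no longer switches on the segment version: any segment that is not at the current version, or a current one converted with forceIfCurrent, is simply re-persisted through IndexMergerV9. A rough sketch of a caller, assuming an injected IndexIO instance, placeholder directories, and that the trailing boolean flags remain forceIfCurrent and validate as in the removed three-argument overload:

// Sketch only: indexIO is an injected io.druid.segment.IndexIO; the paths are placeholders.
File oldSegment = new File("/tmp/old-segment");
File convertedSegment = new File("/tmp/converted-segment");

// forceIfCurrent = false: segments already at the current version are skipped.
// validate = true: the converted segment is checked against the original afterwards.
boolean converted = indexIO.convertSegment(oldSegment, convertedSegment, new IndexSpec(), false, true);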
File diff suppressed because it is too large

View File

@ -21,17 +21,26 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import io.druid.collections.CombiningIterable;
import io.druid.common.utils.JodaUtils;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.guava.MergeIterable;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.java.util.common.io.smoosh.SmooshedWriter;
@ -46,7 +55,10 @@ import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.loading.MMappedQueryableSegmentizerFactory;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexMetricSerde;
@ -60,6 +72,8 @@ import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -72,9 +86,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
public class IndexMergerV9 extends IndexMerger
public class IndexMergerV9 implements IndexMerger
{
private static final Logger log = new Logger(IndexMergerV9.class);
protected final ObjectMapper mapper;
protected final IndexIO indexIO;
@Inject
public IndexMergerV9(
@ -82,11 +98,24 @@ public class IndexMergerV9 extends IndexMerger
IndexIO indexIO
)
{
super(mapper, indexIO);
this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper");
this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
}
@Override
protected File makeIndexFiles(
private static void registerDeleteDirectory(Closer closer, final File dir)
{
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
FileUtils.deleteDirectory(dir);
}
});
}
private File makeIndexFiles(
final List<IndexableAdapter> adapters,
final AggregatorFactory[] metricAggs,
final File outDir,
@ -581,4 +610,450 @@ public class IndexMergerV9 extends IndexMerger
dimCapabilities.add(capabilitiesMap.get(dim));
}
}
@Override
public File persist(
final IncrementalIndex index,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return persist(index, index.getInterval(), outDir, indexSpec);
}
@Override
public File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
}
@Override
public File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
if (index.isEmpty()) {
throw new IAE("Trying to persist an empty index!");
}
final long firstTimestamp = index.getMinTime().getMillis();
final long lastTimestamp = index.getMaxTime().getMillis();
if (!(dataInterval.contains(firstTimestamp) && dataInterval.contains(lastTimestamp))) {
throw new IAE(
"interval[%s] does not encapsulate the full range of timestamps[%s, %s]",
dataInterval,
new DateTime(firstTimestamp),
new DateTime(lastTimestamp)
);
}
FileUtils.forceMkdir(outDir);
log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size());
return merge(
Arrays.<IndexableAdapter>asList(
new IncrementalIndexAdapter(
dataInterval,
index,
indexSpec.getBitmapSerdeFactory().getBitmapFactory()
)
),
// If the index is not rolled up, it should not be rolled up here either.
// If the index is already rolled up, there is no need to roll it up again.
// Either way, passing true or false won't cause reordering in the merge stage
// while merging a single iterable.
false,
index.getMetricAggs(),
outDir,
indexSpec,
progress
);
}
@Override
public File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
@Override
public File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
return merge(
IndexMerger.toIndexableAdapters(indexes),
rollup,
metricAggs,
outDir,
indexSpec,
progress
);
}
@Override
public File merge(
List<IndexableAdapter> indexes,
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return merge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
@Override
public File merge(
List<IndexableAdapter> indexes,
final boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
FileUtils.forceMkdir(outDir);
final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
final List<String> mergedMetrics = Lists.transform(
IndexMerger.mergeIndexed(
Lists.newArrayList(
FunctionalIterable
.create(indexes)
.transform(
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return input.getMetricNames();
}
}
)
)
),
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return input;
}
}
);
final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()];
for (int i = 0; i < metricAggs.length; i++) {
AggregatorFactory metricAgg = metricAggs[i];
int metricIndex = mergedMetrics.indexOf(metricAgg.getName());
/*
If metricIndex is negative, one of the metricAggs was not present in the union of metrics from the indices
we are merging
*/
if (metricIndex > -1) {
sortedMetricAggs[metricIndex] = metricAgg;
}
}
/*
If there is nothing at sortedMetricAggs[i], then we did not have a metricAgg whose name matched the name
of the ith element of mergedMetrics, i.e. there was a metric in the indices to merge that we did not ask for.
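For example (hypothetical metrics): if the indices being merged contain [added, deleted] but only [added]
was requested, the loop below hits a null slot for "deleted" and the merge fails.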
*/
for (int i = 0; i < sortedMetricAggs.length; i++) {
if (sortedMetricAggs[i] == null) {
throw new IAE("Indices to merge contained metric[%s], but requested metrics did not", mergedMetrics.get(i));
}
}
for (int i = 0; i < mergedMetrics.size(); i++) {
if (!sortedMetricAggs[i].getName().equals(mergedMetrics.get(i))) {
throw new IAE(
"Metric mismatch, index[%d] [%s] != [%s]",
i,
sortedMetricAggs[i].getName(),
mergedMetrics.get(i)
);
}
}
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn = new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Override
public Iterable<Rowboat> apply(
@Nullable ArrayList<Iterable<Rowboat>> boats
)
{
if (rollup) {
return CombiningIterable.create(
new MergeIterable<>(Comparators.naturalNullsFirst(), boats),
Comparators.naturalNullsFirst(),
new RowboatMergeFunction(sortedMetricAggs)
);
} else {
return new MergeIterable<Rowboat>(
new Ordering<Rowboat>()
{
@Override
public int compare(Rowboat left, Rowboat right)
{
return Longs.compare(left.getTimestamp(), right.getTimestamp());
}
}.nullsFirst(),
boats
);
}
}
};
return makeIndexFiles(
indexes,
sortedMetricAggs,
outDir,
progress,
mergedDimensions,
mergedMetrics,
rowMergerFn,
indexSpec
);
}
@Override
public File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException
{
return convert(inDir, outDir, indexSpec, new BaseProgressIndicator());
}
@Override
public File convert(final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress)
throws IOException
{
try (QueryableIndex index = indexIO.loadIndex(inDir)) {
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
return makeIndexFiles(
ImmutableList.of(adapter),
null,
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@Override
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
{
return input.get(0);
}
},
indexSpec
);
}
}
@Override
public File append(
List<IndexableAdapter> indexes, AggregatorFactory[] aggregators, File outDir, IndexSpec indexSpec
) throws IOException
{
return append(indexes, aggregators, outDir, indexSpec, new BaseProgressIndicator());
}
@Override
public File append(
List<IndexableAdapter> indexes,
AggregatorFactory[] aggregators,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
FileUtils.forceMkdir(outDir);
final List<String> mergedDimensions = IndexMerger.getMergedDimensions(indexes);
final List<String> mergedMetrics = IndexMerger.mergeIndexed(
Lists.transform(
indexes,
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return Iterables.transform(
input.getMetricNames(),
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return input;
}
}
);
}
}
)
);
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn = new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Override
public Iterable<Rowboat> apply(
@Nullable final ArrayList<Iterable<Rowboat>> boats
)
{
return new MergeIterable<>(Comparators.naturalNullsFirst(), boats);
}
};
return makeIndexFiles(
indexes,
aggregators,
outDir,
progress,
mergedDimensions,
mergedMetrics,
rowMergerFn,
indexSpec
);
}
private DimensionHandler[] makeDimensionHandlers(
final List<String> mergedDimensions,
final List<ColumnCapabilitiesImpl> dimCapabilities
)
{
final DimensionHandler[] handlers = new DimensionHandler[mergedDimensions.size()];
for (int i = 0; i < mergedDimensions.size(); i++) {
ColumnCapabilities capabilities = dimCapabilities.get(i);
String dimName = mergedDimensions.get(i);
handlers[i] = DimensionHandlerUtils.getHandlerFromCapabilities(dimName, capabilities, null);
}
return handlers;
}
private Iterable<Rowboat> makeRowIterable(
List<IndexableAdapter> indexes,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final List<ColumnCapabilitiesImpl> dimCapabilities,
final DimensionHandler[] handlers,
final List<DimensionMerger> mergers
)
{
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(indexes.size());
for (int i = 0; i < indexes.size(); ++i) {
final IndexableAdapter adapter = indexes.get(i);
final int[] dimLookup = getColumnIndexReorderingMap(adapter.getDimensionNames(), mergedDimensions);
final int[] metricLookup = getColumnIndexReorderingMap(adapter.getMetricNames(), mergedMetrics);
Iterable<Rowboat> target = indexes.get(i).getRows();
if (dimLookup != null || metricLookup != null) {
// resize/reorder index table if needed
target = Iterables.transform(
target,
new Function<Rowboat, Rowboat>()
{
@Override
public Rowboat apply(Rowboat input)
{
Object[] newDims;
if (dimLookup != null) {
newDims = new Object[mergedDimensions.size()];
int j = 0;
for (Object dim : input.getDims()) {
newDims[dimLookup[j]] = dim;
j++;
}
} else {
// It's possible for getColumnIndexReorderingMap to return null when
// both column lists are identical. Reuse the old array; no dimension reordering is needed.
newDims = input.getDims();
}
Object[] newMetrics = input.getMetrics();
if (metricLookup != null) {
newMetrics = new Object[mergedMetrics.size()];
int j = 0;
for (Object met : input.getMetrics()) {
newMetrics[metricLookup[j]] = met;
j++;
}
}
return new Rowboat(
input.getTimestamp(),
newDims,
newMetrics,
input.getRowNum(),
handlers
);
}
}
);
}
boats.add(
new MMappedIndexRowIterable(
target, mergedDimensions, i, dimCapabilities, mergers
)
);
}
return rowMergerFn.apply(boats);
}
// If an adapter's column list differs from the merged column list across multiple indexes,
// return an array that maps the adapter's column orderings to the larger, merged column ordering
private int[] getColumnIndexReorderingMap(Indexed<String> adapterColumnNames, List<String> mergedColumnNames)
{
if (isSame(adapterColumnNames, mergedColumnNames)) {
return null; // no need to convert if column lists are identical
}
int[] dimLookup = new int[mergedColumnNames.size()];
for (int i = 0; i < adapterColumnNames.size(); i++) {
dimLookup[i] = mergedColumnNames.indexOf(adapterColumnNames.get(i));
}
return dimLookup;
}
private boolean isSame(Indexed<String> indexed, List<String> values)
{
if (indexed.size() != values.size()) {
return false;
}
for (int i = 0; i < indexed.size(); i++) {
if (!indexed.get(i).equals(values.get(i))) {
return false;
}
}
return true;
}
}

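IndexMergerV9 now implements the persist/merge entry points itself instead of inheriting them from the removed base class. A minimal usage sketch, assuming a caller that already has an ObjectMapper, an IndexIO, an IncrementalIndex named incIndex, a List&lt;QueryableIndex&gt; named indexes, and an AggregatorFactory[] named aggs (all hypothetical names):

// Sketch only: every input is assumed to exist in the caller.
IndexMergerV9 merger = new IndexMergerV9(mapper, indexIO);

// Persist an in-memory index; the data interval defaults to the index's own interval.
File persisted = merger.persist(incIndex, new File("/tmp/persisted"), new IndexSpec());

// Merge already-persisted segments, rolling up rows that share a timestamp and dimension values.
File merged = merger.mergeQueryableIndex(indexes, true, aggs, new File("/tmp/merged"), new IndexSpec());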
View File

@ -66,21 +66,6 @@ public class LongDimensionHandler implements DimensionHandler<Long, Long, Long>
);
}
@Override
public DimensionMergerLegacy<Long> makeLegacyMerger(
IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress
) throws IOException
{
return new LongDimensionMergerLegacy(
dimensionName,
indexSpec,
outDir,
ioPeon,
capabilities,
progress
);
}
@Override
public int getLengthOfEncodedKeyComponent(Long dimVals)
{

View File

@ -1,91 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.ByteSink;
import com.google.common.io.OutputSupplier;
import io.druid.common.guava.FileOutputSupplier;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
public class LongDimensionMergerLegacy extends LongDimensionMergerV9 implements DimensionMergerLegacy<Long>
{
private LongMetricColumnSerializer serializerV8;
public LongDimensionMergerLegacy(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
super(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
@Override
protected void setupEncodedValueWriter() throws IOException
{
final CompressedObjectStrategy.CompressionStrategy metCompression = indexSpec.getMetricCompression();
final CompressionFactory.LongEncodingStrategy longEncoding = indexSpec.getLongEncoding();
serializerV8 = new LongMetricColumnSerializer(dimensionName, outDir, ioPeon, metCompression, longEncoding);
serializerV8.open();
}
@Override
public void processMergedRow(Long rowValues) throws IOException
{
serializerV8.serialize(rowValues);
}
@Override
public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException
{
// longs have no metadata to write
}
@Override
public void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException
{
// closing the serializer writes its data to the file
serializerV8.closeFile(rowValueOut.getFile());
}
@Override
public void writeIndexesToFiles(
ByteSink invertedOut, OutputSupplier<FileOutputStream> spatialOut
) throws IOException
{
// longs have no indices to write
}
@Override
public File makeDimFile() throws IOException
{
return IndexIO.makeNumericDimFile(outDir, dimensionName, IndexIO.BYTE_ORDER);
}
}

View File

@ -1,95 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.LongSupplierSerializer;
import java.io.File;
import java.io.IOException;
/**
* Unsafe for concurrent use from multiple threads.
*/
public class LongMetricColumnSerializer implements MetricColumnSerializer
{
private final String metricName;
private final IOPeon ioPeon;
private final File outDir;
private final CompressedObjectStrategy.CompressionStrategy compression;
private final CompressionFactory.LongEncodingStrategy encoding;
private LongSupplierSerializer writer;
public LongMetricColumnSerializer(
String metricName,
File outDir,
IOPeon ioPeon,
CompressedObjectStrategy.CompressionStrategy compression,
CompressionFactory.LongEncodingStrategy encoding
)
{
this.metricName = metricName;
this.ioPeon = ioPeon;
this.outDir = outDir;
this.compression = compression;
this.encoding = encoding;
}
@Override
public void open() throws IOException
{
writer = CompressionFactory.getLongSerializer(
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER, encoding, compression
);
writer.open();
}
@Override
public void serialize(Object obj) throws IOException
{
long val = (obj == null) ? 0 : ((Number) obj).longValue();
writer.add(val);
}
@Override
public void close() throws IOException
{
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
closeFile(outFile);
}
@Override
public void closeFile(final File outFile) throws IOException
{
outFile.delete();
MetricHolder.writeLongMetric(
Files.asByteSink(outFile, FileWriteMode.APPEND), metricName, writer
);
IndexIO.checkFileSize(outFile);
writer = null;
}
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import java.io.File;
import java.io.IOException;
/**
*/
public interface MetricColumnSerializer
{
public void open() throws IOException;
public void serialize(Object aggs) throws IOException;
public void close() throws IOException;
public void closeFile(File outFile) throws IOException;
}

View File

@ -202,15 +202,4 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
return new StringDimensionMergerV9(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
@Override
public DimensionMergerLegacy makeLegacyMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
return new StringDimensionMergerLegacy(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
}

View File

@ -1,220 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.spatial.ImmutableRTree;
import io.druid.collections.spatial.RTree;
import io.druid.collections.spatial.split.LinearGutmanSplitStrategy;
import io.druid.common.guava.FileOutputSupplier;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.ByteBufferUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.VSizeIndexedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.IntBuffer;
import java.nio.MappedByteBuffer;
import java.util.List;
public class StringDimensionMergerLegacy extends StringDimensionMergerV9 implements DimensionMergerLegacy<int[]>
{
private static final Logger log = new Logger(StringDimensionMergerLegacy.class);
private VSizeIndexedWriter encodedValueWriterV8;
private File dictionaryFile;
public StringDimensionMergerLegacy(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
super(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
@Override
protected void setupEncodedValueWriter() throws IOException
{
encodedValueWriterV8 = new VSizeIndexedWriter(ioPeon, dimensionName, cardinality);
encodedValueWriterV8.open();
}
@Override
protected void processMergedRowHelper(int[] vals) throws IOException
{
List<Integer> listToWrite = (vals == null)
? null
: Ints.asList(vals);
encodedValueWriterV8.add(listToWrite);
}
@Override
public void writeIndexes(List<IntBuffer> segmentRowNumConversions, Closer closer) throws IOException
{
final SerializerUtils serializerUtils = new SerializerUtils();
long dimStartTime = System.currentTimeMillis();
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
String bmpFilename = String.format("%s.inverted", dimensionName);
bitmapWriter = new GenericIndexedWriter<>(
ioPeon,
bmpFilename,
bitmapSerdeFactory.getObjectStrategy()
);
bitmapWriter.open();
final MappedByteBuffer dimValsMapped = Files.map(dictionaryFile);
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ByteBufferUtils.unmap(dimValsMapped);
}
});
if (!dimensionName.equals(serializerUtils.readString(dimValsMapped))) {
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimensionName);
}
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
log.info("Starting dimension[%s] with cardinality[%,d]", dimensionName, dimVals.size());
final BitmapFactory bmpFactory = bitmapSerdeFactory.getBitmapFactory();
RTree tree = null;
spatialWriter = null;
boolean hasSpatial = capabilities.hasSpatialIndexes();
if (hasSpatial) {
String spatialFilename = String.format("%s.spatial", dimensionName);
spatialWriter = new ByteBufferWriter<>(
ioPeon, spatialFilename, new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bmpFactory), bmpFactory);
}
IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimensionName);
// Iterate over all dim values' dictionary ids in ascending order, which is in line with the dim values' comparison order.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
mergeBitmaps(
segmentRowNumConversions,
dimVals,
bmpFactory,
tree,
hasSpatial,
dictIdSeeker,
dictId,
adapters,
dimensionName,
nullRowsBitmap,
bitmapWriter
);
}
log.info("Completed dimension[%s] in %,d millis.", dimensionName, System.currentTimeMillis() - dimStartTime);
if (hasSpatial) {
spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
}
}
@Override
public void writeValueMetadataToFile(final FileOutputSupplier valueEncodingFile) throws IOException
{
final SerializerUtils serializerUtils = new SerializerUtils();
dictionaryWriter.close();
serializerUtils.writeString(valueEncodingFile, dimensionName);
ByteStreams.copy(dictionaryWriter.combineStreams(), valueEncodingFile);
// save this File reference, we will read from it later when building bitmap/spatial indexes
dictionaryFile = valueEncodingFile.getFile();
}
@Override
public void writeRowValuesToFile(FileOutputSupplier rowValueFile) throws IOException
{
encodedValueWriterV8.close();
ByteStreams.copy(encodedValueWriterV8.combineStreams(), rowValueFile);
}
@Override
public void writeIndexesToFiles(
final ByteSink invertedIndexFile,
final OutputSupplier<FileOutputStream> spatialIndexFile
) throws IOException
{
final SerializerUtils serializerUtils = new SerializerUtils();
final OutputSupplier<OutputStream> invertedIndexOutputSupplier = new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return invertedIndexFile.openStream();
}
};
bitmapWriter.close();
serializerUtils.writeString(invertedIndexOutputSupplier, dimensionName);
ByteStreams.copy(bitmapWriter.combineStreams(), invertedIndexOutputSupplier);
if (capabilities.hasSpatialIndexes()) {
spatialWriter.close();
serializerUtils.writeString(spatialIndexFile, dimensionName);
ByteStreams.copy(spatialWriter.combineStreams(), spatialIndexFile);
}
}
@Override
public File makeDimFile() throws IOException
{
return IndexIO.makeDimFile(outDir, dimensionName);
}
}
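With the legacy v8 writer above gone, persisting goes through IndexMergerV9 only. A minimal sketch of the surviving path, mirroring the three-argument persist call used elsewhere in this commit; jsonMapper (an ObjectMapper), indexIO, incrementalIndex and outDir are assumed to already exist in scope and are illustrative names, not part of this diff:
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import java.io.File;
// Build the v9 merger the same way the updated tests and benchmarks do.
IndexMergerV9 merger = new IndexMergerV9(jsonMapper, indexIO);
// Persist the in-memory incremental index as a v9 segment directory.
File segmentDir = merger.persist(incrementalIndex, outDir, new IndexSpec());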

View File

@ -25,7 +25,6 @@ import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.segment.GenericColumnSerializer;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.data.GenericIndexed;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -58,11 +57,6 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
return new SerializerBuilder();
}
public static LegacySerializerBuilder legacySerializerBuilder()
{
return new LegacySerializerBuilder();
}
@JsonProperty
public String getTypeName()
{
@ -127,42 +121,4 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
);
}
}
public static class LegacySerializerBuilder
{
private String typeName = null;
private GenericIndexed delegate = null;
public LegacySerializerBuilder withTypeName(final String typeName)
{
this.typeName = typeName;
return this;
}
public LegacySerializerBuilder withDelegate(final GenericIndexed delegate)
{
this.delegate = delegate;
return this;
}
public ComplexColumnPartSerde build()
{
return new ComplexColumnPartSerde(
typeName, new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel);
}
}
);
}
}
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.serde;
import com.google.common.io.Files;
import io.druid.segment.IndexIO;
import io.druid.segment.MetricColumnSerializer;
import io.druid.segment.MetricHolder;
import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.IOException;
/**
*/
public class ComplexMetricColumnSerializer implements MetricColumnSerializer
{
private final String metricName;
private final ComplexMetricSerde serde;
private final IOPeon ioPeon;
private final File outDir;
private GenericIndexedWriter writer;
public ComplexMetricColumnSerializer(
String metricName,
File outDir,
IOPeon ioPeon,
ComplexMetricSerde serde
)
{
this.metricName = metricName;
this.serde = serde;
this.ioPeon = ioPeon;
this.outDir = outDir;
}
@SuppressWarnings(value = "unchecked")
@Override
public void open() throws IOException
{
writer = new GenericIndexedWriter(
ioPeon, String.format("%s_%s", metricName, outDir.getName()), serde.getObjectStrategy()
);
writer.open();
}
@Override
public void serialize(Object agg) throws IOException
{
writer.write(agg);
}
@Override
public void close() throws IOException
{
writer.close();
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
closeFile(outFile);
}
@Override
public void closeFile(final File outFile) throws IOException
{
outFile.delete();
MetricHolder.writeComplexMetric(
Files.newOutputStreamSupplier(outFile, true), metricName, serde.getTypeName(), writer
);
IndexIO.checkFileSize(outFile);
writer = null;
}
}

View File

@ -251,164 +251,6 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
}
}
public static LegacySerializerBuilder legacySerializerBuilder()
{
return new LegacySerializerBuilder();
}
public static class LegacySerializerBuilder
{
private VERSION version = null;
private int flags = NO_FLAGS;
private GenericIndexed<String> dictionary = null;
private WritableSupplier<IndexedInts> singleValuedColumn = null;
private WritableSupplier<IndexedMultivalue<IndexedInts>> multiValuedColumn = null;
private BitmapSerdeFactory bitmapSerdeFactory = null;
private GenericIndexed<ImmutableBitmap> bitmaps = null;
private ImmutableRTree spatialIndex = null;
private ByteOrder byteOrder = null;
private LegacySerializerBuilder()
{
}
public LegacySerializerBuilder withDictionary(GenericIndexed<String> dictionary)
{
this.dictionary = dictionary;
return this;
}
public LegacySerializerBuilder withBitmapSerdeFactory(BitmapSerdeFactory bitmapSerdeFactory)
{
this.bitmapSerdeFactory = bitmapSerdeFactory;
return this;
}
public LegacySerializerBuilder withBitmaps(GenericIndexed<ImmutableBitmap> bitmaps)
{
this.bitmaps = bitmaps;
return this;
}
public LegacySerializerBuilder withSpatialIndex(ImmutableRTree spatialIndex)
{
this.spatialIndex = spatialIndex;
return this;
}
public LegacySerializerBuilder withByteOrder(ByteOrder byteOrder)
{
this.byteOrder = byteOrder;
return this;
}
public LegacySerializerBuilder withSingleValuedColumn(VSizeIndexedInts singleValuedColumn)
{
Preconditions.checkState(multiValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
this.version = VERSION.UNCOMPRESSED_SINGLE_VALUE;
this.singleValuedColumn = singleValuedColumn.asWritableSupplier();
return this;
}
public LegacySerializerBuilder withSingleValuedColumn(CompressedVSizeIntsIndexedSupplier singleValuedColumn)
{
Preconditions.checkState(multiValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
this.version = VERSION.COMPRESSED;
this.singleValuedColumn = singleValuedColumn;
return this;
}
public LegacySerializerBuilder withMultiValuedColumn(VSizeIndexed multiValuedColumn)
{
Preconditions.checkState(singleValuedColumn == null, "Cannot set both multiValuedColumn and singleValuedColumn");
this.version = VERSION.UNCOMPRESSED_MULTI_VALUE;
this.flags |= Feature.MULTI_VALUE.getMask();
this.multiValuedColumn = multiValuedColumn.asWritableSupplier();
return this;
}
public LegacySerializerBuilder withMultiValuedColumn(CompressedVSizeIndexedSupplier multiValuedColumn)
{
Preconditions.checkState(singleValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
this.version = VERSION.COMPRESSED;
this.flags |= Feature.MULTI_VALUE.getMask();
this.multiValuedColumn = multiValuedColumn;
return this;
}
public DictionaryEncodedColumnPartSerde build()
{
Preconditions.checkArgument(
singleValuedColumn != null ^ multiValuedColumn != null,
"Exactly one of singleValCol[%s] or multiValCol[%s] must be set",
singleValuedColumn, multiValuedColumn
);
return new DictionaryEncodedColumnPartSerde(
byteOrder,
bitmapSerdeFactory,
new Serializer()
{
@Override
public long numBytes()
{
long size = 1 + // version
(version.compareTo(VERSION.COMPRESSED) >= 0 ? Ints.BYTES : 0);// flag if version >= compressed
size += dictionary.getSerializedSize();
if (Feature.MULTI_VALUE.isSet(flags)) {
size += multiValuedColumn.getSerializedSize();
} else {
size += singleValuedColumn.getSerializedSize();
}
size += bitmaps.getSerializedSize();
if (spatialIndex != null) {
size += spatialIndex.size() + Ints.BYTES;
}
return size;
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{version.asByte()}));
if (version.compareTo(VERSION.COMPRESSED) >= 0) {
channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
}
if (dictionary != null) {
dictionary.writeToChannel(channel);
}
if (Feature.MULTI_VALUE.isSet(flags)) {
if (multiValuedColumn != null) {
multiValuedColumn.writeToChannel(channel);
}
} else {
if (singleValuedColumn != null) {
singleValuedColumn.writeToChannel(channel);
}
}
if (bitmaps != null) {
bitmaps.writeToChannel(channel);
}
if (spatialIndex != null) {
ByteBufferSerializer.writeToChannel(
spatialIndex,
new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapSerdeFactory.getBitmapFactory()),
channel
);
}
}
}
);
}
}
@Override
public Serializer getSerializer()
{

View File

@ -104,49 +104,6 @@ public class FloatGenericColumnPartSerde implements ColumnPartSerde
}
}
public static LegacySerializerBuilder legacySerializerBuilder()
{
return new LegacySerializerBuilder();
}
public static class LegacySerializerBuilder
{
private ByteOrder byteOrder = null;
private CompressedFloatsIndexedSupplier delegate = null;
public LegacySerializerBuilder withByteOrder(final ByteOrder byteOrder)
{
this.byteOrder = byteOrder;
return this;
}
public LegacySerializerBuilder withDelegate(final CompressedFloatsIndexedSupplier delegate)
{
this.delegate = delegate;
return this;
}
public FloatGenericColumnPartSerde build()
{
return new FloatGenericColumnPartSerde(
byteOrder, new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel);
}
}
);
}
}
@Override
public Serializer getSerializer()
{

View File

@ -104,49 +104,6 @@ public class LongGenericColumnPartSerde implements ColumnPartSerde
}
}
public static LegacySerializerBuilder legacySerializerBuilder()
{
return new LegacySerializerBuilder();
}
public static class LegacySerializerBuilder
{
private ByteOrder byteOrder = null;
private CompressedLongsIndexedSupplier delegate = null;
public LegacySerializerBuilder withByteOrder(final ByteOrder byteOrder)
{
this.byteOrder = byteOrder;
return this;
}
public LegacySerializerBuilder withDelegate(final CompressedLongsIndexedSupplier delegate)
{
this.delegate = delegate;
return this;
}
public LongGenericColumnPartSerde build()
{
return new LongGenericColumnPartSerde(
byteOrder, new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel);
}
}
);
}
}
@Override
public Serializer getSerializer()
{

View File

@ -141,7 +141,7 @@ public class MultiValuedDimensionTest
}
persistedSegmentDir = Files.createTempDir();
TestHelper.getTestIndexMerger()
TestHelper.getTestIndexMergerV9()
.persist(incrementalIndex, persistedSegmentDir, new IndexSpec());
queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);

View File

@ -70,6 +70,7 @@ import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
@ -154,7 +155,7 @@ public class AggregationTestHelper
return new AggregationTestHelper(
mapper,
new IndexMerger(mapper, indexIO),
new IndexMergerV9(mapper, indexIO),
indexIO,
factory.getToolchest(),
factory,
@ -210,7 +211,7 @@ public class AggregationTestHelper
return new AggregationTestHelper(
mapper,
new IndexMerger(mapper, indexIO),
new IndexMergerV9(mapper, indexIO),
indexIO,
toolchest,
factory,
@ -250,7 +251,7 @@ public class AggregationTestHelper
return new AggregationTestHelper(
mapper,
new IndexMerger(mapper, indexIO),
new IndexMergerV9(mapper, indexIO),
indexIO,
toolchest,
factory,
@ -301,7 +302,7 @@ public class AggregationTestHelper
return new AggregationTestHelper(
mapper,
new IndexMerger(mapper, indexIO),
new IndexMergerV9(mapper, indexIO),
indexIO,
toolchest,
factory,

View File

@ -1,40 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.segment.data.CompressedObjectStrategy.CompressionStrategy;
import io.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ConciseBitmapIndexMergerTest extends IndexMergerTestBase
{
public ConciseBitmapIndexMergerTest(
CompressionStrategy compressionStrategy,
CompressionStrategy dimCompressionStrategy,
LongEncodingStrategy longEncodingStrategy
)
{
super(new ConciseBitmapSerdeFactory(), compressionStrategy, dimCompressionStrategy, longEncodingStrategy);
indexMerger = TestHelper.getTestIndexMerger();
}
}
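The class above is removed because it was bound to the legacy merger. A hypothetical V9-bound counterpart (not part of this diff; the class name is illustrative) would differ only in which merger it takes from TestHelper:
@RunWith(Parameterized.class)
public class ConciseBitmapIndexMergerV9Test extends IndexMergerTestBase
{
public ConciseBitmapIndexMergerV9Test(
CompressionStrategy compressionStrategy,
CompressionStrategy dimCompressionStrategy,
LongEncodingStrategy longEncodingStrategy
)
{
super(new ConciseBitmapSerdeFactory(), compressionStrategy, dimCompressionStrategy, longEncodingStrategy);
indexMerger = TestHelper.getTestIndexMergerV9();
}
}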

View File

@ -57,7 +57,7 @@ public class EmptyIndexTest
emptyIndex,
new ConciseBitmapFactory()
);
TestHelper.getTestIndexMerger().merge(
TestHelper.getTestIndexMergerV9().merge(
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
true,
new AggregatorFactory[0],

View File

@ -49,7 +49,7 @@ public class IndexBuilder
private IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMetrics(new CountAggregatorFactory("count"))
.build();
private IndexMerger indexMerger = TestHelper.getTestIndexMerger();
private IndexMerger indexMerger = TestHelper.getTestIndexMergerV9();
private File tmpDir;
private IndexSpec indexSpec = new IndexSpec();
private int maxRows = DEFAULT_MAX_ROWS;

View File

@ -19,11 +19,9 @@
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
@ -42,12 +40,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -56,7 +52,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class IndexMergerV9CompatibilityTest
{
@Rule
@ -70,7 +65,6 @@ public class IndexMergerV9CompatibilityTest
private static final IndexMergerV9 INDEX_MERGER_V9 = TestHelper.getTestIndexMergerV9();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec(
new ConciseBitmapSerdeFactory(),
@ -80,38 +74,12 @@ public class IndexMergerV9CompatibilityTest
);
private static final List<String> DIMS = ImmutableList.of("dim0", "dim1");
private static final Function<Collection<Map<String, Object>>, Object[]> OBJECT_MAKER = new Function<Collection<Map<String, Object>>, Object[]>()
{
@Nullable
@Override
public Object[] apply(Collection<Map<String, Object>> input)
{
final ArrayList<InputRow> list = new ArrayList<>();
int i = 0;
for (final Map<String, Object> map : input) {
list.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map));
}
return new Object[]{list};
}
};
private final Collection<InputRow> events;
@SafeVarargs
public static Collection<Object[]> permute(Map<String, Object>... maps)
public IndexMergerV9CompatibilityTest()
{
if (maps == null) {
return ImmutableList.<Object[]>of();
}
return Collections2.transform(
Collections2.permutations(
Arrays.asList(maps)
),
OBJECT_MAKER
);
}
events = new ArrayList<>();
@Parameterized.Parameters
public static Iterable<Object[]> paramFeeder()
{
final Map<String, Object> map1 = ImmutableMap.<String, Object>of(
DIMS.get(0), ImmutableList.<String>of("dim00", "dim01"),
DIMS.get(1), "dim10"
@ -137,25 +105,10 @@ public class IndexMergerV9CompatibilityTest
final Map<String, Object> map6 = new HashMap<>();
map6.put(DIMS.get(1), null); // ImmutableMap cannot take null
return Iterables.<Object[]>concat(
permute(map1)
, permute(map1, map4)
, permute(map1, map5)
, permute(map5, map6)
, permute(map4, map5)
, Iterables.transform(ImmutableList.of(Arrays.asList(map1, map2, map3, map4, map5, map6)), OBJECT_MAKER)
);
}
private final Collection<InputRow> events;
public IndexMergerV9CompatibilityTest(
final Collection<InputRow> events
)
{
this.events = events;
int i = 0;
for (final Map<String, Object> map : Arrays.asList(map1, map2, map3, map4, map5, map6)) {
events.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map));
}
}
IncrementalIndex toPersist;
@ -181,7 +134,18 @@ public class IndexMergerV9CompatibilityTest
}
tmpDir = Files.createTempDir();
persistTmpDir = new File(tmpDir, "persistDir");
INDEX_MERGER.persist(toPersist, persistTmpDir, INDEX_SPEC);
FileUtils.forceMkdir(persistTmpDir);
String[] files = new String[] {"00000.smoosh", "meta.smoosh", "version.bin"};
for (String file : files) {
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return IndexMergerV9CompatibilityTest.class.getResourceAsStream("/v8SegmentPersistDir/" + file);
}
}.copyTo(Files.asByteSink(new File(persistTmpDir, file)));
}
}
@After
@ -237,18 +201,6 @@ public class IndexMergerV9CompatibilityTest
return outDir;
}
private File appendAndValidate(File inDir, File tmpDir) throws IOException
{
final File outDir = INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(inDir)))),
null,
tmpDir,
INDEX_SPEC
);
INDEX_IO.validateTwoSegments(persistTmpDir, outDir);
return outDir;
}
@Test
public void testIdempotentReprocess() throws IOException
{
@ -273,43 +225,4 @@ public class IndexMergerV9CompatibilityTest
Assert.assertEquals(events.size(), adapter3.getNumRows());
reprocessAndValidate(tmpDir2, tmpDir3);
}
@Test
public void testSimpleAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
INDEX_IO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
@Test
public void testIdempotentAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
INDEX_IO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
appendAndValidate(persistTmpDir, tmpDir1);
final File tmpDir2 = new File(tmpDir, "reprocessed2");
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir1)));
Assert.assertEquals(events.size(), adapter2.getNumRows());
appendAndValidate(tmpDir1, tmpDir2);
final File tmpDir3 = new File(tmpDir, "reprocessed3");
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir2)));
Assert.assertEquals(events.size(), adapter3.getNumRows());
appendAndValidate(tmpDir2, tmpDir3);
}
}

View File

@ -34,7 +34,7 @@ import java.io.File;
public class QueryableIndexIndexableAdapterTest
{
private final static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private final static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMergerV9();
private final static IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec(
new ConciseBitmapSerdeFactory(),

View File

@ -1,40 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.segment.data.CompressedObjectStrategy.CompressionStrategy;
import io.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class RoaringBitmapIndexMergerTest extends IndexMergerTestBase
{
public RoaringBitmapIndexMergerTest(
CompressionStrategy compressionStrategy,
CompressionStrategy dimCompressionStrategy,
LongEncodingStrategy longEncodingStrategy
)
{
super(new RoaringBitmapSerdeFactory(null), compressionStrategy, dimCompressionStrategy, longEncodingStrategy);
indexMerger = TestHelper.getTestIndexMerger();
}
}

View File

@ -86,7 +86,7 @@ public class SchemalessIndexTest
private static final Map<Integer, Map<Integer, QueryableIndex>> mergedIndexes = Maps.newHashMap();
private static final List<QueryableIndex> rowPersistedIndexes = Lists.newArrayList();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMergerV9();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private static IncrementalIndex index = null;

View File

@ -41,7 +41,6 @@ import java.util.Map;
*/
public class TestHelper
{
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
@ -58,15 +57,9 @@ public class TestHelper
}
}
);
INDEX_MERGER = new IndexMerger(jsonMapper, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(jsonMapper, INDEX_IO);
}
public static IndexMerger getTestIndexMerger()
{
return INDEX_MERGER;
}
public static IndexMergerV9 getTestIndexMergerV9()
{
return INDEX_MERGER_V9;

View File

@ -123,7 +123,7 @@ public class TestIndex
};
private static final IndexSpec indexSpec = new IndexSpec();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMergerV9();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
static {

View File

@ -183,8 +183,7 @@ public abstract class BaseFilterTest
"roaring", new RoaringBitmapSerdeFactory(true)
);
final Map<String, IndexMerger> indexMergers = ImmutableMap.<String, IndexMerger>of(
"IndexMerger", TestHelper.getTestIndexMerger(),
final Map<String, IndexMerger> indexMergers = ImmutableMap.of(
"IndexMergerV9", TestHelper.getTestIndexMergerV9()
);

View File

@ -81,7 +81,7 @@ public class SpatialFilterBonusTest
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "dim.geo");
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMergerV9();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private final Segment segment;

View File

@ -71,7 +71,7 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterTest
{
private static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMergerV9();
private static IndexIO INDEX_IO = TestHelper.getTestIndexIO();
public static final int NUM_POINTS = 5000;

View File

@ -0,0 +1,7 @@
v1,2147483647,1
__time,0,1793,2219
count,0,1386,1793
dim0,0,0,857
dim1,0,857,1386
index.drd,0,2219,2338
metadata.drd,0,2338,2525

View File

@ -47,7 +47,6 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
private static final int defaultMaxPendingPersists = 0;
private static final ShardSpec defaultShardSpec = NoneShardSpec.instance();
private static final IndexSpec defaultIndexSpec = new IndexSpec();
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
private static final long defaultHandoffConditionTimeout = 0;
private static final long defaultAlertTimeout = 0;
@ -70,7 +69,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
defaultMaxPendingPersists,
defaultShardSpec,
defaultIndexSpec,
defaultBuildV9Directly,
true,
0,
0,
defaultReportParseExceptions,
@ -88,7 +87,6 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final boolean buildV9Directly;
private final int persistThreadPriority;
private final int mergeThreadPriority;
private final boolean reportParseExceptions;
@ -106,6 +104,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("indexSpec") IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
@ -127,7 +126,6 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
this.mergeThreadPriority = mergeThreadPriority;
this.persistThreadPriority = persistThreadPriority;
this.reportParseExceptions = reportParseExceptions == null
@ -201,10 +199,14 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
return indexSpec;
}
/**
* Always returns true; this flag no longer affects the version of the segments being built.
*/
@Deprecated
@JsonProperty
public Boolean getBuildV9Directly()
{
return buildV9Directly;
return true;
}
@JsonProperty
@ -250,7 +252,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
maxPendingPersists,
shardSpec,
indexSpec,
buildV9Directly,
true,
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions,
@ -271,7 +273,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
maxPendingPersists,
shardSpec,
indexSpec,
buildV9Directly,
true,
persistThreadPriority,
mergeThreadPriority,
reportParseExceptions,

View File

@ -30,7 +30,6 @@ import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -54,7 +53,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
@ -68,7 +66,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@ -84,7 +81,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null,
null,
queryExecutorService,
indexMerger,
indexMergerV9,
indexIO,
cache,
@ -97,7 +93,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
@ -123,7 +118,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
conglomerate,
segmentAnnouncer,
queryExecutorService,
config.getBuildV9Directly() ? indexMergerV9 : indexMerger,
indexMergerV9,
indexIO,
cache,
cacheConfig,

View File

@ -29,7 +29,6 @@ import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -51,7 +50,6 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentPublisher segmentPublisher;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
@ -67,7 +65,6 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@ -82,7 +79,6 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentPublisher = segmentPublisher;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryExecutorService = executorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@ -111,7 +107,7 @@ public class RealtimePlumberSchool implements PlumberSchool
dataSegmentPusher,
segmentPublisher,
handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
config.getBuildV9Directly() ? indexMergerV9 : indexMerger,
indexMergerV9,
indexIO,
cache,
cacheConfig,

View File

@ -65,7 +65,6 @@ public class RealtimeTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@ -89,7 +88,6 @@ public class RealtimeTuningConfigTest
+ " \"windowPeriod\": \"PT1H\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": false,\n"
+ " \"persistThreadPriority\": 100,\n"
+ " \"mergeThreadPriority\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
@ -109,7 +107,6 @@ public class RealtimeTuningConfigTest
);
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(70, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
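Since the getter now ignores the stored flag, a config that still carries buildV9Directly: false deserializes fine but reports true. A rough sketch of that behavior, assuming a Druid-configured Jackson ObjectMapper named mapper and a JSON string oldConfigJson shaped like the one in the test above (both names are illustrative, not part of this diff):
RealtimeTuningConfig config = mapper.readValue(oldConfigJson, RealtimeTuningConfig.class);
// The deprecated property is still accepted for compatibility but no longer affects the built segments.
Assert.assertEquals(true, config.getBuildV9Directly());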

View File

@ -109,7 +109,6 @@ public class FireDepartmentTest
null,
null,
null,
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexMergerV9(),
TestHelper.getTestIndexIO(),
MapCache.create(0),

View File

@ -48,6 +48,7 @@ import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -153,7 +154,7 @@ public class AppenderatorTester implements AutoCloseable
}
}
);
indexMerger = new IndexMerger(objectMapper, indexIO);
indexMerger = new IndexMergerV9(objectMapper, indexIO);
emitter = new ServiceEmitter(
"test",

View File

@ -91,7 +91,7 @@ public class IngestSegmentFirehoseTest
public final TemporaryFolder tempFolder = new TemporaryFolder();
private IndexIO indexIO = TestHelper.getTestIndexIO();
private IndexMerger indexMerger = TestHelper.getTestIndexMerger();
private IndexMerger indexMerger = TestHelper.getTestIndexMergerV9();
@Test
public void testReadFromIndexAndWriteAnotherIndex() throws Exception

View File

@ -82,7 +82,6 @@ import java.util.concurrent.TimeUnit;
public class RealtimePlumberSchoolTest
{
private final RejectionPolicyFactory rejectionPolicy;
private final boolean buildV9Directly;
private RealtimePlumber plumber;
private RealtimePlumberSchool realtimePlumberSchool;
private DataSegmentAnnouncer announcer;
@ -97,26 +96,22 @@ public class RealtimePlumberSchoolTest
private FireDepartmentMetrics metrics;
private File tmpDir;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy, boolean buildV9Directly)
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
{
this.rejectionPolicy = rejectionPolicy;
this.buildV9Directly = buildV9Directly;
}
@Parameterized.Parameters(name = "rejectionPolicy = {0}, buildV9Directly = {1}")
@Parameterized.Parameters(name = "rejectionPolicy = {0}")
public static Collection<?> constructorFeeder() throws IOException
{
final RejectionPolicyFactory[] rejectionPolicies = new RejectionPolicyFactory[]{
new NoopRejectionPolicyFactory(),
new MessageTimeRejectionPolicyFactory()
};
final boolean[] buildV9Directlies = new boolean[]{true, false};
final List<Object[]> constructors = Lists.newArrayList();
for (RejectionPolicyFactory rejectionPolicy : rejectionPolicies) {
for (boolean buildV9Directly : buildV9Directlies) {
constructors.add(new Object[]{rejectionPolicy, buildV9Directly});
}
constructors.add(new Object[]{rejectionPolicy});
}
return constructors;
}
@ -199,7 +194,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
buildV9Directly,
true,
0,
0,
false,
@ -215,7 +210,6 @@ public class RealtimePlumberSchoolTest
segmentPublisher,
handoffNotifierFactory,
MoreExecutors.sameThreadExecutor(),
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexMergerV9(),
TestHelper.getTestIndexIO(),
MapCache.create(0),