diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
index f912f5e70b2..1a6fc81e4eb 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
@@ -304,9 +304,7 @@ public class BaseColumnarLongsBenchmark
}
serializer.open();
- for (long val : vals) {
- serializer.add(val);
- }
+ serializer.addAll(vals, 0, vals.length);
serializer.writeTo(output, null);
return (int) serializer.getSerializedSize();
}
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 3b3c2711d3b..6c09e07634f 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -160,7 +160,6 @@ Druid interacts with ZooKeeper through a set of standard path configurations. We
|`druid.zk.paths.announcementsPath`|Druid service announcement path.|`${druid.zk.paths.base}/announcements`|
|`druid.zk.paths.liveSegmentsPath`|Current path for where Druid services announce their segments.|`${druid.zk.paths.base}/segments`|
|`druid.zk.paths.coordinatorPath`|Used by the Coordinator for leader election.|`${druid.zk.paths.base}/coordinator`|
-|`druid.zk.paths.servedSegmentsPath`|Deprecated. Legacy path for where Druid services announce their segments.|`${druid.zk.paths.base}/servedSegments`|
The indexing service also uses its own set of paths. These configs can be included in the common configuration.
diff --git a/docs/design/zookeeper.md b/docs/design/zookeeper.md
index 50241bd3d9d..eb8bea57741 100644
--- a/docs/design/zookeeper.md
+++ b/docs/design/zookeeper.md
@@ -53,7 +53,7 @@ ${druid.zk.paths.coordinatorPath}/_COORDINATOR
## Segment "publishing" protocol from Historical and Realtime
-The `announcementsPath` and `servedSegmentsPath` are used for this.
+The `announcementsPath` and `liveSegmentsPath` are used for this.
All [Historical](../design/historical.md) processes publish themselves on the `announcementsPath`, specifically, they will create an ephemeral znode at
@@ -64,13 +64,13 @@ ${druid.zk.paths.announcementsPath}/${druid.host}
Which signifies that they exist. They will also subsequently create a permanent znode at
```
-${druid.zk.paths.servedSegmentsPath}/${druid.host}
+${druid.zk.paths.liveSegmentsPath}/${druid.host}
```
And as they load up segments, they will attach ephemeral znodes that look like
```
-${druid.zk.paths.servedSegmentsPath}/${druid.host}/_segment_identifier_
+${druid.zk.paths.liveSegmentsPath}/${druid.host}/_segment_identifier_
```
Processes like the [Coordinator](../design/coordinator.md) and [Broker](../design/broker.md) can then watch these paths to see which processes are currently serving which segments.
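The publishing protocol described above can be observed directly in ZooKeeper. Below is a minimal sketch, assuming the default `druid.zk.paths.base` of `/druid` (so `announcementsPath` resolves to `/druid/announcements` and `liveSegmentsPath` to `/druid/segments`) and an Apache Curator client; the ZooKeeper address and class name are placeholders, not part of this change.

```java
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LiveSegmentsSketch
{
  public static void main(String[] args) throws Exception
  {
    // Connect to the same ZooKeeper ensemble the Druid cluster uses.
    try (CuratorFramework client =
             CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))) {
      client.start();
      client.blockUntilConnected();

      // Every Historical announces itself with an ephemeral znode under the announcementsPath.
      List<String> historicals = client.getChildren().forPath("/druid/announcements");

      // Each one attaches an ephemeral znode per served segment under liveSegmentsPath/<host>.
      for (String host : historicals) {
        List<String> segmentIds = client.getChildren().forPath("/druid/segments/" + host);
        System.out.printf("%s serves %d segments%n", host, segmentIds.size());
      }
    }
  }
}
```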
diff --git a/docs/querying/sql-functions.md b/docs/querying/sql-functions.md
index 2ec056290b6..4325537a627 100644
--- a/docs/querying/sql-functions.md
+++ b/docs/querying/sql-functions.md
@@ -655,20 +655,23 @@ Returns the following:
## CHAR_LENGTH
-`CHAR_LENGTH(expr)`
-
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
-
Alias for [`LENGTH`](#length).
+* **Syntax:** `CHAR_LENGTH(expr)`
+* **Function type:** Scalar, string
+
+[Learn more](sql-scalar.md#string-functions)
+
## CHARACTER_LENGTH
-`CHARACTER_LENGTH(expr)`
-
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
-
Alias for [`LENGTH`](#length).
+* **Syntax:** `CHARACTER_LENGTH(expr)`
+* **Function type:** Scalar, string
+
+[Learn more](sql-scalar.md#string-functions)
+
+
## COALESCE
`COALESCE(expr, expr, ...)`
@@ -679,19 +682,64 @@ Returns the first non-null value.
## CONCAT
-`CONCAT(expr, expr...)`
-
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
-
Concatenates a list of expressions.
+* **Syntax:** `CONCAT(expr[, expr,...])`
+* **Function type:** Scalar, string
+
+Example
+
+The following example concatenates the `OriginCityName` column from `flight-carriers`, the string ` to `, and the `DestCityName` column from `flight-carriers`.
+
+```sql
+SELECT
+ "OriginCityName" AS "origin_city",
+ "DestCityName" AS "destination_city",
+ CONCAT("OriginCityName", ' to ', "DestCityName") AS "concatenate_flight_details"
+FROM "flight-carriers"
+LIMIT 1
+```
+
+Returns the following:
+
+| `origin_city` | `destination_city` | `concatenate_flight_details` |
+| -- | -- | -- |
+| `San Juan, PR` | `Washington, DC` | `San Juan, PR to Washington, DC` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
+
## CONTAINS_STRING
-`CONTAINS_STRING(<expr>, <str>)`
+Returns `true` if `str` is a substring of `expr`, case-sensitive. Otherwise returns `false`.
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
+* **Syntax:** `CONTAINS_STRING(expr, str)`
+* **Function type:** Scalar, string
-Finds whether a string is in a given expression, case-sensitive.
+Example
+
+The following example returns `true` if the `OriginCityName` column from the `flight-carriers` datasource contains the substring `San`.
+
+```sql
+SELECT
+ "OriginCityName" AS "origin_city",
+ CONTAINS_STRING("OriginCityName", 'San') AS "contains_string"
+FROM "flight-carriers"
+LIMIT 2
+```
+
+Returns the following:
+
+| `origin_city` | `contains_string` |
+| -- | -- |
+| `San Juan, PR` | `true` |
+| `Boston, MA` | `false` |
+
+
+
+
+[Learn more](sql-scalar.md#string-functions)
## COS
@@ -791,13 +839,31 @@ Decodes a Base64-encoded string into a complex data type, where `dataType` is th
## DECODE_BASE64_UTF8
-`DECODE_BASE64_UTF8(expr)`
-
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
-
-
Decodes a Base64-encoded string into a UTF-8 encoded string.
+* **Syntax:** `DECODE_BASE64_UTF8(expr)`
+* **Function type:** Scalar, string
+
+Example
+
+The following example converts the Base64-encoded string `SGVsbG8gV29ybGQhCg==` into a UTF-8 encoded string.
+
+```sql
+SELECT
+ 'SGVsbG8gV29ybGQhCg==' AS "base64_encoding",
+ DECODE_BASE64_UTF8('SGVsbG8gV29ybGQhCg==') AS "convert_to_UTF8_encoding"
+```
+
+Returns the following:
+
+| `base64_encoding` | `convert_to_UTF8_encoding` |
+| -- | -- |
+| `SGVsbG8gV29ybGQhCg==` | `Hello World!` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
+
## DEGREES
Converts an angle from radians to degrees.
@@ -1191,11 +1257,33 @@ Returns the following:
## ICONTAINS_STRING
-`ICONTAINS_STRING(<expr>, str)`
+Returns `true` if `str` is a substring of `expr`, case-insensitive. Otherwise returns `false`.
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
+* **Syntax:** `ICONTAINS_STRING(expr, str)`
+* **Function type:** Scalar, string
-Finds whether a string is in a given expression, case-insensitive.
+Example
+
+The following example returns `true` if the `OriginCityName` column from the `flight-carriers` datasource contains the case-insensitive substring `san`.
+
+```sql
+SELECT
+ "OriginCityName" AS "origin_city",
+ ICONTAINS_STRING("OriginCityName", 'san') AS "contains_case_insensitive_string"
+FROM "flight-carriers"
+LIMIT 2
+```
+
+Returns the following:
+
+| `origin_city` | `contains_case_insensitive_string` |
+| -- | -- |
+| `San Juan, PR` | `true` |
+| `Boston, MA` | `false` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
## IPV4_MATCH
@@ -1327,19 +1415,59 @@ Returns the minimum value from the provided arguments.
## LEFT
-`LEFT(expr, [length])`
+Returns the `N` leftmost characters of an expression, where `N` is an integer value.
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
+* **Syntax:** `LEFT(expr, N)`
+* **Function type:** Scalar, string
-Returns the leftmost number of characters from an expression.
+Example
+
+The following example returns the `3` leftmost characters of the expression `ABCDEFG`.
+
+```sql
+SELECT
+ 'ABCDEFG' AS "expression",
+ LEFT('ABCDEFG', 3) AS "leftmost_characters"
+```
+
+Returns the following:
+
+| `expression` | `leftmost_characters` |
+| -- | -- |
+| `ABCDEFG` | `ABC` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
## LENGTH
-`LENGTH(expr)`
+Returns the length of the expression in UTF-16 code units.
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
+* **Syntax:** `LENGTH(expr)`
+* **Function type:** Scalar, string
-Returns the length of the expression in UTF-16 encoding.
+Example
+
+The following example returns the character length of the `OriginCityName` column from the `flight-carriers` datasource.
+
+```sql
+SELECT
+ "OriginCityName" AS "origin_city_name",
+ LENGTH("OriginCityName") AS "city_name_length"
+FROM "flight-carriers"
+LIMIT 1
+```
+
+Returns the following:
+
+| `origin_city_name` | `city_name_length` |
+| -- | -- |
+| `San Juan, PR` | `12` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
## LN
@@ -1755,11 +1883,30 @@ Reverses the given expression.
## RIGHT
-`RIGHT(expr, [length])`
+Returns the `N` rightmost characters of an expression, where `N` is an integer value.
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
+* **Syntax:** `RIGHT(expr, N)`
+* **Function type:** Scalar, string
-Returns the rightmost number of characters from an expression.
+Example
+
+The following example returns the `3` rightmost characters of the expression `ABCDEFG`.
+
+```sql
+SELECT
+ 'ABCDEFG' AS "expression",
+ RIGHT('ABCDEFG', 3) AS "rightmost_characters"
+```
+
+Returns the following:
+
+| `expression` | `rightmost_characters` |
+| -- | -- |
+| `ABCDEFG` | `EFG` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
## ROUND
@@ -1943,12 +2090,13 @@ Splits `str1` into an multi-value string on the delimiter specified by `str2`, w
## STRLEN
-`STRLEN(expr)`
-
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
-
Alias for [`LENGTH`](#length).
+* **Syntax:** `STRLEN(expr)`
+* **Function type:** Scalar, string
+
+[Learn more](sql-scalar.md#string-functions)
+
## STRPOS
`STRPOS(<expr>, <str>)`
@@ -2022,12 +2170,33 @@ Returns the quantile for the specified fraction from a T-Digest sketch construct
## TEXTCAT
-`TEXTCAT(<expr>, <expr>)`
-
-**Function type:** [Scalar, string](sql-scalar.md#string-functions)
-
Concatenates two string expressions.
+* **Syntax:** `TEXTCAT(expr, expr)`
+* **Function type:** Scalar, string
+
+Example
+
+The following example concatenates the `OriginState` column from the `flight-carriers` datasource with the string `, USA`.
+
+```sql
+SELECT
+ "OriginState" AS "origin_state",
+ TEXTCAT("OriginState", ', USA') AS "concatenate_state_with_USA"
+FROM "flight-carriers"
+LIMIT 1
+```
+
+Returns the following:
+
+| `origin_state` | `concatenate_state_with_USA` |
+| -- | -- |
+| `PR` | `PR, USA` |
+
+
+
+[Learn more](sql-scalar.md#string-functions)
+
## THETA_SKETCH_ESTIMATE
`THETA_SKETCH_ESTIMATE(expr)`
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 7b4d3235dc0..efc2cbb2afb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -66,7 +66,6 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@@ -120,15 +119,14 @@ public class MSQCompactionRunner implements CompactionRunner
   *
   * <ul>
   * <li>partitionsSpec of type HashedParititionsSpec.
   * <li>maxTotalRows in DynamicPartitionsSpec.
- * <li>rollup set to false in granularitySpec when metricsSpec is specified. Null is treated as true.
- * <li>queryGranularity set to ALL in granularitySpec.
- * <li>Each metric has output column name same as the input name.
+ * <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's null.
+ * Null is treated as true if metricsSpec exists and false if empty.
+ * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.
   * </ul>
   */
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask,
- Map<Interval, DataSchema> intervalToDataSchemaMap
+ CompactionTask compactionTask
)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
@@ -144,57 +142,13 @@ public class MSQCompactionRunner implements CompactionRunner
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
- validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
+ validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(CompactionConfigValidationResult.success());
}
- /**
-  * Valides that there are no rolled-up segments where either:
-  * <ul>
-  * <li>aggregator factory differs from its combining factory
-  * <li>input col name is different from the output name (non-idempotent)
-  * </ul>
-  */
- private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
- {
- for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
- if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
- CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
- if (combinedDataSchema.hasRolledUpSegments()) {
- for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
- // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in
- // compaction spec isn't. This would get properly compacted yet fails in the below pre-check.
- if (
- !(
- aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
- (
- aggregatorFactory.requiredFields().isEmpty() ||
- (aggregatorFactory.requiredFields().size() == 1 &&
- aggregatorFactory.requiredFields()
- .get(0)
- .equals(aggregatorFactory.getName()))
- )
- )
- ) {
- // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
- // the aggregated column name. This is because the aggregated values would then get overwritten by new
- // values and the existing values would be lost. Note that if no rollup is specified in an index spec,
- // the default value is true.
- return CompactionConfigValidationResult.failure(
- "MSQ: Rolled-up segments in compaction interval[%s].",
- intervalDataSchema.getKey()
- );
- }
- }
- }
- }
- }
- return CompactionConfigValidationResult.success();
- }
-
@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
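The removed `validateRolledUpSegments` check and the `ClientCompactionRunnerInfo.validateMetricsSpecForMSQ` call that replaces it both rest on the non-idempotent-aggregator condition from the updated javadoc, i.e. an aggregator `A` such that `A != A.combiningFactory()`. The sketch below restates that condition outside the runner purely for illustration; the class name and the standalone `isIdempotent` helper are hypothetical and not part of this patch, but the predicate mirrors the removed code above.

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class IdempotencySketch
{
  // An aggregator is "idempotent" for compaction when re-aggregating its own output behaves like
  // combining: same aggregator class, and its (single) input field is the column it writes.
  static boolean isIdempotent(AggregatorFactory factory)
  {
    return factory.getClass().equals(factory.getCombiningFactory().getClass())
           && (factory.requiredFields().isEmpty()
               || (factory.requiredFields().size() == 1
                   && factory.requiredFields().get(0).equals(factory.getName())));
  }

  public static void main(String[] args)
  {
    // "sum_added" reads "added": input and output names differ, so the MSQ validation rejects it.
    System.out.println(isIdempotent(new LongSumAggregatorFactory("sum_added", "added"))); // false
    // "added" summing "added" re-aggregates cleanly.
    System.out.println(isIdempotent(new LongSumAggregatorFactory("added", "added")));     // true
  }
}
```

The new `testMSQEngineWithUnsupportedMetricsSpecIsInValid` test in the next file exercises exactly the failing case.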
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 6c5d1957265..d868ddf20e5 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -42,7 +42,6 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.math.expr.ExprMacroTable;
@@ -60,7 +59,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
@@ -131,7 +129,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -144,7 +142,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -157,7 +155,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -170,7 +168,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -183,7 +181,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -196,7 +194,41 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ }
+
+ @Test
+ public void testRollupTrueWithoutMetricsSpecIsInValid()
+ {
+ CompactionTask compactionTask = createCompactionTask(
+ new DynamicPartitionsSpec(3, null),
+ null,
+ Collections.emptyMap(),
+ new ClientCompactionTaskGranularitySpec(null, null, true),
+ null
+ );
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ }
+
+ @Test
+ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
+ {
+ // Aggregators having different input and output column names are unsupported.
+ final String inputColName = "added";
+ final String outputColName = "sum_added";
+ CompactionTask compactionTask = createCompactionTask(
+ new DynamicPartitionsSpec(3, null),
+ null,
+ Collections.emptyMap(),
+ new ClientCompactionTaskGranularitySpec(null, null, null),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
+ );
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
+ validationResult.getReason()
+ );
}
@Test
@@ -345,48 +377,6 @@ public class MSQCompactionRunnerTest
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
}
- @Test
- public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails()
- {
- final String inputColName = "added";
- final String outputColName = "sum_added";
- CompactionTask compactionTask = createCompactionTask(
- null,
- null,
- Collections.emptyMap(),
- null,
- new AggregatorFactory[]{
- new LongSumAggregatorFactory(
- outputColName,
- inputColName
- )
- }
- );
- CombinedDataSchema dataSchema = new CombinedDataSchema(
- DATA_SOURCE,
- new TimestampSpec(TIMESTAMP_COLUMN, null, null),
- new DimensionsSpec(DIMENSIONS),
- new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
- new UniformGranularitySpec(
- SEGMENT_GRANULARITY.getDefaultGranularity(),
- null,
- false,
- Collections.singletonList(COMPACTION_INTERVAL)
- ),
- null,
- true
- );
- CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
- compactionTask,
- Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
- );
- Assert.assertFalse(validationResult.isValid());
- Assert.assertEquals(validationResult.getReason(), StringUtils.format(
- "MSQ: Rolled-up segments in compaction interval[%s].",
- COMPACTION_INTERVAL
- ));
- }
-
private CompactionTask createCompactionTask(
@Nullable PartitionsSpec partitionsSpec,
@Nullable DimFilter dimFilter,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
index 0abaeed8eb2..8d30a60d04e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -57,9 +57,6 @@ public interface CompactionRunner
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
- CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask,
- Map<Interval, DataSchema> intervalToDataSchemaMap
- );
+ CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 68320387845..8659eb0f397 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -77,7 +77,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -460,13 +459,11 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
transformSpec,
metricsSpec,
granularitySpec,
- getMetricBuilder(),
- compactionRunner
+ getMetricBuilder()
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
- CompactionConfigValidationResult supportsCompactionConfig =
- compactionRunner.validateCompactionTask(this, intervalDataSchemas);
+ CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
@@ -488,8 +485,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable final ClientCompactionTaskTransformSpec transformSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
- final ServiceMetricEvent.Builder metricBuilder,
- CompactionRunner compactionRunner
+ final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final Iterable<DataSegment> timelineSegments = retrieveRelevantTimelineHolders(
@@ -553,8 +549,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
metricsSpec,
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null)
- : granularitySpec.withSegmentGranularity(segmentGranularityToUse),
- compactionRunner
+ : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
intervalDataSchemaMap.put(interval, dataSchema);
}
@@ -579,8 +574,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
dimensionsSpec,
transformSpec,
metricsSpec,
- granularitySpec,
- compactionRunner
+ granularitySpec
);
return Collections.singletonMap(segmentProvider.interval, dataSchema);
}
@@ -610,17 +604,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
- @Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
- @Nullable CompactionRunner compactionRunner
+ @Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
// Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
segments,
- // For MSQ, always need rollup to check if there are some rollup segments already present.
- compactionRunner instanceof NativeCompactionRunner
- ? (granularitySpec.isRollup() == null)
- : true,
+ granularitySpec.isRollup() == null,
granularitySpec.getQueryGranularity() == null,
dimensionsSpec == null,
metricsSpec == null
@@ -675,14 +665,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
finalMetricsSpec = metricsSpec;
}
- return new CombinedDataSchema(
+ return new DataSchema(
dataSource,
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
- transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null),
- existingSegmentAnalyzer.hasRolledUpSegments()
+ transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null)
);
}
@@ -759,7 +748,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// For processRollup:
private boolean rollup = true;
- private boolean hasRolledUpSegments = false;
// For processQueryGranularity:
private Granularity queryGranularity;
@@ -827,11 +815,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
return rollup;
}
- public boolean hasRolledUpSegments()
- {
- return hasRolledUpSegments;
- }
-
public Granularity getQueryGranularity()
{
if (!needQueryGranularity) {
@@ -921,7 +904,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
- hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup);
}
private void processQueryGranularity(final QueryableIndex index)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 5aa7af71451..2074d14f0f9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -85,8 +85,7 @@ public class NativeCompactionRunner implements CompactionRunner
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask,
- Map<Interval, DataSchema> intervalToDataSchemaMap
+ CompactionTask compactionTask
)
{
return CompactionConfigValidationResult.success();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 3a386bc4aa7..3c144929546 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -737,7 +737,7 @@ public class CompactionTaskTest
);
provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
}
-
+
@Test
public void testCreateIngestionSchema() throws IOException
{
@@ -749,8 +749,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -811,8 +810,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -874,8 +872,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -938,8 +935,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1009,8 +1005,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1060,8 +1055,7 @@ public class CompactionTaskTest
null,
customMetricsSpec,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1104,8 +1098,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1155,8 +1148,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
NativeCompactionRunner.createIngestionSpecs(
@@ -1186,8 +1178,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
NativeCompactionRunner.createIngestionSpecs(
@@ -1228,8 +1219,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1273,8 +1263,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
dataSchemasForIntervals,
@@ -1319,8 +1308,7 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
null
),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1367,8 +1355,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1413,8 +1400,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1459,8 +1445,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, true),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1490,8 +1475,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1871,14 +1855,6 @@ public class CompactionTaskTest
}
}
- final Metadata metadata = new Metadata(
- null,
- aggregatorFactories.toArray(new AggregatorFactory[0]),
- null,
- null,
- null
- );
-
queryableIndexMap.put(
entry.getValue(),
new SimpleQueryableIndex(
@@ -1887,9 +1863,21 @@ public class CompactionTaskTest
null,
columnMap,
null,
- metadata,
false
)
+ {
+ @Override
+ public Metadata getMetadata()
+ {
+ return new Metadata(
+ null,
+ aggregatorFactories.toArray(new AggregatorFactory[0]),
+ null,
+ null,
+ null
+ );
+ }
+ }
);
}
}
@@ -1912,10 +1900,15 @@ public class CompactionTaskTest
index.getBitmapFactoryForDimensions(),
index.getColumns(),
index.getFileMapper(),
- null,
false
)
- );
+ {
+ @Override
+ public Metadata getMetadata()
+ {
+ return null;
+ }
+ });
}
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index d09bced3313..8f070f33405 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -80,7 +80,6 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -184,7 +183,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), null, false, false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
- false
+ false,
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93.0
@@ -286,7 +286,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
- false
+ false,
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93
@@ -328,8 +329,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
- @Test(dataProvider = "engine")
- public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine engine) throws Exception
+ @Test()
+ public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception
{
// added = 31, count = null, sum_added = null
loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
@@ -357,7 +358,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
false,
- engine
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 2, sum_added = 62
@@ -480,7 +481,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
- false
+ false,
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 4, sum_added = 124
@@ -521,7 +523,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), CompactionEngine.NATIVE);
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -539,7 +541,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
0,
1,
1);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, CompactionEngine.NATIVE);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -651,7 +653,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, CompactionEngine.NATIVE);
// ...should remains unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -863,7 +865,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ true,
+ engine
+ );
List expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
@@ -881,7 +889,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
newGranularity = Granularities.MONTH;
// Set dropExisting to true
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ true,
+ engine
+ );
// Since dropExisting is set to true...
// Again data is only in two days
@@ -950,7 +964,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
LOG.info("Auto compaction test with YEAR segment granularity");
@@ -967,7 +987,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to false
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
LOG.info("Auto compaction test with DAY segment granularity");
@@ -1169,7 +1195,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
List expectedIntervalAfterCompaction = new ArrayList<>();
// We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@@ -1195,7 +1227,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to false
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@@ -1239,7 +1277,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
- false
+ false,
+ CompactionEngine.NATIVE
);
// Before compaction, we have segments with the interval 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01
// We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
@@ -1319,8 +1358,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
- @Test(dataProvider = "engine")
- public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws Exception
+ @Test()
+ public void testAutoCompactionDutyWithRollup() throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
@@ -1337,7 +1376,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
false,
- engine
+ CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
queryAndResultFields = ImmutableMap.of(
@@ -1470,7 +1509,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
new UserCompactionTaskTransformConfig(new SelectorDimFilter("page", "Striker Eureka", null)),
null,
- false
+ false,
+ CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
@@ -1517,7 +1557,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
new AggregatorFactory[] {new DoubleSumAggregatorFactory("double_sum_added", "added"), new LongSumAggregatorFactory("long_sum_added", "added")},
- false
+ false,
+ CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
@@ -1577,7 +1618,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
null,
- false
+ false,
+ CompactionEngine.NATIVE
);
// Compact the MONTH segment
forceTriggerAutoCompaction(2);
@@ -1679,57 +1721,31 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
queryHelper.testQueriesFromString(queryResponseTemplate);
}
- private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest)
- throws Exception
- {
- submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, null);
- }
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, engine);
}
- private void submitCompactionConfig(
- Integer maxRowsPerSegment,
- Period skipOffsetFromLatest,
- UserCompactionTaskGranularityConfig granularitySpec
- ) throws Exception
- {
- submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false, null);
- }
-
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false, engine);
}
- private void submitCompactionConfig(
- Integer maxRowsPerSegment,
- Period skipOffsetFromLatest,
- UserCompactionTaskGranularityConfig granularitySpec,
- boolean dropExisting
- ) throws Exception
- {
- submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, dropExisting, null);
- }
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
boolean dropExisting,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(
@@ -1744,28 +1760,6 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
);
}
- private void submitCompactionConfig(
- Integer maxRowsPerSegment,
- Period skipOffsetFromLatest,
- UserCompactionTaskGranularityConfig granularitySpec,
- UserCompactionTaskDimensionsConfig dimensionsSpec,
- UserCompactionTaskTransformConfig transformSpec,
- AggregatorFactory[] metricsSpec,
- boolean dropExisting
- ) throws Exception
- {
- submitCompactionConfig(
- maxRowsPerSegment,
- skipOffsetFromLatest,
- granularitySpec,
- dimensionsSpec,
- transformSpec,
- metricsSpec,
- dropExisting,
- null
- );
- }
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
@@ -1774,7 +1768,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(
@@ -1799,7 +1793,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
DataSourceCompactionConfig dataSourceCompactionConfig = new DataSourceCompactionConfig(
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java b/processing/src/main/java/org/apache/druid/common/semantic/SemanticCreator.java
similarity index 84%
rename from processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java
rename to processing/src/main/java/org/apache/druid/common/semantic/SemanticCreator.java
index bb1af0e4d9f..0142b3e8ed0 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java
+++ b/processing/src/main/java/org/apache/druid/common/semantic/SemanticCreator.java
@@ -17,7 +17,9 @@
* under the License.
*/
-package org.apache.druid.query.rowsandcols;
+package org.apache.druid.common.semantic;
+
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -26,8 +28,8 @@ import java.lang.annotation.Target;
/**
* Annotation used to indicate that the method is used as a creator for a semantic interface.
- *
- * Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of
+ * <p>
+ * Used in conjunction with {@link SemanticUtils#makeAsMap(Class)} to build maps for simplified implementation of
* the {@link RowsAndColumns#as(Class)} method.
*/
@Retention(RetentionPolicy.RUNTIME)
diff --git a/processing/src/main/java/org/apache/druid/common/semantic/SemanticUtils.java b/processing/src/main/java/org/apache/druid/common/semantic/SemanticUtils.java
new file mode 100644
index 00000000000..4424b5fcccc
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/common/semantic/SemanticUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.common.semantic;
+
+import org.apache.druid.error.DruidException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class SemanticUtils
+{
+ private static final Map<Class<?>, Map<Class<?>, Function<?, ?>>> OVERRIDES = new LinkedHashMap<>();
+
+ /**
+ * Allows the registration of overrides, which allows overriding of already existing mappings.
+ * This allows extensions to register mappings.
+ */
+ @SuppressWarnings("unused")
+ public static <T, U> void registerAsOverride(Class<T> clazz, Class<U> asInterface, Function<T, U> fn)
+ {
+ final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.computeIfAbsent(
+ clazz,
+ theClazz -> new LinkedHashMap<>()
+ );
+
+ final Function<?, ?> oldVal = classOverrides.get(asInterface);
+ if (oldVal != null) {
+ throw DruidException.defensive(
+ "Attempt to side-override the same interface [%s] multiple times for the same class [%s].",
+ asInterface,
+ clazz
+ );
+ } else {
+ classOverrides.put(asInterface, fn);
+ }
+ }
+
+ public static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
+ {
+ final Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();
+
+ for (Method method : clazz.getMethods()) {
+ if (method.isAnnotationPresent(SemanticCreator.class)) {
+ if (method.getParameterCount() != 0) {
+ throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
+ }
+
+ retVal.put(method.getReturnType(), arg -> {
+ try {
+ return method.invoke(arg);
+ }
+ catch (InvocationTargetException | IllegalAccessException e) {
+ throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
+ }
+ });
+ }
+ }
+
+ final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.get(clazz);
+ if (classOverrides != null) {
+ for (Map.Entry<Class<?>, Function<?, ?>> overrideEntry : classOverrides.entrySet()) {
+ //noinspection unchecked
+ retVal.put(overrideEntry.getKey(), (Function<T, ?>) overrideEntry.getValue());
+ }
+ }
+
+ return retVal;
+ }
+}
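As a rough usage sketch: `makeAsMap` reflects over a class, collects the zero-argument methods annotated with `@SemanticCreator`, and keys the resulting map by each method's return type; implementations such as `ArrayListRowsAndColumns` (changed further down) store that map statically and consult it from their `as(Class)` method. The class and interface names below (`ExampleRowsAndColumns`, `RowCounter`) are hypothetical, assumed only for illustration.

```java
import java.util.Map;
import java.util.function.Function;

import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;

interface RowCounter
{
  int rowCount();
}

class ExampleRowsAndColumns
{
  // Built once per class: return type of each @SemanticCreator method -> function invoking it.
  private static final Map<Class<?>, Function<ExampleRowsAndColumns, ?>> AS_MAP =
      SemanticUtils.makeAsMap(ExampleRowsAndColumns.class);

  private final int numRows;

  ExampleRowsAndColumns(int numRows)
  {
    this.numRows = numRows;
  }

  // Must be public and zero-argument: non-public methods are not seen by getMethods(), and
  // annotated methods with parameters trigger the defensive exception in makeAsMap.
  @SemanticCreator
  public RowCounter toRowCounter()
  {
    return () -> numRows;
  }

  // Mirrors the pattern behind RowsAndColumns#as(Class): look up the creator by requested type.
  @SuppressWarnings("unchecked")
  public <T> T as(Class<T> clazz)
  {
    final Function<ExampleRowsAndColumns, ?> fn = AS_MAP.get(clazz);
    return fn == null ? null : (T) fn.apply(this);
  }
}
```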
diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java
index a04f3f6512c..b1056bbef52 100644
--- a/processing/src/main/java/org/apache/druid/error/DruidException.java
+++ b/processing/src/main/java/org/apache/druid/error/DruidException.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -130,6 +131,8 @@ import java.util.Map;
@NotThreadSafe
public class DruidException extends RuntimeException
{
+ public static final String CLASS_NAME_STR = DruidException.class.getName();
+
/**
* Starts building a "general" DruidException targeting the specified persona.
*
@@ -176,6 +179,17 @@ public class DruidException extends RuntimeException
return defensive().build(format, args);
}
+ /**
+ * Build a "defensive" exception, this is an exception that should never actually be triggered, but we are
+ * throwing it inside a defensive check.
+ *
+ * @return A builder for a defensive exception.
+ */
+ public static DruidException defensive(Throwable cause, String format, Object... args)
+ {
+ return defensive().build(cause, format, args);
+ }
+
/**
* Build a "defensive" exception, this is an exception that should never actually be triggered. Throw to
* allow messages to be seen by developers
@@ -467,7 +481,7 @@ public class DruidException extends RuntimeException
public DruidException build(Throwable cause, String formatMe, Object... vals)
{
- return new DruidException(
+ final DruidException retVal = new DruidException(
cause,
errorCode,
targetPersona,
@@ -475,6 +489,19 @@ public class DruidException extends RuntimeException
StringUtils.nonStrictFormat(formatMe, vals),
deserialized
);
+
+ StackTraceElement[] stackTrace = retVal.getStackTrace();
+ int firstNonDruidExceptionIndex = 0;
+ while (
+ firstNonDruidExceptionIndex < stackTrace.length
+ && stackTrace[firstNonDruidExceptionIndex].getClassName().startsWith(CLASS_NAME_STR)) {
+ ++firstNonDruidExceptionIndex;
+ }
+ if (firstNonDruidExceptionIndex > 0) {
+ retVal.setStackTrace(Arrays.copyOfRange(stackTrace, firstNonDruidExceptionIndex, stackTrace.length));
+ }
+
+ return retVal;
}
}
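A brief sketch of how a caller might use the new `defensive(Throwable, ...)` overload; together with the stack-trace trimming added to `build(Throwable, ...)`, the thrown exception's trace now starts at the calling frame instead of inside `DruidException` itself. The enclosing method and the reflective call are illustrative only.

```java
import java.lang.reflect.Method;

import org.apache.druid.error.DruidException;

class DefensiveUsageSketch
{
  Object invokeOrFail(Method method, Object target)
  {
    try {
      return method.invoke(target);
    }
    catch (ReflectiveOperationException e) {
      // Previously spelled as DruidException.defensive().build(e, ...); the new overload is a one-liner.
      throw DruidException.defensive(e, "Problem invoking method [%s]", method);
    }
  }
}
```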
diff --git a/processing/src/main/java/org/apache/druid/error/NotYetImplemented.java b/processing/src/main/java/org/apache/druid/error/NotYetImplemented.java
new file mode 100644
index 00000000000..b283034fab3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/error/NotYetImplemented.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+/**
+ * A failure class that is used to indicate that something is just not implemented yet. This is useful when a
+ * developer builds something and they intentionally do not implement a specific branch of code or type of object.
+ *
+ * The lack of implementation is not necessarily a statement that it SHOULDN'T be implemented, it's just an indication
+ * that it has not YET been implemented. When one of these exceptions is seen, it is usually an indication that it is
+ * now time to actually implement the path that was previously elided.
+ *
+ * Oftentimes, the code path wasn't implemented because the developer thought that it wasn't actually possible to
+ * see it executed. So, collecting and providing information about why the particular path got executed is often
+ * extremely helpful in understanding why it happened and accelerating the implementation of what the correct behavior
+ * should be.
+ */
+public class NotYetImplemented extends DruidException.Failure
+{
+ public static DruidException ex(Throwable t, String msg, Object... args)
+ {
+ return DruidException.fromFailure(new NotYetImplemented(t, msg, args));
+ }
+
+ private final Throwable t;
+ private final String msg;
+ private final Object[] args;
+
+ public NotYetImplemented(Throwable t, String msg, Object[] args)
+ {
+ super("notYetImplemented");
+ this.t = t;
+ this.msg = msg;
+ this.args = args;
+ }
+
+
+ @Override
+ protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
+ {
+ bob = bob.forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.DEFENSIVE);
+
+ if (t == null) {
+ return bob.build(msg, args);
+ } else {
+ return bob.build(t, msg, args);
+ }
+ }
+}
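A minimal usage sketch for the new failure type; the enclosing switch and its cases are placeholders, only `NotYetImplemented.ex` comes from this patch. Passing a null cause is fine because `makeException` only wires the throwable into the builder when it is non-null.

```java
import org.apache.druid.error.NotYetImplemented;

class NotYetImplementedUsageSketch
{
  String describe(int frameType)
  {
    switch (frameType) {
      case 0:
        return "columnar";
      case 1:
        return "row-based";
      default:
        // Surfaces as a DEVELOPER-persona, DEFENSIVE-category DruidException with code "notYetImplemented".
        throw NotYetImplemented.ex(null, "Frame type [%s] not handled yet", frameType);
    }
  }
}
```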
diff --git a/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java b/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java
index 8ddf99325d3..46a848fb6b1 100644
--- a/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java
+++ b/processing/src/main/java/org/apache/druid/frame/read/FrameReader.java
@@ -20,6 +20,7 @@
package org.apache.druid.frame.read;
import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.field.FieldReader;
import org.apache.druid.frame.field.FieldReaders;
@@ -31,7 +32,6 @@ import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.row.FrameCursorFactory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
@@ -44,7 +44,7 @@ import java.util.Set;
/**
* Embeds the logic to read frames with a given {@link RowSignature}.
- *
+ * <p>
* Stateless and immutable.
*/
public class FrameReader
@@ -146,7 +146,7 @@ public class FrameReader
case ROW_BASED:
return new FrameCursorFactory(frame, this, fieldReaders);
default:
- throw new ISE("Unrecognized frame type [%s]", frame.type());
+ throw DruidException.defensive("Unrecognized frame type [%s]", frame.type());
}
}
diff --git a/processing/src/main/java/org/apache/druid/guice/JsonConfigurator.java b/processing/src/main/java/org/apache/druid/guice/JsonConfigurator.java
index 1e4f18dc1cd..8a53dbffabf 100644
--- a/processing/src/main/java/org/apache/druid/guice/JsonConfigurator.java
+++ b/processing/src/main/java/org/apache/druid/guice/JsonConfigurator.java
@@ -236,7 +236,7 @@ public class JsonConfigurator
// to configure ParametrizedUriEmitterConfig object. So skipping xxx=yyy key-value pair when configuring Emitter
// doesn't make any difference. That is why we just log this situation, instead of throwing an exception.
log.info(
- "Skipping %s property: one of it's prefixes is also used as a property key. Prefix: %s",
+ "Skipping property [%s]: one of it's prefixes [%s] is also used as a property key.",
originalProperty,
propertyPrefix
);
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java
index 05b9dee5458..3b5541ca5bb 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java
@@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.Arrays;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntComparator;
import it.unimi.dsi.fastutil.ints.IntList;
+import org.apache.druid.common.semantic.SemanticCreator;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.operator.ColumnWithDirection;
@@ -73,7 +75,7 @@ import java.util.function.Function;
public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumns
{
@SuppressWarnings("rawtypes")
- private static final Map<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = RowsAndColumns
+ private static final Map<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(ArrayListRowsAndColumns.class);
private final ArrayList<RowType> rows;
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
index 0dae40467f3..ce199a7803c 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
@@ -20,6 +20,8 @@
package org.apache.druid.query.rowsandcols;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.semantic.SemanticCreator;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.key.KeyColumn;
@@ -66,7 +68,7 @@ import java.util.function.Function;
public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
{
- private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP = RowsAndColumns
+ private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(LazilyDecoratedRowsAndColumns.class);
private RowsAndColumns base;
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
index 121e4863bcd..42972f9340d 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
@@ -25,6 +25,7 @@ import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.apache.druid.query.rowsandcols.column.LongArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
@@ -170,6 +171,12 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
return add(name, new IntArrayColumn(vals));
}
+ @SuppressWarnings("unused")
+ public Builder add(String name, long[] vals)
+ {
+ return add(name, new LongArrayColumn(vals));
+ }
+
public Builder add(String name, double[] vals)
{
return add(name, new DoubleArrayColumn(vals));
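A hedged sketch of how the new long[] overload might be used when assembling a RowsAndColumns in tests. The builder() factory and the column names here are assumptions; only the add(String, long[]) overload, backed by the new LongArrayColumn below, is added by this patch.

    // Illustrative only: two columns, one of which uses the new long[] overload.
    MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns
        .builder()
        .add("timestamps", new long[]{1L, 2L, 3L})
        .add("scores", new double[]{1.1, 2.2, 3.3})
        .build();
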
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
index d139265d147..7b6a1f6215d 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
@@ -19,19 +19,13 @@
package org.apache.druid.query.rowsandcols;
-import org.apache.druid.error.DruidException;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Function;
/**
* An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows
@@ -75,31 +69,6 @@ public interface RowsAndColumns
return retVal;
}
- static <T extends RowsAndColumns> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
- {
- Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();
-
- for (Method method : clazz.getMethods()) {
- if (method.isAnnotationPresent(SemanticCreator.class)) {
- if (method.getParameterCount() != 0) {
- throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
- }
-
- retVal.put(method.getReturnType(), arg -> {
- try {
- return method.invoke(arg);
- }
- catch (InvocationTargetException | IllegalAccessException e) {
- throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
- }
- });
- }
- }
-
- return retVal;
- }
-
-
/**
* The set of column names available from the RowsAndColumns
*
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
index 4eddcc77f1c..f7c339b2080 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
@@ -22,6 +22,10 @@ package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.util.FindResult;
+/**
+ * Implementations of this interface do not validate that the underlying data is sorted for the binary search; they
+ * assume that it is. As such, behavior is undefined if the column is not actually sorted.
+ */
public interface BinarySearchableAccessor extends ColumnAccessor
{
static BinarySearchableAccessor fromColumn(Column col)
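To make the sortedness contract concrete, a short sketch; the column contents are illustrative and fromColumn() is the existing factory shown above.

    // The values MUST already be sorted; the accessor never verifies this.
    IntArrayColumn sorted = new IntArrayColumn(new int[]{1, 3, 3, 7});
    BinarySearchableAccessor accessor = BinarySearchableAccessor.fromColumn(sorted);
    FindResult threes = accessor.findLong(0, 4, 3L); // locates the run of 3s at rows [1, 3)
    // Passing an unsorted column still compiles and runs, but the result is undefined.
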
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
index 28a7c3dd10d..01af9b07536 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
@@ -19,7 +19,7 @@
package org.apache.druid.query.rowsandcols.column;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
@@ -55,7 +55,13 @@ public class ConstantObjectColumn implements Column
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - numRows < intoStart) {
- throw new ISE("too many rows!!! intoStart[%,d], numRows[%,d] combine to exceed max_int", intoStart, numRows);
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+ .build(
+ "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
+ intoStart,
+ numRows
+ );
}
Arrays.fill(into, intoStart, intoStart + numRows, obj);
};
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
index 9c3b799d30e..18cb8ad9c5a 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
@@ -19,8 +19,8 @@
package org.apache.druid.query.rowsandcols.column;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
@@ -54,11 +54,13 @@ public class DoubleArrayColumn implements Column
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
- throw new ISE(
- "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
- intoStart,
- vals.length
- );
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+ .build(
+ "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
+ intoStart,
+ vals.length
+ );
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
@@ -183,13 +185,13 @@ public class DoubleArrayColumn implements Column
@Override
public FindResult findString(int startIndex, int endIndex, String val)
{
- return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
+ throw NotYetImplemented.ex(null, "findString is not currently supported for DoubleArrayColumns");
}
@Override
public FindResult findComplex(int startIndex, int endIndex, Object val)
{
- return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
+ throw NotYetImplemented.ex(null, "findComplex is not currently supported for DoubleArrayColumns");
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
index 673cebf0e2e..4a9d7c2c5b9 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
@@ -19,8 +19,8 @@
package org.apache.druid.query.rowsandcols.column;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
@@ -54,11 +54,13 @@ public class IntArrayColumn implements Column
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
- throw new ISE(
- "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
- intoStart,
- vals.length
- );
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+ .build(
+ "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
+ intoStart,
+ vals.length
+ );
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
@@ -189,13 +191,13 @@ public class IntArrayColumn implements Column
@Override
public FindResult findString(int startIndex, int endIndex, String val)
{
- return findInt(startIndex, endIndex, (int) Numbers.tryParseLong(val, 0));
+ throw NotYetImplemented.ex(null, "findString is not currently supported for IntArrayColumns");
}
@Override
public FindResult findComplex(int startIndex, int endIndex, Object val)
{
- return findDouble(startIndex, endIndex, (int) Numbers.tryParseLong(val, 0));
+ throw NotYetImplemented.ex(null, "findComplex is not currently supported for IntArrayColumns");
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/LongArrayColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/LongArrayColumn.java
new file mode 100644
index 00000000000..bddf235eeb8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/LongArrayColumn.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.column;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+public class LongArrayColumn implements Column
+{
+ private final long[] vals;
+
+ public LongArrayColumn(
+ long[] vals
+ )
+ {
+ this.vals = vals;
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new MyColumnAccessor();
+ }
+
+ @Nullable
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ if (VectorCopier.class.equals(clazz)) {
+ return (T) (VectorCopier) (into, intoStart) -> {
+ if (Integer.MAX_VALUE - vals.length < intoStart) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+ .build(
+ "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
+ intoStart,
+ vals.length
+ );
+ }
+ for (int i = 0; i < vals.length; ++i) {
+ into[intoStart + i] = vals[i];
+ }
+ };
+ }
+ if (ColumnValueSwapper.class.equals(clazz)) {
+ return (T) (ColumnValueSwapper) (lhs, rhs) -> {
+ long tmp = vals[lhs];
+ vals[lhs] = vals[rhs];
+ vals[rhs] = tmp;
+ };
+ }
+ return null;
+ }
+
+ private class MyColumnAccessor implements BinarySearchableAccessor
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.LONG;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return vals.length;
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ return false;
+ }
+
+ @Override
+ public Object getObject(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return (int) vals[rowNum];
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum)
+ {
+ return Long.compare(vals[lhsRowNum], vals[rhsRowNum]);
+ }
+
+
+ @Override
+ public FindResult findNull(int startIndex, int endIndex)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findDouble(int startIndex, int endIndex, double val)
+ {
+ return findLong(startIndex, endIndex, (long) val);
+ }
+
+ @Override
+ public FindResult findFloat(int startIndex, int endIndex, float val)
+ {
+ return findLong(startIndex, endIndex, (long) val);
+ }
+
+ @Override
+ public FindResult findLong(int startIndex, int endIndex, long val)
+ {
+ if (vals[startIndex] == val) {
+ int end = startIndex + 1;
+
+ while (end < endIndex && vals[end] == val) {
+ ++end;
+ }
+ return FindResult.found(startIndex, end);
+ }
+
+ int i = Arrays.binarySearch(vals, startIndex, endIndex, val);
+ if (i > 0) {
+ int foundStart = i;
+ int foundEnd = i + 1;
+
+ while (foundStart - 1 >= startIndex && vals[foundStart - 1] == val) {
+ --foundStart;
+ }
+
+ while (foundEnd < endIndex && vals[foundEnd] == val) {
+ ++foundEnd;
+ }
+
+ return FindResult.found(foundStart, foundEnd);
+ } else {
+ return FindResult.notFound(-(i + 1));
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public FindResult findInt(int startIndex, int endIndex, int val)
+ {
+ return findLong(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findString(int startIndex, int endIndex, String val)
+ {
+ throw NotYetImplemented.ex(null, "findString is not currently supported for LongArrayColumns");
+ }
+
+ @Override
+ public FindResult findComplex(int startIndex, int endIndex, Object val)
+ {
+ throw NotYetImplemented.ex(null, "findComplex is not currently supported for LongArrayColumns");
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
index ada3da164ea..71c2541b387 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
@@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
-import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
@@ -80,7 +79,6 @@ public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoClose
}
}
return colCache.get(name);
-
}
@SuppressWarnings("unchecked")
@@ -91,9 +89,6 @@ public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoClose
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
- if (WireTransferable.class.equals(clazz)) {
- return (T) this;
- }
return null;
}
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnHolderRACColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnHolderRACColumn.java
index ed4f8ead52e..d68f8872bf4 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnHolderRACColumn.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnHolderRACColumn.java
@@ -91,7 +91,7 @@ public class ColumnHolderRACColumn implements Column, Closeable
public boolean isNull(int rowNum)
{
offset.set(rowNum);
- return valueSelector.isNull();
+ return valueSelector.getObject() == null;
}
@Nullable
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java
index 209d4430b1d..73fc72a1ee4 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java
@@ -19,10 +19,11 @@
package org.apache.druid.query.rowsandcols.concrete;
+import org.apache.druid.common.semantic.SemanticCreator;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
-import org.apache.druid.query.rowsandcols.SemanticCreator;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.QueryableIndex;
@@ -41,7 +42,7 @@ import java.util.function.Function;
public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{
- private static final Map<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> AS_MAP = RowsAndColumns
+ private static final Map<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(QueryableIndexRowsAndColumns.class);
private final QueryableIndex index;
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFrameMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFrameMaker.java
new file mode 100644
index 00000000000..204b5bd8548
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFrameMaker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DefaultFrameMaker implements FrameMaker
+{
+ private final RowsAndColumns rac;
+
+ public DefaultFrameMaker(RowsAndColumns rac)
+ {
+ this.rac = rac;
+ }
+
+ @Override
+ public RowSignature computeSignature()
+ {
+ final RowSignature.Builder signatureBuilder = RowSignature.builder();
+ for (String column : rac.getColumnNames()) {
+ final Column racColumn = rac.findColumn(column);
+ if (racColumn == null) {
+ continue;
+ }
+ signatureBuilder.add(column, racColumn.toAccessor().getType());
+ }
+
+ return signatureBuilder.build();
+ }
+
+ @Override
+ public Frame toColumnBasedFrame()
+ {
+ final AtomicInteger rowId = new AtomicInteger(0);
+ final int numRows = rac.numRows();
+ final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
+ final ColumnSelectorFactory selectorFactory = csfm.make(rowId);
+
+ final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20); // 200 MB
+
+ final FrameWriter frameWriter = FrameWriters.makeColumnBasedFrameWriterFactory(
+ memFactory,
+ computeSignature(),
+ Collections.emptyList()
+ ).newFrameWriter(selectorFactory);
+
+ rowId.set(0);
+ for (; rowId.get() < numRows; rowId.incrementAndGet()) {
+ frameWriter.addSelection();
+ }
+
+ return Frame.wrap(frameWriter.toByteArray());
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FrameMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FrameMaker.java
new file mode 100644
index 00000000000..095bfe1ed87
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FrameMaker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+public interface FrameMaker
+{
+ static FrameMaker fromRAC(RowsAndColumns rac)
+ {
+ FrameMaker retVal = rac.as(FrameMaker.class);
+ if (retVal == null) {
+ retVal = new DefaultFrameMaker(rac);
+ }
+ return retVal;
+ }
+
+ RowSignature computeSignature();
+
+ Frame toColumnBasedFrame();
+}
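A short usage sketch of the new semantic interface. The rac variable is assumed to be any RowsAndColumns; FrameMaker, DefaultFrameMaker, and the two methods come from the new files above.

    // Prefer a FrameMaker supplied by the RowsAndColumns itself; otherwise fall back to
    // DefaultFrameMaker, which copies rows through a ColumnSelectorFactory into a frame.
    FrameMaker frameMaker = FrameMaker.fromRAC(rac);
    RowSignature signature = frameMaker.computeSignature();
    Frame frame = frameMaker.toColumnBasedFrame();
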
diff --git a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index 5d1198f5460..801eaf112a5 100644
--- a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -22,7 +22,7 @@ package org.apache.druid.segment;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnDescriptor;
@@ -212,7 +212,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
);
break;
default:
- throw new ISE(
+ throw DruidException.defensive(
"How did we get here? Column [%s] with type [%s] does not have specialized serializer",
name,
logicalType
@@ -349,7 +349,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
@Override
public ColumnDescriptor makeColumnDescriptor()
{
- ColumnDescriptor.Builder descriptorBuilder = new ColumnDescriptor.Builder();
+ ColumnDescriptor.Builder descriptorBuilder = ColumnDescriptor.builder();
final NestedCommonFormatColumnPartSerde partSerde =
NestedCommonFormatColumnPartSerde.serializerBuilder()
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index dd0ac9ab117..966de405206 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -510,9 +510,15 @@ public class IndexIO
new ConciseBitmapFactory(),
columns,
index.getFileMapper(),
- null,
lazy
- );
+ )
+ {
+ @Override
+ public Metadata getMetadata()
+ {
+ return null;
+ }
+ };
}
private Supplier<ColumnHolder> getColumnHolderSupplier(ColumnBuilder builder, boolean lazy)
@@ -604,25 +610,6 @@ public class IndexIO
allDims = null;
}
- Metadata metadata = null;
- ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
- if (metadataBB != null) {
- try {
- metadata = mapper.readValue(
- SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()),
- Metadata.class
- );
- }
- catch (JsonParseException | JsonMappingException ex) {
- // Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which
- // is no longer supported then it is OK to not use the metadata instead of failing segment loading
- log.warn(ex, "Failed to load metadata for segment [%s]", inDir);
- }
- catch (IOException ex) {
- throw new IOException("Failed to read metadata", ex);
- }
- }
-
Map<String, Supplier<ColumnHolder>> columns = new LinkedHashMap<>();
// Register the time column
@@ -663,9 +650,32 @@ public class IndexIO
segmentBitmapSerdeFactory.getBitmapFactory(),
columns,
smooshedFiles,
- metadata,
lazy
- );
+ )
+ {
+ @Override
+ public Metadata getMetadata()
+ {
+ try {
+ ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
+ if (metadataBB != null) {
+ return mapper.readValue(
+ SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()),
+ Metadata.class
+ );
+ }
+ }
+ catch (JsonParseException | JsonMappingException ex) {
+ // Any Jackson deserialization errors are ignored; e.g. if the metadata contains an aggregator that is
+ // no longer supported, it is OK to skip the metadata rather than fail segment loading.
+ log.warn(ex, "Failed to load metadata for segment [%s]", inDir);
+ }
+ catch (IOException ex) {
+ log.warn(ex, "Failed to read metadata for segment [%s]", inDir);
+ }
+ return null;
+ }
+ };
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java
index 9d75748b416..b8d4d2d16cf 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java
@@ -19,17 +19,24 @@
package org.apache.druid.segment;
+import org.apache.druid.common.semantic.SemanticCreator;
+import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.query.rowsandcols.concrete.QueryableIndexRowsAndColumns;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.function.Function;
/**
*/
public class QueryableIndexSegment implements Segment
{
+ private static final Map<Class<?>, Function<QueryableIndexSegment, ?>> AS_MAP = SemanticUtils
+ .makeAsMap(QueryableIndexSegment.class);
+
private final QueryableIndex index;
private final QueryableIndexStorageAdapter storageAdapter;
private final SegmentId segmentId;
@@ -77,10 +84,18 @@ public class QueryableIndexSegment implements Segment
@Override
public <T> T as(@Nonnull Class<T> clazz)
{
- if (CloseableShapeshifter.class.equals(clazz)) {
- return (T) new QueryableIndexRowsAndColumns(index);
+ final Function<QueryableIndexSegment, ?> fn = AS_MAP.get(clazz);
+ if (fn != null) {
+ return (T) fn.apply(this);
}
return Segment.super.as(clazz);
}
+
+ @SemanticCreator
+ @SuppressWarnings("unused")
+ public CloseableShapeshifter toCloseableShapeshifter()
+ {
+ return new QueryableIndexRowsAndColumns(index);
+ }
}
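For reviewers unfamiliar with the SemanticCreator pattern, a hedged sketch of what the new wiring gives callers; the segment variable is assumed to be a QueryableIndexSegment.

    // SemanticUtils.makeAsMap() scans the class once for @SemanticCreator methods, so this
    // call now routes through AS_MAP to toCloseableShapeshifter() instead of a hard-coded
    // clazz.equals() check. Observable behavior is intended to be unchanged.
    CloseableShapeshifter shapeshifter = segment.as(CloseableShapeshifter.class);
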
diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
index 924c7911f8a..013a634fdc4 100644
--- a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
@@ -38,7 +38,7 @@ import java.util.Map;
/**
*
*/
-public class SimpleQueryableIndex implements QueryableIndex
+public abstract class SimpleQueryableIndex implements QueryableIndex
{
private final Interval dataInterval;
private final List<String> columnNames;
@@ -46,8 +46,6 @@ public class SimpleQueryableIndex implements QueryableIndex
private final BitmapFactory bitmapFactory;
private final Map<String, Supplier<ColumnHolder>> columns;
private final SmooshedFileMapper fileMapper;
- @Nullable
- private final Metadata metadata;
private final Supplier