From 7ac0862287270fcb0cfb33b01a31ae2e650621ef Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala Date: Thu, 20 Jun 2024 15:05:58 +0530 Subject: [PATCH 01/72] Grouping Engine fix when a limit spec with different order by columns is applied (#16534) --- .../druid/query/groupby/GroupByQuery.java | 155 ++++++++---------- .../druid/query/groupby/GroupingEngine.java | 5 +- .../druid/sql/calcite/CalciteQueryTest.java | 31 +++- 3 files changed, 99 insertions(+), 92 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index cdcf9e3daf4..994705f55e3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -560,15 +560,20 @@ public class GroupByQuery extends BaseQuery return false; } - /** - * When limit push down is applied, the partial results would be sorted by the ordering specified by the - * limit/order spec (unlike non-push down case where the results always use the default natural ascending order), - * so when merging these partial result streams, the merge needs to use the same ordering to get correct results. - */ - private Ordering getRowOrderingForPushDown( - final boolean granular, - final DefaultLimitSpec limitSpec - ) + public Ordering getRowOrdering(final boolean granular) + { + return getOrderingAndDimensions(granular).getRowOrdering(); + } + + public List getDimensionNamesInOrder() + { + return getOrderingAndDimensions(false).getDimensions() + .stream() + .map(DimensionSpec::getOutputName) + .collect(Collectors.toList()); + } + + public OrderingAndDimensions getOrderingAndDimensions(final boolean granular) { final boolean sortByDimsFirst = getContextSortByDimsFirst(); @@ -577,18 +582,30 @@ public class GroupByQuery extends BaseQuery final List needsReverseList = new ArrayList<>(); final List dimensionTypes = new ArrayList<>(); final List comparators = new ArrayList<>(); + final List dimensionsInOrder = new ArrayList<>(); - for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) { - boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING; - int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions); - if (dimIndex >= 0) { - DimensionSpec dim = dimensions.get(dimIndex); - orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName())); - dimsInOrderBy.add(dimIndex); - needsReverseList.add(needsReverse); - final ColumnType type = dimensions.get(dimIndex).getOutputType(); - dimensionTypes.add(type); - comparators.add(orderSpec.getDimensionComparator()); + /* + * When limit push down is applied, the partial results would be sorted by the ordering specified by the + * limit/order spec (unlike non-push down case where the results always use the default natural ascending order), + * so when merging these partial result streams, the merge needs to use the same ordering to get correct results. 
+ */ + if (isApplyLimitPushDown()) { + DefaultLimitSpec limitSpec1 = (DefaultLimitSpec) limitSpec; + if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields(limitSpec1, dimensions)) { + for (OrderByColumnSpec orderSpec : ((DefaultLimitSpec) limitSpec).getColumns()) { + boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING; + int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions); + if (dimIndex >= 0) { + DimensionSpec dim = dimensions.get(dimIndex); + orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName())); + dimsInOrderBy.add(dimIndex); + needsReverseList.add(needsReverse); + final ColumnType type = dimensions.get(dimIndex).getOutputType(); + dimensionTypes.add(type); + comparators.add(orderSpec.getDimensionComparator()); + dimensionsInOrder.add(dim); + } + } } } @@ -599,14 +616,16 @@ public class GroupByQuery extends BaseQuery final ColumnType type = dimensions.get(i).getOutputType(); dimensionTypes.add(type); comparators.add(StringComparators.NATURAL); + dimensionsInOrder.add(dimensions.get(i)); } } final Comparator timeComparator = getTimeComparator(granular); + Ordering ordering; if (timeComparator == null) { - return Ordering.from( - (lhs, rhs) -> compareDimsForLimitPushDown( + ordering = Ordering.from( + (lhs, rhs) -> compareDims( orderedFieldNumbers, needsReverseList, dimensionTypes, @@ -616,9 +635,9 @@ public class GroupByQuery extends BaseQuery ) ); } else if (sortByDimsFirst) { - return Ordering.from( + ordering = Ordering.from( (lhs, rhs) -> { - final int cmp = compareDimsForLimitPushDown( + final int cmp = compareDims( orderedFieldNumbers, needsReverseList, dimensionTypes, @@ -634,7 +653,7 @@ public class GroupByQuery extends BaseQuery } ); } else { - return Ordering.from( + ordering = Ordering.from( (lhs, rhs) -> { final int timeCompare = timeComparator.compare(lhs, rhs); @@ -642,7 +661,7 @@ public class GroupByQuery extends BaseQuery return timeCompare; } - return compareDimsForLimitPushDown( + return compareDims( orderedFieldNumbers, needsReverseList, dimensionTypes, @@ -653,45 +672,8 @@ public class GroupByQuery extends BaseQuery } ); } - } - public Ordering getRowOrdering(final boolean granular) - { - if (isApplyLimitPushDown()) { - if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) { - return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec); - } - } - - final boolean sortByDimsFirst = getContextSortByDimsFirst(); - final Comparator timeComparator = getTimeComparator(granular); - - if (timeComparator == null) { - return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs)); - } else if (sortByDimsFirst) { - return Ordering.from( - (lhs, rhs) -> { - final int cmp = compareDims(dimensions, lhs, rhs); - if (cmp != 0) { - return cmp; - } - - return timeComparator.compare(lhs, rhs); - } - ); - } else { - return Ordering.from( - (lhs, rhs) -> { - final int timeCompare = timeComparator.compare(lhs, rhs); - - if (timeCompare != 0) { - return timeCompare; - } - - return compareDims(dimensions, lhs, rhs); - } - ); - } + return new OrderingAndDimensions(ordering, dimensionsInOrder); } @Nullable @@ -716,25 +698,6 @@ public class GroupByQuery extends BaseQuery } } - private int compareDims(List dimensions, ResultRow lhs, ResultRow rhs) - { - final int dimensionStart = getResultRowDimensionStart(); - - for (int i = 0; i < dimensions.size(); i++) { - DimensionSpec dimension = dimensions.get(i); - final int dimCompare = 
DimensionHandlerUtils.compareObjectsAsType( - lhs.get(dimensionStart + i), - rhs.get(dimensionStart + i), - dimension.getOutputType() - ); - if (dimCompare != 0) { - return dimCompare; - } - } - - return 0; - } - /** * Computes the timestamp that will be returned by {@link #getUniversalTimestamp()}. */ @@ -760,12 +723,12 @@ public class GroupByQuery extends BaseQuery } /** - * Compares the dimensions for limit pushdown. + * Compares the dimensions. * * Due to legacy reason, the provided StringComparator for the arrays isn't applied and must be changed once we * get rid of the StringComparators for array types */ - private static int compareDimsForLimitPushDown( + private static int compareDims( final IntList fields, final List needsReverseList, final List dimensionTypes, @@ -924,6 +887,28 @@ public class GroupByQuery extends BaseQuery } } + public static class OrderingAndDimensions + { + Ordering rowOrdering; + List dimensions; + + public OrderingAndDimensions(Ordering rowOrdering, List dimensions) + { + this.rowOrdering = rowOrdering; + this.dimensions = dimensions; + } + + public Ordering getRowOrdering() + { + return rowOrdering; + } + + public List getDimensions() + { + return dimensions; + } + } + public static class Builder { @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 6451fb9b943..ab1ee1052b4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -686,8 +686,7 @@ public class GroupingEngine processingConfig.intermediateComputeSizeBytes() ); - List queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName) - .collect(Collectors.toList()); + List queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder(); // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec. Set aggsAndPostAggs = null; @@ -724,7 +723,7 @@ public class GroupingEngine .withLimitSpec(subtotalQueryLimitSpec); final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = resultSupplierOne; - if (Utils.isPrefix(subtotalSpec, queryDimNames)) { + if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) { // Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted // by subtotalSpec as needed by stream merging. 
subtotalsResults.add( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 1975f5589e6..9a0a0318210 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -13823,10 +13823,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .build() ), ImmutableList.builder().add( - new Object[]{"", null, 2L}, - new Object[]{"a", null, 1L}, - new Object[]{"", null, 1L}, - new Object[]{"a", null, 1L}, + new Object[]{"", null, 3L}, + new Object[]{"a", null, 2L}, new Object[]{"abc", null, 1L}, new Object[]{NULL_STRING, null, 6L}, new Object[]{"", timestamp("2000-01-01"), 2L}, @@ -16290,4 +16288,29 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) ).run(); } + + @SqlTestFrameworkConfig.NumMergeBuffers(3) + @Test + public void testGroupingSetsWithDifferentOrderLimitSpec() + { + msqIncompatible(); + testBuilder() + .sql( + "SELECT\n" + + " isNew, isRobot, COUNT(*) AS \"Cnt\"\n" + + "FROM \"wikipedia\"\n" + + "GROUP BY GROUPING SETS ((isRobot), (isNew))\n" + + "ORDER BY 2, 1\n" + + "limit 100" + ) + .expectedResults( + ResultMatchMode.RELAX_NULLS, + ImmutableList.of( + new Object[]{"false", null, 36966L}, + new Object[]{"true", null, 2278L}, + new Object[]{null, "false", 23824L}, + new Object[]{null, "true", 15420L} + ) + ).run(); + } } From b20c3dbadfe299a8d4715d4ecc924ccfab432fc0 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Thu, 20 Jun 2024 08:40:28 -0700 Subject: [PATCH 02/72] Fix malformed period throwing `ADMIN` persona error (#16626) * Turn invalid periods into user-facing exception providing more context. The current exception is targeting the ADMIN persona. Catch that and turn it into a USER persona instead. Also, provide more context in the error message. * Review comment: pass the wrapping expression and stringify. 
* Update processing/src/main/java/org/apache/druid/query/expression/ExprUtils.java Co-authored-by: Clint Wylie --------- Co-authored-by: Clint Wylie --- .../druid/query/expression/ExprUtils.java | 17 ++++++++++++-- .../expression/TimestampCeilExprMacro.java | 11 ++++++--- .../expression/TimestampFloorExprMacro.java | 11 ++++++--- .../sql/calcite/planner/CalcitePlanner.java | 2 +- .../sql/calcite/CalciteSelectQueryTest.java | 23 +++++++++++++++++++ 5 files changed, 55 insertions(+), 9 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/expression/ExprUtils.java b/processing/src/main/java/org/apache/druid/query/expression/ExprUtils.java index e2bd808d7b9..be513b40248 100644 --- a/processing/src/main/java/org/apache/druid/query/expression/ExprUtils.java +++ b/processing/src/main/java/org/apache/druid/query/expression/ExprUtils.java @@ -20,6 +20,7 @@ package org.apache.druid.query.expression; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.granularity.PeriodGranularity; @@ -45,13 +46,25 @@ public class ExprUtils } static PeriodGranularity toPeriodGranularity( + final Expr wrappingExpr, final Expr periodArg, @Nullable final Expr originArg, @Nullable final Expr timeZoneArg, final Expr.ObjectBinding bindings ) { - final Period period = new Period(periodArg.eval(bindings).asString()); + final Period period; + try { + period = new Period(periodArg.eval(bindings).asString()); + } + catch (IllegalArgumentException iae) { + throw InvalidInput.exception( + "Invalid period[%s] specified for expression[%s]: [%s]", + periodArg.stringify(), + wrappingExpr.stringify(), + iae.getMessage() + ); + } final DateTime origin; final DateTimeZone timeZone; @@ -69,7 +82,7 @@ public class ExprUtils final Object value = originArg.eval(bindings).valueOrDefault(); if (value instanceof String && NullHandling.isNullOrEquivalent((String) value)) { // We get a blank string here, when sql compatible null handling is enabled - // and expression contains empty string for for origin + // and expression contains empty string for origin // e.g timestamp_floor(\"__time\",'PT1M','','UTC') origin = null; } else { diff --git a/processing/src/main/java/org/apache/druid/query/expression/TimestampCeilExprMacro.java b/processing/src/main/java/org/apache/druid/query/expression/TimestampCeilExprMacro.java index cfd63f1ea61..3c5102ae7a2 100644 --- a/processing/src/main/java/org/apache/druid/query/expression/TimestampCeilExprMacro.java +++ b/processing/src/main/java/org/apache/druid/query/expression/TimestampCeilExprMacro.java @@ -63,7 +63,7 @@ public class TimestampCeilExprMacro implements ExprMacroTable.ExprMacro TimestampCeilExpr(final TimestampCeilExprMacro macro, final List args) { super(macro, args); - this.granularity = getGranularity(args, InputBindings.nilBindings()); + this.granularity = getGranularity(this, args, InputBindings.nilBindings()); } @Nonnull @@ -113,9 +113,14 @@ public class TimestampCeilExprMacro implements ExprMacroTable.ExprMacro } } - private static PeriodGranularity getGranularity(final List args, final Expr.ObjectBinding bindings) + private static PeriodGranularity getGranularity( + final Expr expr, + final List args, + final Expr.ObjectBinding bindings + ) { return ExprUtils.toPeriodGranularity( + expr, args.get(1), args.size() > 2 ? args.get(2) : null, args.size() > 3 ? 
args.get(3) : null, @@ -135,7 +140,7 @@ public class TimestampCeilExprMacro implements ExprMacroTable.ExprMacro @Override public ExprEval eval(final ObjectBinding bindings) { - final PeriodGranularity granularity = getGranularity(args, bindings); + final PeriodGranularity granularity = getGranularity(this, args, bindings); long argTime = args.get(0).eval(bindings).asLong(); long bucketStartTime = granularity.bucketStart(argTime); if (argTime == bucketStartTime) { diff --git a/processing/src/main/java/org/apache/druid/query/expression/TimestampFloorExprMacro.java b/processing/src/main/java/org/apache/druid/query/expression/TimestampFloorExprMacro.java index a243273b8f0..02eed7327f1 100644 --- a/processing/src/main/java/org/apache/druid/query/expression/TimestampFloorExprMacro.java +++ b/processing/src/main/java/org/apache/druid/query/expression/TimestampFloorExprMacro.java @@ -56,9 +56,14 @@ public class TimestampFloorExprMacro implements ExprMacroTable.ExprMacro } } - private static PeriodGranularity computeGranularity(final List args, final Expr.ObjectBinding bindings) + private static PeriodGranularity computeGranularity( + final Expr expr, + final List args, + final Expr.ObjectBinding bindings + ) { return ExprUtils.toPeriodGranularity( + expr, args.get(1), args.size() > 2 ? args.get(2) : null, args.size() > 3 ? args.get(3) : null, @@ -73,7 +78,7 @@ public class TimestampFloorExprMacro implements ExprMacroTable.ExprMacro TimestampFloorExpr(final TimestampFloorExprMacro macro, final List args) { super(macro, args); - this.granularity = computeGranularity(args, InputBindings.nilBindings()); + this.granularity = computeGranularity(this, args, InputBindings.nilBindings()); } /** @@ -170,7 +175,7 @@ public class TimestampFloorExprMacro implements ExprMacroTable.ExprMacro @Override public ExprEval eval(final ObjectBinding bindings) { - final PeriodGranularity granularity = computeGranularity(args, bindings); + final PeriodGranularity granularity = computeGranularity(this, args, bindings); return ExprEval.of(granularity.bucketStart(args.get(0).eval(bindings).asLong())); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 933baaac9ba..8eb9541961c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -200,7 +200,7 @@ public class CalcitePlanner implements Planner, ViewExpander state = CalcitePlanner.State.STATE_2_READY; - // If user specify own traitDef, instead of default default trait, + // If user specifies own traitDef, instead of default trait, // register the trait def specified in traitDefs. 
if (this.traitDefs == null) { planner.addRelTraitDef(ConventionTraitDef.INSTANCE); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java index 3203ae9915e..d1f256207d5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java @@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -130,6 +131,28 @@ public class CalciteSelectQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testTimeCeilExpressionContainingInvalidPeriod() + { + testQueryThrows( + "SELECT TIME_CEIL(__time, 'PT1Y') FROM foo", + DruidExceptionMatcher.invalidInput().expectMessageContains( + "Invalid period['PT1Y'] specified for expression[timestamp_ceil(\"__time\", 'PT1Y', null, 'UTC')]" + ) + ); + } + + @Test + public void testTimeFloorExpressionContainingInvalidPeriod() + { + testQueryThrows( + "SELECT TIME_FLOOR(TIMESTAMPADD(DAY, -1, __time), 'PT1D') FROM foo", + DruidExceptionMatcher.invalidInput().expectMessageContains( + "Invalid period['PT1D'] specified for expression[timestamp_floor((\"__time\" + -86400000), 'PT1D', null, 'UTC')]" + ) + ); + } + @Test public void testValuesContainingNull() { From ae70e18bc8e6caf57c6f469bc5cd971e69af6169 Mon Sep 17 00:00:00 2001 From: Andreas Maechler Date: Thu, 20 Jun 2024 10:31:29 -0600 Subject: [PATCH 03/72] docs: Update Azure extension (#16585) Co-authored-by: Victoria Lim --- docs/configuration/extensions.md | 6 +- docs/configuration/index.md | 22 +++--- docs/development/extensions-core/azure.md | 84 +++++++++++++++++----- docs/development/extensions-core/hdfs.md | 7 +- docs/development/extensions-core/s3.md | 29 ++++---- docs/ingestion/hadoop.md | 4 +- docs/ingestion/input-sources.md | 88 ++++++++++++----------- docs/ingestion/native-batch.md | 22 +++--- website/.spelling | 1 + 9 files changed, 161 insertions(+), 102 deletions(-) diff --git a/docs/configuration/extensions.md b/docs/configuration/extensions.md index 29356ebc05c..d396bc29000 100644 --- a/docs/configuration/extensions.md +++ b/docs/configuration/extensions.md @@ -22,7 +22,6 @@ title: "Extensions" ~ under the License. --> - Druid implements an extension system that allows for adding functionality at runtime. Extensions are commonly used to add support for deep storages (like HDFS and S3), metadata stores (like MySQL and PostgreSQL), new aggregators, new input formats, and so on. @@ -55,7 +54,7 @@ Core extensions are maintained by Druid committers. |druid-parquet-extensions|Support for data in Apache Parquet data format. 
Requires druid-avro-extensions to be loaded.|[link](../development/extensions-core/parquet.md)| |druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.md)| |druid-ranger-security|Support for access control through Apache Ranger.|[link](../development/extensions-core/druid-ranger-security.md)| -|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.md)| +|druid-s3-extensions|Interfacing with data in Amazon S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.md)| |druid-ec2-extensions|Interfacing with AWS EC2 for autoscaling middle managers|UNDOCUMENTED| |druid-aws-rds-extensions|Support for AWS token based access to AWS RDS DB Cluster.|[link](../development/extensions-core/druid-aws-rds.md)| |druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.md)| @@ -101,7 +100,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati |druid-momentsketch|Support for approximate quantile queries using the [momentsketch](https://github.com/stanford-futuredata/momentsketch) library|[link](../development/extensions-contrib/momentsketch-quantiles.md)| |druid-tdigestsketch|Support for approximate sketch aggregators based on [T-Digest](https://github.com/tdunning/t-digest)|[link](../development/extensions-contrib/tdigestsketch-quantiles.md)| |gce-extensions|GCE Extensions|[link](../development/extensions-contrib/gce-extensions.md)| -|prometheus-emitter|Exposes [Druid metrics](../operations/metrics.md) for Prometheus server collection (https://prometheus.io/)|[link](../development/extensions-contrib/prometheus.md)| +|prometheus-emitter|Exposes [Druid metrics](../operations/metrics.md) for Prometheus server collection ()|[link](../development/extensions-contrib/prometheus.md)| |druid-kubernetes-overlord-extensions|Support for launching tasks in k8s without Middle Managers|[link](../development/extensions-contrib/k8s-jobs.md)| |druid-spectator-histogram|Support for efficient approximate percentile queries|[link](../development/extensions-contrib/spectator-histogram.md)| |druid-rabbit-indexing-service|Support for creating and managing [RabbitMQ](https://www.rabbitmq.com/) indexing tasks|[link](../development/extensions-contrib/rabbit-stream-ingestion.md)| @@ -111,7 +110,6 @@ All of these community extensions can be downloaded using [pull-deps](../operati Please post on [dev@druid.apache.org](https://lists.apache.org/list.html?dev@druid.apache.org) if you'd like an extension to be promoted to core. If we see a community extension actively supported by the community, we can promote it to core based on community feedback. - For information how to create your own extension, please see [here](../development/modules.md). ## Loading extensions diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 4eceec8beec..b62ab9c0db8 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -668,14 +668,12 @@ Store task logs in S3. Note that the `druid-s3-extensions` extension must be loa ##### Azure Blob Store task logs -Store task logs in Azure Blob Store. +Store task logs in Azure Blob Store. To enable this feature, load the `druid-azure-extensions` extension, and configure deep storage for Azure. 
Druid uses the same authentication method configured for deep storage and stores task logs in the same storage account (set in `druid.azure.account`). -Note: The `druid-azure-extensions` extension must be loaded, and this uses the same storage account as the deep storage module for azure. - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.indexer.logs.container`|The Azure Blob Store container to write logs to|none| -|`druid.indexer.logs.prefix`|The path to prepend to logs|none| +| Property | Description | Default | +|---|---|---| +| `druid.indexer.logs.container` | The Azure Blob Store container to write logs to. | Must be set. | +| `druid.indexer.logs.prefix` | The path to prepend to logs. | Must be set. | ##### Google Cloud Storage task logs @@ -714,7 +712,7 @@ You can configure Druid API error responses to hide internal information like th |`druid.server.http.showDetailedJettyErrors`|When set to true, any error from the Jetty layer / Jetty filter includes the following fields in the JSON response: `servlet`, `message`, `url`, `status`, and `cause`, if it exists. When set to false, the JSON response only includes `message`, `url`, and `status`. The field values remain unchanged.|true| |`druid.server.http.errorResponseTransform.strategy`|Error response transform strategy. The strategy controls how Druid transforms error responses from Druid services. When unset or set to `none`, Druid leaves error responses unchanged.|`none`| -##### Error response transform strategy +#### Error response transform strategy You can use an error response transform strategy to transform error responses from within Druid services to hide internal information. When you specify an error response transform strategy other than `none`, Druid transforms the error responses from Druid services as follows: @@ -723,12 +721,12 @@ When you specify an error response transform strategy other than `none`, Druid t * For any SQL query API that fails, for example `POST /druid/v2/sql/...`, Druid sets the fields `errorClass` and `host` to null. Druid applies the transformation strategy to the `errorMessage` field. * For any JDBC related exceptions, Druid will turn all checked exceptions into `QueryInterruptedException` otherwise druid will attempt to keep the exception as the same type. For example if the original exception isn't owned by Druid it will become `QueryInterruptedException`. Druid applies the transformation strategy to the `errorMessage` field. -###### No error response transform strategy +##### No error response transform strategy In this mode, Druid leaves error responses from underlying services unchanged and returns the unchanged errors to the API client. This is the default Druid error response mode. To explicitly enable this strategy, set `druid.server.http.errorResponseTransform.strategy` to `none`. -###### Allowed regular expression error response transform strategy +##### Allowed regular expression error response transform strategy In this mode, Druid validates the error responses from underlying services against a list of regular expressions. Only error messages that match a configured regular expression are returned. To enable this strategy, set `druid.server.http.errorResponseTransform.strategy` to `allowedRegex`. @@ -774,7 +772,7 @@ This config is used to find the [Coordinator](../design/coordinator.md) using Cu You can configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs. 
-##### Batch data segment announcer +#### Batch data segment announcer In current Druid, multiple data segments may be announced under the same Znode. @@ -2037,7 +2035,7 @@ A simple in-memory LRU cache. Local cache resides in JVM heap memory, so if you |Property|Description|Default| |--------|-----------|-------| |`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0| -|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000| +|`druid.cache.initialSize`|Initial size of the hash table backing the cache.|500000| |`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0| #### Caffeine cache diff --git a/docs/development/extensions-core/azure.md b/docs/development/extensions-core/azure.md index 21e24153a47..d6310e32cf9 100644 --- a/docs/development/extensions-core/azure.md +++ b/docs/development/extensions-core/azure.md @@ -22,25 +22,75 @@ title: "Microsoft Azure" ~ under the License. --> +## Azure extension + +This extension allows you to do the following: + +* [Ingest data](#ingest-data-from-azure) from objects stored in Azure Blob Storage. +* [Write segments](#store-segments-in-azure) to Azure Blob Storage for deep storage. +* [Persist task logs](#persist-task-logs-in-azure) to Azure Blob Storage for long-term storage. + +:::info To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `druid-azure-extensions` in the extensions load list. -## Deep Storage +::: -[Microsoft Azure Storage](http://azure.microsoft.com/en-us/services/storage/) is another option for deep storage. This requires some additional Druid configuration. +### Ingest data from Azure -|Property|Description|Possible Values|Default| -|--------|---------------|-----------|-------| -|`druid.storage.type`|azure||Must be set.| -|`druid.azure.account`||Azure Storage account name.|Must be set.| -|`druid.azure.key`||Azure Storage account key.|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.| -|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain..| -|`druid.azure.useAzureCredentialsChain`|Use [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for authentication|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|False| -|`druid.azure.managedIdentityClientId`|If you want to use managed identity authentication in the `DefaultAzureCredential`, `useAzureCredentialsChain` must be true.||Optional.| -|`druid.azure.container`||Azure Storage container name.|Must be set.| -|`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage| |""| -|`druid.azure.protocol`|the protocol to use|http or https|https| -|`druid.azure.maxTries`|Number of tries before canceling an Azure operation.| |3| -|`druid.azure.maxListingLength`|maximum number of input files matching a given prefix to retrieve at a time| |1024| -|`druid.azure.storageAccountEndpointSuffix`| The endpoint suffix to use. Use this config instead of `druid.azure.endpointSuffix`. Override the default value to connect to [Azure Government](https://learn.microsoft.com/en-us/azure/azure-government/documentation-government-get-started-connect-to-storage#getting-started-with-storage-api). 
This config supports storage accounts enabled for [AzureDNSZone](https://learn.microsoft.com/en-us/azure/dns/dns-getstarted-portal). Note: do not include the storage account name prefix in this config value. | Examples: `ABCD1234.blob.storage.azure.net`, `blob.core.usgovcloudapi.net`| `blob.core.windows.net`| -See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information. +Ingest data using either [MSQ](../../multi-stage-query/index.md) or a native batch [parallel task](../../ingestion/native-batch.md) with an [Azure input source](../../ingestion/input-sources.md#azure-input-source) (`azureStorage`) to read objects directly from Azure Blob Storage. + +### Store segments in Azure + +:::info + +To use Azure for deep storage, set `druid.storage.type=azure`. + +::: + +#### Configure location + +Configure where to store segments using the following properties: + +| Property | Description | Default | +|---|---|---| +| `druid.azure.account` | The Azure Storage account name. | Must be set. | +| `druid.azure.container` | The Azure Storage container name. | Must be set. | +| `druid.azure.prefix` | A prefix string that will be prepended to the blob names for the segments published. | "" | +| `druid.azure.maxTries` | Number of tries before canceling an Azure operation. | 3 | +| `druid.azure.protocol` | The protocol to use to connect to the Azure Storage account. Either `http` or `https`. | `https` | +| `druid.azure.storageAccountEndpointSuffix` | The Storage account endpoint to use. Override the default value to connect to [Azure Government](https://learn.microsoft.com/en-us/azure/azure-government/documentation-government-get-started-connect-to-storage#getting-started-with-storage-api) or storage accounts with [Azure DNS zone endpoints](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview#azure-dns-zone-endpoints-preview).

Do _not_ include the storage account name prefix in this config value.

Examples: `ABCD1234.blob.storage.azure.net`, `blob.core.usgovcloudapi.net`. | `blob.core.windows.net` | + +#### Configure authentication + +Authenticate access to Azure Blob Storage using one of the following methods: + +* [SAS token](https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview) +* [Shared Key](https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key) +* Default Azure credentials chain ([`DefaultAzureCredential`](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme#defaultazurecredential)). + +Configure authentication using the following properties: + +| Property | Description | Default | +|---|---|---| +| `druid.azure.sharedAccessStorageToken` | The SAS (Shared Storage Access) token. | | +| `druid.azure.key` | The Shared Key. | | +| `druid.azure.useAzureCredentialsChain` | If `true`, use `DefaultAzureCredential` for authentication. | `false` | +| `druid.azure.managedIdentityClientId` | To use managed identity authentication in the `DefaultAzureCredential`, set `useAzureCredentialsChain` to `true` and provide the client ID here. | | + +### Persist task logs in Azure + +:::info + +To persist task logs in Azure Blob Storage, set `druid.indexer.logs.type=azure`. + +::: + +Druid stores task logs using the storage account and authentication method configured for storing segments. Use the following configuration to set up where to store the task logs: + +| Property | Description | Default | +|---|---|---| +| `druid.indexer.logs.container` | The Azure Blob Store container to write logs to. | Must be set. | +| `druid.indexer.logs.prefix` | The path to prepend to logs. | Must be set. | + +For general options regarding task retention, see [Log retention policy](../../configuration/index.md#log-retention-policy). diff --git a/docs/development/extensions-core/hdfs.md b/docs/development/extensions-core/hdfs.md index 32ef6133a9d..b1d2d0ceaab 100644 --- a/docs/development/extensions-core/hdfs.md +++ b/docs/development/extensions-core/hdfs.md @@ -22,7 +22,6 @@ title: "HDFS" ~ under the License. --> - To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `druid-hdfs-storage` in the extensions load list and run druid processes with `GOOGLE_APPLICATION_CREDENTIALS=/path/to/service_account_keyfile` in the environment. ## Deep Storage @@ -44,11 +43,11 @@ If you want to eagerly authenticate against a secured hadoop/hdfs cluster you mu ### Configuration for Cloud Storage -You can also use the AWS S3 or the Google Cloud Storage as the deep storage via HDFS. +You can also use the Amazon S3 or the Google Cloud Storage as the deep storage via HDFS. -#### Configuration for AWS S3 +#### Configuration for Amazon S3 -To use the AWS S3 as the deep storage, you need to configure `druid.storage.storageDirectory` properly. +To use the Amazon S3 as the deep storage, you need to configure `druid.storage.storageDirectory` properly. |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| diff --git a/docs/development/extensions-core/s3.md b/docs/development/extensions-core/s3.md index 20bd1682f24..ab8745f6e3b 100644 --- a/docs/development/extensions-core/s3.md +++ b/docs/development/extensions-core/s3.md @@ -25,6 +25,7 @@ title: "S3-compatible" ## S3 extension This extension allows you to do 2 things: + * [Ingest data](#reading-data-from-s3) from files stored in S3. * Write segments to [deep storage](#deep-storage) in S3. 
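For the ingestion side, a minimal `s3` input source inside a parallel task `ioConfig` could look like the sketch below. This is an illustration only: the bucket and object key are placeholders, not values taken from this patch, and the full set of options (`uris`, `prefixes`, `objects`, credentials) is described in the S3 input source documentation referenced above.

```json
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "uris": ["s3://your-bucket/path/to/file.json"]
      },
      "inputFormat": {
        "type": "json"
      }
    }
```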
@@ -41,7 +42,7 @@ To read objects from S3, you must supply [connection information](#configuration ### Deep Storage -S3-compatible deep storage means either AWS S3 or a compatible service like Google Storage which exposes the same API as S3. +S3-compatible deep storage means either Amazon S3 or a compatible service like Google Storage which exposes the same API as S3. S3 deep storage needs to be explicitly enabled by setting `druid.storage.type=s3`. **Only after setting the storage type to S3 will any of the settings below take effect.** @@ -97,19 +98,19 @@ Note that this setting only affects Druid's behavior. Changing S3 to use Object If you're using ACLs, Druid needs the following permissions: -- `s3:GetObject` -- `s3:PutObject` -- `s3:DeleteObject` -- `s3:GetBucketAcl` -- `s3:PutObjectAcl` +* `s3:GetObject` +* `s3:PutObject` +* `s3:DeleteObject` +* `s3:GetBucketAcl` +* `s3:PutObjectAcl` #### Object Ownership permissions If you're using Object Ownership, Druid needs the following permissions: -- `s3:GetObject` -- `s3:PutObject` -- `s3:DeleteObject` +* `s3:GetObject` +* `s3:PutObject` +* `s3:DeleteObject` ### AWS region @@ -117,8 +118,8 @@ The AWS SDK requires that a target region be specified. You can set these by us For example, to set the region to 'us-east-1' through system properties: -- Add `-Daws.region=us-east-1` to the `jvm.config` file for all Druid services. -- Add `-Daws.region=us-east-1` to `druid.indexer.runner.javaOpts` in [Middle Manager configuration](../../configuration/index.md#middlemanager-configuration) so that the property will be passed to Peon (worker) processes. +* Add `-Daws.region=us-east-1` to the `jvm.config` file for all Druid services. +* Add `-Daws.region=us-east-1` to `druid.indexer.runner.javaOpts` in [Middle Manager configuration](../../configuration/index.md#middlemanager-configuration) so that the property will be passed to Peon (worker) processes. ### Connecting to S3 configuration @@ -146,6 +147,6 @@ For example, to set the region to 'us-east-1' through system properties: You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption) by setting `druid.storage.sse.type` to a supported type of server-side encryption. 
The current supported types are: -- s3: [Server-side encryption with S3-managed encryption keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption) -- kms: [Server-side encryption with AWS KMS–Managed Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption) -- custom: [Server-side encryption with Customer-Provided Encryption Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys) +* s3: [Server-side encryption with S3-managed encryption keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption) +* kms: [Server-side encryption with AWS KMS–Managed Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption) +* custom: [Server-side encryption with Customer-Provided Encryption Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys) diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md index c34fdb92117..96373e27517 100644 --- a/docs/ingestion/hadoop.md +++ b/docs/ingestion/hadoop.md @@ -148,7 +148,7 @@ For example, using the static input paths: "paths" : "hdfs://path/to/data/is/here/data.gz,hdfs://path/to/data/is/here/moredata.gz,hdfs://path/to/data/is/here/evenmoredata.gz" ``` -You can also read from cloud storage such as AWS S3 or Google Cloud Storage. +You can also read from cloud storage such as Amazon S3 or Google Cloud Storage. To do so, you need to install the necessary library under Druid's classpath in _all MiddleManager or Indexer processes_. For S3, you can run the below command to install the [Hadoop AWS module](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/). @@ -336,7 +336,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |logParseExceptions|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|no(default = false)| |maxParseExceptions|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overrides `ignoreInvalidRows` if `maxParseExceptions` is defined.|no(default = unlimited)| |useYarnRMJobStatusFallback|Boolean|If the Hadoop jobs created by the indexing task are unable to retrieve their completion status from the JobHistory server, and this parameter is true, the indexing task will try to fetch the application status from `http:///ws/v1/cluster/apps/`, where `` is the value of `yarn.resourcemanager.webapp.address` in your Hadoop configuration. This flag is intended as a fallback for cases where an indexing task's jobs succeed, but the JobHistory server is unavailable, causing the indexing task to fail because it cannot determine the job statuses.|no (default = true)| -|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait will occur. If `> 0`, the task will wait for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task will exit as successful, but the segments were not confirmed to have become available for query.|no (default = 0)| +|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait will occur. If `> 0`, the task will wait for the Coordinator to indicate that the new segments are available for querying. 
If the timeout expires, the task will exit as successful, but the segments were not confirmed to have become available for query.|no (default = 0)| ### `jobProperties` diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md index f89693740c8..fb8e1f98c91 100644 --- a/docs/ingestion/input-sources.md +++ b/docs/ingestion/input-sources.md @@ -30,12 +30,15 @@ For general information on native batch indexing and parallel task indexing, see ## S3 input source :::info - You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the S3 input source. + +You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the S3 input source. + ::: The S3 input source reads objects directly from S3. You can specify either: -- a list of S3 URI strings -- a list of S3 location prefixes that attempts to list the contents and ingest + +* a list of S3 URI strings +* a list of S3 location prefixes that attempts to list the contents and ingest all objects contained within the locations. The S3 input source is splittable. Therefore, you can use it with the [Parallel task](./native-batch.md). Each worker task of `index_parallel` reads one or multiple objects. @@ -76,7 +79,6 @@ Sample specs: ... ``` - ```json ... "ioConfig": { @@ -210,13 +212,17 @@ Properties Object: |assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html)|None|no| :::info - **Note:** If `accessKeyId` and `secretAccessKey` are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used. + +If `accessKeyId` and `secretAccessKey` are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used. + ::: ## Google Cloud Storage input source :::info - You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the Google Cloud Storage input source. + +You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the Google Cloud Storage input source. + ::: The Google Cloud Storage input source is to support reading objects directly @@ -261,7 +267,6 @@ Sample specs: ... ``` - ```json ... "ioConfig": { @@ -300,16 +305,18 @@ Google Cloud Storage object: |path|The path where data is located.|None|yes| |systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (Google Cloud Storage URI starting with `gs://`), `__file_bucket` (GCS bucket), and `__file_path` (GCS key).|None|no| -## Azure input source +## Azure input source :::info - You need to include the [`druid-azure-extensions`](../development/extensions-core/azure.md) as an extension to use the Azure input source. + +You need to include the [`druid-azure-extensions`](../development/extensions-core/azure.md) as an extension to use the Azure input source. + ::: The Azure input source (that uses the type `azureStorage`) reads objects directly from Azure Blob store or Azure Data Lake sources. You can specify objects as a list of file URI strings or prefixes. You can split the Azure input source for use with [Parallel task](./native-batch.md) indexing and each worker task reads one chunk of the split data. 
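As a quick orientation before the full sample specs that follow, here is a minimal sketch of an `azureStorage` input source that combines a prefix listing, an `objectGlob`, and an inline `properties` override for SAS-token authentication. The storage account, container, and token shown are placeholders only; each field is documented in the tables below.

```json
    "inputSource": {
      "type": "azureStorage",
      "objectGlob": "**.json",
      "prefixes": ["azureStorage://storageAccount/container/prefix/"],
      "properties": {
        "sharedAccessStorageToken": "<SAS token>"
      }
    }
```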
-The `azureStorage` input source is a new schema for Azure input sources that allows you to specify which storage account files should be ingested from. We recommend that you update any specs that use the old `azure` schema to use the new `azureStorage` schema. The new schema provides more functionality than the older `azure` schema. +The `azureStorage` input source is a new schema for Azure input sources that allows you to specify which storage account files should be ingested from. We recommend that you update any specs that use the old `azure` schema to use the new `azureStorage` schema. The new schema provides more functionality than the older `azure` schema. Sample specs: @@ -347,7 +354,6 @@ Sample specs: ... ``` - ```json ... "ioConfig": { @@ -379,7 +385,7 @@ Sample specs: |objects|JSON array of Azure objects to ingest.|None|One of the following must be set:`uris`, `prefixes`, or `objects`.| |objectGlob|A glob for the object part of the Azure URI. In the URI `azureStorage://foo/bar/file.json`, the glob is applied to `bar/file.json`.

The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `azureStorage://foo/bar/file.json` because the object part is `bar/file.json`, and the `*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.

For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no| |systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (Azure blob URI starting with `azureStorage://`), `__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no| -|properties|Properties object for overriding the default Azure configuration. See below for more information.|None|No (defaults will be used if not given) +|properties|Properties object for overriding the default Azure configuration. See below for more information.|None|No (defaults will be used if not given)| Note that the Azure input source skips all empty objects only when `prefixes` is specified. @@ -390,14 +396,12 @@ The `objects` property can one of the following: |bucket|Name of the Azure Blob Storage or Azure Data Lake storage account|None|yes| |path|The container and path where data is located.|None|yes| - The `properties` property can be one of the following: -- `sharedAccessStorageToken` -- `key` -- `appRegistrationClientId`, `appRegistrationClientSecret`, and `tenantId` -- empty - +* `sharedAccessStorageToken` +* `key` +* `appRegistrationClientId`, `appRegistrationClientSecret`, and `tenantId` +* empty |Property|Description|Default|Required| |--------|-----------|-------|---------| @@ -407,8 +411,7 @@ The `properties` property can be one of the following: |appRegistrationClientSecret|The client secret of the Azure App registration to authenticate as|None|Yes if `appRegistrationClientId` is provided| |tenantId|The tenant ID of the Azure App registration to authenticate as|None|Yes if `appRegistrationClientId` is provided| - -#### `azure` input source +### Legacy `azure` input source The Azure input source that uses the type `azure` is an older version of the Azure input type and is not recommended. It doesn't support specifying which storage account to ingest from. We recommend using the [`azureStorage` input source schema](#azure-input-source) instead since it provides more functionality. @@ -448,7 +451,6 @@ Sample specs: ... ``` - ```json ... "ioConfig": { @@ -487,11 +489,12 @@ The `objects` property is: |bucket|Name of the Azure Blob Storage or Azure Data Lake container|None|yes| |path|The path where data is located.|None|yes| - ## HDFS input source :::info - You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFS input source. + +You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFS input source. + ::: The HDFS input source is to support reading files directly @@ -580,10 +583,12 @@ in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security conf The HTTP input source is to support reading files directly from remote sites via HTTP. -:::info - **Security notes:** Ingestion tasks run under the operating system account that runs the Druid processes, for example the Indexer, Middle Manager, and Peon. This means any user who can submit an ingestion task can specify an input source referring to any location that the Druid process can access. For example, using `http` input source, users may have access to internal network servers. +:::info Security notes + +Ingestion tasks run under the operating system account that runs the Druid processes, for example the Indexer, Middle Manager, and Peon. 
This means any user who can submit an ingestion task can specify an input source referring to any location that the Druid process can access. For example, using `http` input source, users may have access to internal network servers. + +The `http` input source is not limited to the HTTP or HTTPS protocols. It uses the Java URI class that supports HTTP, HTTPS, FTP, file, and jar protocols by default. - The `http` input source is not limited to the HTTP or HTTPS protocols. It uses the Java URI class that supports HTTP, HTTPS, FTP, file, and jar protocols by default. ::: For more information about security best practices, see [Security overview](../operations/security-overview.md#best-practices). @@ -725,7 +730,7 @@ Sample spec: |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|yes if `baseDir` is specified| |baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified| |files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified| -|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (File URI starting with `file:`) and `__file_path` (file path).|None|no| +|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (File URI starting with `file:`) and `__file_path` (file path).|no| ## Druid input source @@ -744,9 +749,9 @@ no `inputFormat` field needs to be specified in the ingestion spec when using th The Druid input source can be used for a variety of purposes, including: -- Creating new datasources that are rolled-up copies of existing datasources. -- Changing the [partitioning or sorting](./partitioning.md) of a datasource to improve performance. -- Updating or removing rows using a [`transformSpec`](./ingestion-spec.md#transformspec). +* Creating new datasources that are rolled-up copies of existing datasources. +* Changing the [partitioning or sorting](./partitioning.md) of a datasource to improve performance. +* Updating or removing rows using a [`transformSpec`](./ingestion-spec.md#transformspec). When using the Druid input source, the timestamp column shows up as a numeric field named `__time` set to the number of milliseconds since the epoch (January 1, 1970 00:00:00 UTC). It is common to use this in the timestampSpec, if you @@ -813,16 +818,16 @@ rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and ``` :::info - Note: Older versions (0.19 and earlier) did not respect the timestampSpec when using the Druid input source. If you - have ingestion specs that rely on this and cannot rewrite them, set - [`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`](../configuration/index.md#indexer-general-configuration) - to `true` to enable a compatibility mode where the timestampSpec is ignored. + +Older versions (0.19 and earlier) did not respect the timestampSpec when using the Druid input source. 
If you have ingestion specs that rely on this and cannot rewrite them, set [`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`](../configuration/index.md#indexer-general-configuration) to `true` to enable a compatibility mode where the timestampSpec is ignored. + ::: The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source. Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method: -- `range` or `single_dim` partitioning: greater than or equal to 1 -- `hashed` or `dynamic` partitioning: greater than or equal to 2 + +* `range` or `single_dim` partitioning: greater than or equal to 1 +* `hashed` or `dynamic` partitioning: greater than or equal to 2 For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations). @@ -866,7 +871,7 @@ The following is an example of an SQL input source spec: The spec above will read all events from two separate SQLs for the interval `2013-01-01/2013-01-02`. Each of the SQL queries will be run in its own sub-task and thus for the above example, there would be two sub-tasks. -**Recommended practices** +### Recommended practices Compared to the other native batch input sources, SQL input source behaves differently in terms of reading the input data. Therefore, consider the following points before using this input source in a production environment: @@ -878,7 +883,6 @@ Compared to the other native batch input sources, SQL input source behaves diffe * Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`. - ## Combining input source The Combining input source lets you read data from multiple input sources. @@ -928,7 +932,9 @@ The following is an example of a Combining input source spec: ## Iceberg input source :::info + To use the Iceberg input source, load the extension [`druid-iceberg-extensions`](../development/extensions-contrib/iceberg.md). + ::: You use the Iceberg input source to read data stored in the Iceberg table format. For a given table, the input source scans up to the latest Iceberg snapshot from the configured Hive catalog. Druid ingests the underlying live data files using the existing input source formats. @@ -1133,13 +1139,15 @@ This input source provides the following filters: `and`, `equals`, `interval`, a ## Delta Lake input source :::info + To use the Delta Lake input source, load the extension [`druid-deltalake-extensions`](../development/extensions-contrib/delta-lake.md). + ::: You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans the latest snapshot from the configured table. Druid ingests the underlying delta files from the table. - | Property|Description|Required| +| Property|Description|Required| |---------|-----------|--------| | type|Set this value to `delta`.|yes| | tablePath|The location of the Delta table.|yes| @@ -1155,7 +1163,6 @@ on statistics collected when the non-partitioned table is created. In this scena data that doesn't match the filter. To guarantee that the Delta Kernel prunes out unnecessary column values, only use filters on partitioned columns. - `and` filter: | Property | Description | Required | @@ -1217,7 +1224,6 @@ filters on partitioned columns. 
| column | The table column to apply the filter on. | yes | | value | The value to use in the filter. | yes | - The following is a sample spec to read all records from the Delta table `/delta-table/foo`: ```json diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index fc234cce0a2..398fea9f69a 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -28,12 +28,14 @@ sidebar_label: JSON-based batch ::: Apache Druid supports the following types of JSON-based batch indexing tasks: + - Parallel task indexing (`index_parallel`) that can run multiple indexing tasks concurrently. Parallel task works well for production ingestion tasks. - Simple task indexing (`index`) that run a single indexing task at a time. Simple task indexing is suitable for development and test environments. This topic covers the configuration for `index_parallel` ingestion specs. For related information on batch indexing, see: + - [Batch ingestion method comparison table](./index.md#batch) for a comparison of batch ingestion methods. - [Tutorial: Loading a file](../tutorials/tutorial-batch.md) for a tutorial on JSON-based batch ingestion. - [Input sources](./input-sources.md) for possible input sources. @@ -97,7 +99,6 @@ By default, JSON-based batch ingestion replaces all data in the intervals in you You can also perform concurrent append and replace tasks. For more information, see [Concurrent append and replace](./concurrent-append-replace.md) - #### Fully replacing existing segments using tombstones :::info @@ -124,12 +125,12 @@ You want to re-ingest and overwrite with new data as follows: Unless you set `dropExisting` to true, the result after ingestion with overwrite using the same `MONTH` `segmentGranularity` would be: -* **January**: 1 record -* **February**: 10 records -* **March**: 9 records +- **January**: 1 record +- **February**: 10 records +- **March**: 9 records This may not be what it is expected since the new data has 0 records for January. Set `dropExisting` to true to replace the unneeded January segment with a tombstone. - + ## Parallel indexing example The following example illustrates the configuration for a parallel indexing task. @@ -214,6 +215,7 @@ The following example illustrates the configuration for a parallel indexing task } } ``` + ## Parallel indexing configuration @@ -305,7 +307,7 @@ The segments split hint spec is used only for [`DruidInputSource`](./input-sourc ### `partitionsSpec` -The primary partition for Druid is time. You can define a secondary partitioning method in the partitions spec. Use the `partitionsSpec` type that applies for your [rollup](rollup.md) method. +The primary partition for Druid is time. You can define a secondary partitioning method in the partitions spec. Use the `partitionsSpec` type that applies for your [rollup](rollup.md) method. For perfect rollup, you can use: @@ -366,7 +368,7 @@ In the `partial segment generation` phase, just like the Map phase in MapReduce, the Parallel task splits the input data based on the split hint spec and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split, and partitions rows by the time chunk from `segmentGranularity` (primary partition key) in the `granularitySpec` and then by the hash value of `partitionDimensions` (secondary partition key) in the `partitionsSpec`. 
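For illustration, the secondary hash partitioning described above is configured through a `hashed` `partitionsSpec` in the `tuningConfig`; a minimal sketch (the shard count and dimension names below are placeholders, not values taken from any spec in this patch) looks like:

```json
{
  "type": "hashed",
  "numShards": 4,
  "partitionDimensions": ["countryName", "cityName"]
}
```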
-The partitioned data is stored in local storage of +The partitioned data is stored in local storage of the [middleManager](../design/middlemanager.md) or the [indexer](../design/indexer.md). The `partial segment merge` phase is similar to the Reduce phase in MapReduce. @@ -709,12 +711,14 @@ The returned result contains the worker task spec, a current task status if exis "taskHistory": [] } ``` + `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history` Returns the task attempt history of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode. ## Segment pushing modes + While ingesting data using the parallel task indexing, Druid creates segments from the input data and pushes them. For segment pushing, the parallel task index supports the following segment pushing modes based upon your type of [rollup](./rollup.md): @@ -743,10 +747,12 @@ This may help the higher priority tasks to finish earlier than lower priority ta by assigning more task slots to them. ## Splittable input sources + Use the `inputSource` object to define the location where your index can read data. Only the native parallel task and simple task support the input source. For details on available input sources see: -- [S3 input source](./input-sources.md#s3-input-source) (`s3`) reads data from AWS S3 storage. + +- [S3 input source](./input-sources.md#s3-input-source) (`s3`) reads data from Amazon S3 storage. - [Google Cloud Storage input source](./input-sources.md#google-cloud-storage-input-source) (`gs`) reads data from Google Cloud Storage. - [Azure input source](./input-sources.md#azure-input-source) (`azure`) reads data from Azure Blob Storage and Azure Data Lake. - [HDFS input source](./input-sources.md#hdfs-input-source) (`hdfs`) reads data from HDFS storage. 
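As a companion to the splittable input sources listed above, a minimal parallel-task `ioConfig` sketch using the S3 input source might look like the following; the bucket prefix and input format are placeholders chosen for illustration:

```json
{
  "type": "index_parallel",
  "inputSource": {
    "type": "s3",
    "prefixes": ["s3://your-bucket/path/to/data/"]
  },
  "inputFormat": {
    "type": "json"
  }
}
```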
diff --git a/website/.spelling b/website/.spelling index 6bda08d608d..31b151a921d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -216,6 +216,7 @@ ROUTINE_TYPE Rackspace Redis S3 +SAS SDK SIGAR SPNEGO From 35709de54969cf9ba297884323e55273217981bf Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Thu, 20 Jun 2024 12:23:59 -0500 Subject: [PATCH 04/72] CgroupCpuSetMonitor: Initialize the cgroup discoverer (#16621) --- .../apache/druid/java/util/metrics/CgroupCpuSetMonitor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java index 0534e00259f..84de0fd216d 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.CpuSet; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; import java.util.Map; @@ -41,7 +42,7 @@ public class CgroupCpuSetMonitor extends FeedDefiningMonitor public CgroupCpuSetMonitor(final Map dimensions, String feed) { - this(null, dimensions, feed); + this(new ProcSelfCgroupDiscoverer(), dimensions, feed); } public CgroupCpuSetMonitor(final Map dimensions) From cd438b1918c1bf925e76ba97b6ce076cde831df4 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 21 Jun 2024 11:36:47 +0530 Subject: [PATCH 05/72] Emit metrics for S3UploadThreadPool (#16616) * Emit metrics for S3UploadThreadPool * Address review comments * Revert unnecessary formatting change * Revert unnecessary formatting change in metrics.md file * Address review comments * Add metric for task duration * Minor fix in metrics.md * Add s3Key and uploadId in the log message * Address review comments * Create new instance of ServiceMetricEvent.Builder for thread safety * Address review comments * Address review comments --- docs/operations/metrics.md | 13 ++++ .../s3/output/RetryableS3OutputStream.java | 22 ++++-- .../storage/s3/output/S3UploadManager.java | 75 ++++++++++++++----- .../s3/S3StorageConnectorProviderTest.java | 4 +- .../output/RetryableS3OutputStreamTest.java | 4 +- .../s3/output/S3StorageConnectorTest.java | 4 +- .../s3/output/S3UploadManagerTest.java | 9 ++- .../java/util/metrics/StubServiceEmitter.java | 4 +- 8 files changed, 104 insertions(+), 31 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index bf241ac5708..1d37169684e 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -508,6 +508,19 @@ These metrics are only available if the `OshiSysMonitor` module is included. |`sys/tcpv4/out/rsts`|Total "out reset" packets sent to reset the connection||Generally 0| |`sys/tcpv4/retrans/segs`|Total segments re-transmitted||Varies| + +## S3 multi-part upload + +These metrics are only available if the `druid-s3-extensions` module is included and if certain specific features are being used: MSQ export to S3, durable intermediate storage on S3. 
+ +|Metric|Description|Dimensions|Normal value| +|------|-----------|----------|------------| +|`s3/upload/part/queueSize`|Number of items currently waiting in queue to be uploaded to S3. Each item in the queue corresponds to a single part in a multi-part upload.||Varies| +|`s3/upload/part/queuedTime`|Milliseconds spent by a single item (or part) in queue before it starts getting uploaded to S3.|`uploadId`, `partNumber`|Varies| +|`s3/upload/part/time`|Milliseconds taken to upload a single part of a multi-part upload to S3.|`uploadId`, `partNumber`|Varies| +|`s3/upload/total/time`|Milliseconds taken for uploading all parts of a multi-part upload to S3.|`uploadId`|Varies| +|`s3/upload/total/bytes`|Total bytes uploaded to S3 during a multi-part upload.|`uploadId`|Varies| + ## Cgroup These metrics are available on operating systems with the cgroup kernel feature. All the values are derived by reading from `/sys/fs/cgroup`. diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java index d0e5d0ee3ff..aa672444581 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -69,6 +70,11 @@ import java.util.concurrent.TimeUnit; */ public class RetryableS3OutputStream extends OutputStream { + // Metric related constants. 
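+  // Note: besides handing chunks to S3UploadManager for upload, this stream emits the
+  // "s3/upload/total/time" and "s3/upload/total/bytes" metrics once all part-upload
+  // futures have completed, using the uploadId as a dimension.
+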
+ private static final String METRIC_PREFIX = "s3/upload/total/"; + private static final String METRIC_TOTAL_UPLOAD_TIME = METRIC_PREFIX + "time"; + private static final String METRIC_TOTAL_UPLOAD_BYTES = METRIC_PREFIX + "bytes"; + private static final Logger LOG = new Logger(RetryableS3OutputStream.class); private final S3OutputConfig config; @@ -208,14 +214,20 @@ public class RetryableS3OutputStream extends OutputStream org.apache.commons.io.FileUtils.forceDelete(chunkStorePath); LOG.info("Deleted chunkStorePath[%s]", chunkStorePath); - // This should be emitted as a metric - long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length(); + final long totalBytesUploaded = (currentChunk.id - 1) * chunkSize + currentChunk.length(); + final long totalUploadTimeMillis = pushStopwatch.elapsed(TimeUnit.MILLISECONDS); LOG.info( - "Pushed total [%d] parts containing [%d] bytes in [%d]ms.", + "Pushed total [%d] parts containing [%d] bytes in [%d]ms for s3Key[%s], uploadId[%s].", futures.size(), - totalChunkSize, - pushStopwatch.elapsed(TimeUnit.MILLISECONDS) + totalBytesUploaded, + totalUploadTimeMillis, + s3Key, + uploadId ); + + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension("uploadId", uploadId); + uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_TIME, totalUploadTimeMillis)); + uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_BYTES, totalBytesUploaded)); }); try (Closer ignored = closer) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java index 9caa2bcb2e3..cc9ce4bf15a 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java @@ -25,10 +25,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.apache.druid.utils.RuntimeInfo; @@ -36,6 +39,7 @@ import org.apache.druid.utils.RuntimeInfo; import java.io.File; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; /** * This class manages uploading files to S3 in chunks, while ensuring that the @@ -44,18 +48,34 @@ import java.util.concurrent.Future; @ManageLifecycle public class S3UploadManager { + // Metric related constants. 
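+  // Note: each part handed to the upload executor is tracked so the manager can emit the
+  // "s3/upload/part/queueSize", "s3/upload/part/queuedTime", and "s3/upload/part/time"
+  // metrics through the injected ServiceEmitter.
+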
+ private static final String METRIC_PREFIX = "s3/upload/part/"; + private static final String METRIC_PART_QUEUED_TIME = METRIC_PREFIX + "queuedTime"; + private static final String METRIC_QUEUE_SIZE = METRIC_PREFIX + "queueSize"; + private static final String METRIC_PART_UPLOAD_TIME = METRIC_PREFIX + "time"; + private final ExecutorService uploadExecutor; + private final ServiceEmitter emitter; private static final Logger log = new Logger(S3UploadManager.class); + // For metrics regarding uploadExecutor. + private final AtomicInteger executorQueueSize = new AtomicInteger(0); + @Inject - public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig, RuntimeInfo runtimeInfo) + public S3UploadManager( + S3OutputConfig s3OutputConfig, + S3ExportConfig s3ExportConfig, + RuntimeInfo runtimeInfo, + ServiceEmitter emitter + ) { int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors()); int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig); this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk); log.info("Initialized executor service for S3 multipart upload with pool size [%d] and work queue capacity [%d]", poolSize, maxNumChunksOnDisk); + this.emitter = emitter; } /** @@ -87,25 +107,36 @@ public class S3UploadManager S3OutputConfig config ) { - return uploadExecutor.submit(() -> RetryUtils.retry( - () -> { - log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId); - UploadPartResult uploadPartResult = uploadPartIfPossible( - s3Client, - uploadId, - config.getBucket(), - key, - chunkNumber, - chunkFile - ); - if (!chunkFile.delete()) { - log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath()); - } - return uploadPartResult; - }, - S3Utils.S3RETRY, - config.getMaxRetry() - )); + final Stopwatch stopwatch = Stopwatch.createStarted(); + executorQueueSize.incrementAndGet(); + return uploadExecutor.submit(() -> { + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + emitMetric(metricBuilder.setMetric(METRIC_QUEUE_SIZE, executorQueueSize.decrementAndGet())); + metricBuilder.setDimension("uploadId", uploadId).setDimension("partNumber", chunkNumber); + emitMetric(metricBuilder.setMetric(METRIC_PART_QUEUED_TIME, stopwatch.millisElapsed())); + stopwatch.restart(); + + return RetryUtils.retry( + () -> { + log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId); + UploadPartResult uploadPartResult = uploadPartIfPossible( + s3Client, + uploadId, + config.getBucket(), + key, + chunkNumber, + chunkFile + ); + if (!chunkFile.delete()) { + log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath()); + } + emitMetric(metricBuilder.setMetric(METRIC_PART_UPLOAD_TIME, stopwatch.millisElapsed())); + return uploadPartResult; + }, + S3Utils.S3RETRY, + config.getMaxRetry() + ); + }); } @VisibleForTesting @@ -149,4 +180,8 @@ public class S3UploadManager uploadExecutor.shutdown(); } + protected void emitMetric(ServiceMetricEvent.Builder builder) + { + emitter.emit(builder); + } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index 676352daf4f..a880d6f2efa 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -32,6 +32,7 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; @@ -158,7 +159,8 @@ public class S3StorageConnectorProviderTest new S3UploadManager( new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1), new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null), - new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)) + new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0), + new StubServiceEmitter()) ) ); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 8e7a81eb48d..8d15624c0d0 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.UploadPartResult; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -105,7 +106,8 @@ public class RetryableS3OutputStreamTest s3UploadManager = new S3UploadManager( new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1), new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null), - new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)); + new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0), + new StubServiceEmitter()); } @Test diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java index 67dcb3b6db6..68eaca1c42a 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java @@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.s3.NoopServerSideEncryption; @@ -90,7 +91,8 @@ public class S3StorageConnectorTest storageConnector = new S3StorageConnector(s3OutputConfig, service, new S3UploadManager( s3OutputConfig, new S3ExportConfig("tempDir", new 
HumanReadableBytes("5MiB"), 1, null), - new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))); + new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0), + new StubServiceEmitter())); } catch (IOException e) { throw new RuntimeException(e); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java index b79c392844d..75305a3c95a 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java @@ -22,6 +22,7 @@ package org.apache.druid.storage.s3.output; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.apache.druid.utils.RuntimeInfo; @@ -43,14 +44,16 @@ public class S3UploadManagerTest private S3UploadManager s3UploadManager; private S3OutputConfig s3OutputConfig; private S3ExportConfig s3ExportConfig; + private StubServiceEmitter serviceEmitter; @Before public void setUp() { s3OutputConfig = new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1); s3ExportConfig = new S3ExportConfig("tempDir", new HumanReadableBytes("200MiB"), 1, null); + serviceEmitter = new StubServiceEmitter(); final RuntimeInfo runtimeInfo = new DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0); - s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo); + s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo, serviceEmitter); } @Test @@ -75,6 +78,10 @@ public class S3UploadManagerTest UploadPartResult futureResult = result.get(); Assert.assertEquals(chunkId, futureResult.getPartNumber()); Assert.assertEquals("etag", futureResult.getETag()); + + serviceEmitter.verifyEmitted("s3/upload/part/queuedTime", 1); + serviceEmitter.verifyEmitted("s3/upload/part/queueSize", 1); + serviceEmitter.verifyEmitted("s3/upload/part/time", 1); } @Test diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 2ddba7c6cd8..e4a8b9403dd 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -26,9 +26,9 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Test implementation of {@link ServiceEmitter} that collects emitted metrics @@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie { private final List events = new ArrayList<>(); private final List alertEvents = new ArrayList<>(); - private final Map> metricEvents = new HashMap<>(); + private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { From 4e0ea7823b128cc214df8dc774d370b306b91461 Mon Sep 17 
00:00:00 2001 From: Suneet Saldanha Date: Fri, 21 Jun 2024 06:01:59 -0700 Subject: [PATCH 06/72] Update docs for K8s TaskRunner Dynamic Config (#16600) * Update docs for K8s TaskRunner Dynamic Config * touchups * code review * npe * oopsies --- .../extensions-contrib/k8s-jobs.md | 413 ++++++++++++++++-- .../execution/PodTemplateSelectStrategy.java | 6 +- ...electorBasedPodTemplateSelectStrategy.java | 23 +- .../taskadapter/PodTemplateTaskAdapter.java | 5 - ...KubernetesTaskRunnerDynamicConfigTest.java | 3 +- ...torBasedPodTemplateSelectStrategyTest.java | 20 +- .../PodTemplateTaskAdapterTest.java | 2 +- 7 files changed, 386 insertions(+), 86 deletions(-) diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 1d75e2b7efe..082b8735349 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -40,7 +40,280 @@ The extension uses `druid.indexer.runner.capacity` to limit the number of k8s jo Other configurations required are: `druid.indexer.runner.type: k8s` and `druid.indexer.task.encapsulatedTask: true` -## Pod Adapters +### Dynamic config + +Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord +service for these changes to take effect. + +Druid can dynamically tune [pod template selection](#pod-template-selection), which allows you to configure the pod +template based on the task to be run. To enable dynamic pod template selection, first configure the +[custom template pod adapter](#custom-template-pod-adapter). + +Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner. + +To use these APIs, ensure you have read and write permissions for the CONFIG resource type with the resource name +"CONFIG". For more information on permissions, see +[User authentication and authorization](../../operations/security-user-auth.md#config). + +#### Get dynamic configuration + +Retrieves the current dynamic execution config for the Kubernetes task runner. +Returns a JSON object with the dynamic configuration properties. + +##### URL + +`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig` + +##### Responses + + + + + + +*Successfully retrieved dynamic configuration* + + + + +--- + +##### Sample request + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskRunner/executionConfig" +``` + + + + +```HTTP +GET /druid/indexer/v1/k8s/taskRunner/executionConfig HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +##### Sample response + +
+View the response + +```json +{ + "type": "default", + "podTemplateSelectStrategy": + { + "type": "selectorBased", + "selectors": [ + { + "selectionKey": "podSpec1", + "context.tags": { + "userProvidedTag": ["tag1", "tag2"] + }, + "dataSource": ["wikipedia"] + }, + { + "selectionKey": "podSpec2", + "type": ["index_kafka"] + } + ] + } +} +``` +
+ +#### Update dynamic configuration + +Updates the dynamic configuration for the Kubernetes Task Runner + +##### URL + +`POST` `/druid/indexer/v1/k8s/taskRunner/executionConfig` + +##### Header parameters + +The endpoint supports the following optional header parameters to populate the `author` and `comment` fields in the configuration history. + +* `X-Druid-Author` + * Type: String + * Author of the configuration change. +* `X-Druid-Comment` + * Type: String + * Description for the update. + +##### Responses + + + + + + +*Successfully updated dynamic configuration* + + + + +--- + +##### Sample request + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskRunner/executionConfig" \ +--header 'Content-Type: application/json' \ +--data '{ + "type": "default", + "podTemplateSelectStrategy": + { + "type": "selectorBased", + "selectors": [ + { + "selectionKey": "podSpec1", + "context.tags": + { + "userProvidedTag": ["tag1", "tag2"] + }, + "dataSource": ["wikipedia"] + }, + { + "selectionKey": "podSpec2", + "type": ["index_kafka"] + } + ] + } +}' +``` + + + + + +```HTTP +POST /druid/indexer/v1/k8s/taskRunner/executionConfig HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +Content-Type: application/json + +{ + "type": "default", + "podTemplateSelectStrategy": + { + "type": "selectorBased", + "selectors": [ + { + "selectionKey": "podSpec1", + "context.tags": + { + "userProvidedTag": ["tag1", "tag2"] + }, + "dataSource": ["wikipedia"] + }, + { + "selectionKey": "podSpec2", + "type": ["index_kafka"] + } + ] + } +} +``` + + + + +##### Sample response + +A successful request returns an HTTP `200 OK` message code and an empty response body. + +#### Get dynamic configuration history + +Retrieves the history of changes to Kubernetes task runner's dynamic execution config over an interval of time. Returns +an empty array if there are no history records available. + +##### URL + +`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig/history` + +##### Query parameters + +The endpoint supports the following optional query parameters to filter results. + +* `interval` + * Type: String + * Limit the results to the specified time interval in ISO 8601 format delimited with `/`. For example, `2023-07-13/2023-07-19`. The default interval is one week. You can change this period by setting `druid.audit.manager.auditHistoryMillis` in the `runtime.properties` file for the Coordinator. + +* `count` + * Type: Integer + * Limit the number of results to the last `n` entries. + +##### Responses + + + + + + +*Successfully retrieved dynamic configuration* + + + + +--- + +##### Sample request + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskRunner/executionConfig/history" +``` + + + + + +```HTTP +GET /druid/indexer/v1/k8s/taskRunner/executionConfig/history HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +##### Sample response + +
+View the response + +```json +[ + { + "key": "k8s.taskrunner.config", + "type": "k8s.taskrunner.config", + "auditInfo": { + "author": "", + "comment": "", + "ip": "127.0.0.1" + }, + "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}", + "auditTime": "2024-06-13T20:59:51.622Z" + } +] +``` +
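The two query parameters documented above can be combined. For example, a request that limits the history to a specific interval and to the last 10 entries might look like the following (the host and interval values are placeholders):

```shell
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskRunner/executionConfig/history?interval=2024-06-01/2024-06-20&count=10"
```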
+ +## Pod adapters The logic defining how the pod template is built for your Kubernetes Job depends on which pod adapter you have specified. ### Overlord Single Container Pod Adapter/Overlord Multi Container Pod Adapter @@ -65,7 +338,7 @@ and in your sidecar specs: That will not work, because we cannot decipher what your command is, the extension needs to know it explicitly. **Even for sidecars like Istio which are dynamically created by the service mesh, this needs to happen.* -Instead do the following: +Instead, do the following: You can keep your Dockerfile the same but you must have a sidecar spec like so: ``` container: name: foo @@ -90,13 +363,10 @@ The custom template pod adapter allows you to specify a pod template file per ta The base pod template must be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.base: /path/to/basePodSpec.yaml` -Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{taskType}: /path/to/taskSpecificPodSpec.yaml` where {taskType} is the name of the task type i.e `index_parallel`. +
+Example Pod Template that uses the regular druid docker image -If you are trying to use the default image's environment variable parsing feature to set runtime properties, you need to add a extra escape underscore when specifying pod templates. -e.g. set the environment variable `druid_indexer_runner_k8s_podTemplate_index__parallel` when setting `druid.indxer.runner.k8s.podTemplate.index_parallel` - -The following is an example Pod Template that uses the regular druid docker image. -``` +```yaml apiVersion: "v1" kind: "PodTemplate" template: @@ -164,6 +434,7 @@ template: - emptyDir: {} name: deepstorage-volume ``` +
The below runtime properties need to be passed to the Job's peon process. @@ -177,6 +448,10 @@ druid.indexer.task.encapsulatedTask=true ``` Any runtime property or JVM config used by the peon process can also be passed. E.G. below is a example of a ConfigMap that can be used to generate the `nodetype-config-volume` mount in the above template. + +
+Example ConfigMap + ``` kind: ConfigMap metadata: @@ -217,59 +492,112 @@ data: druid.peon.mode=remote druid.indexer.task.encapsulatedTask=true ``` -#### Dynamic Pod Template Selection Config -The Dynamic Pod Template Selection feature enhances the K8s extension by enabling more flexible and dynamic selection of pod templates based on task properties. This process is governed by the `PodTemplateSelectStrategy`. Below are the two strategies implemented: +
-|Property|Description|Default| -|--------|-----------|-------| -|`TaskTypePodTemplateSelectStrategy`| This strategy selects pod templates based on task type for execution purposes, implementing the behavior that maps templates to specific task types. | true | -|`SelectorBasedPodTemplateSelectStrategy`| This strategy evaluates a series of selectors, known as `selectors`, which are aligned with potential task properties. | false | +#### Pod template selection + +The pod template adapter can select which pod template should be used for a task using the [task runner execution config](#dynamic-config) -`SelectorBasedPodTemplateSelectStrategy`, the strategy implementing this new feature, is based on conditional `selectors` that match against top-level keys from the task payload. Currently, it supports matching based on task context tags, task type, and dataSource. These selectors are ordered in the dynamic configuration, with the first selector given the highest priority during the evaluation process. This means that the selection process uses these ordered conditions to determine a task’s Pod template. The first matching condition immediately determines the Pod template, thereby prioritizing certain configurations over others. If no selector matches, it will fall back to an optional `defaultKey` if configured; if there is still no match, it will use the `base` template. +##### Select based on task type -Example Configuration: +The `TaskTypePodTemplateSelectStrategy` strategy selects pod templates based on task type for execution purposes, +implementing the behavior that maps templates to specific task types. This is the default pod template selection +strategy. To explicitly select this strategy, set the `podTemplateSelectStrategy` in the dynamic execution config to -We define two template keys in the configuration—`low-throughput` and `medium-throughput`—each associated with specific task conditions and arranged in a priority order. - -- Low Throughput Template: This is the first template evaluated and has the highest priority. Tasks that have a context tag `billingCategory=streaming_ingestion` and a datasource of `wikipedia` will be classified under the `low-throughput` template. This classification directs such tasks to utilize a predefined pod template optimized for low throughput requirements. - -- Medium Throughput Template: If a task does not meet the low-throughput criteria, the system will then evaluate it against the next selector in order. In this example, if the task type is index_kafka, it will fall into the `medium-throughput` template. +```json +{ "type": "default" } ``` + +Task specific pod templates can be specified as the runtime property +`druid.indexer.runner.k8s.podTemplate.{taskType}: /path/to/taskSpecificPodSpec.yaml` where {taskType} is the name of the +task type. For example, `index_parallel`. + +If you are trying to use the default image's environment variable parsing feature to set runtime properties, you need to add a extra escape underscore when specifying pod templates. 
+For example, set the environment variable `druid_indexer_runner_k8s_podTemplate_index__kafka` when you set the runtime property `druid.indexer.runner.k8s.podTemplate.index_kafka` + + +The following example shows a configuration for task-based pod template selection: + +```properties +druid.indexer.runner.k8s.podTemplate.base=/path/to/basePodSpec.yaml +druid.indexer.runner.k8s.podTemplate.index_kafka=/path/to/kafkaPodSpec.yaml +``` + +##### Select based on one or more conditions + +The `SelectorBasedPodTemplateSelectStrategy` strategy evaluates a series of criteria within `selectors` to determine +which pod template to use to run the task. Pod templates are configured in the runtime properties like +`druid.indexer.runner.k8s.podTemplate.=...`. + +```json +{ + "type": "selectorBased", + "selectors": [ + { + "selectionKey": "podSpec1", + "context.tags": + { + "userProvidedTag": ["tag1", "tag2"] + }, + "dataSource": ["wikipedia"] + }, + { + "selectionKey": "podSpec2", + "type": ["index_kafka"] + } + ] +} +``` + +Selectors are processed in order. Druid selects the template based on the first matching selector. If a task does not +match any selector in the list, it will use the `base` pod template. + +For a task to match a selector, all the conditions within the selector must match. A selector can match on +- `type`: Type of the task +- `dataSource`: Destination datasource of the task. +- `context.tags`: Tags passed in the task's context. + +##### Example + +Set the following runtime properties to define the pod specs that can be used by Druid. + +```properties +druid.indexer.runner.k8s.podTemplate.base=/path/to/basePodSpec.yaml +druid.indexer.runner.k8s.podTemplate.podSpec1=/path/to/podSpecWithHighMemRequests.yaml +druid.indexer.runner.k8s.podTemplate.podSpec2=/path/to/podSpecWithLowCpuRequests.yaml +``` + +Set the dynamic execution config to define the pod template selection strategy. + +```json { "type": "default", - "podTemplateSelectStrategy": - { + "podTemplateSelectStrategy": { "type": "selectorBased", "selectors": [ { - "selectionKey": "low-throughput", - "context.tags": - { - "billingCategory": ["streaming_ingestion"] - }, + "selectionKey": "podSpec1", + "context.tags": { "userProvidedTag": ["tag1", "tag2"] }, "dataSource": ["wikipedia"] }, { - "selectionKey": "medium-throughput", + "selectionKey": "podSpec2", "type": ["index_kafka"] } - ], - "defaultKey"" "base" + ] } } ``` -Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{template}: /path/to/taskSpecificPodSpec.yaml` where {template} is the matched `selectionKey` of the `podTemplateSelectStrategy` i.e low-throughput. -Similar to Overlord dynamic configuration, the following API endpoints are defined to retrieve and manage dynamic configurations of Pod Template Selection config: +Druid selects the pod templates as follows: +1. Use `podSpecWithHighMemRequests.yaml` when both of the following conditions are met: + 1. The task context contains a tag with the key `userProvidedTag` that has the value `tag1` or `tag2`. + 2. The task targets the `wikipedia` datasource. +2. Use `podSpecWithLowCpuRequests.yaml` when the task type is `index_kafka`. +3. Use the `basePodSpec.yaml` for all other tasks. 
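For reference, the `context.tags` condition in the first selector matches tags supplied in the task's own context; a sketch of how such a tag might appear in a task payload (the shape shown here is illustrative) is:

```json
{
  "context": {
    "tags": {
      "userProvidedTag": "tag1"
    }
  }
}
```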
-- Get dynamic configuration: -`POST` `/druid/indexer/v1/k8s/taskRunner/executionConfig` - -- Update dynamic configuration: -`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig` - -- Get dynamic configuration history: -`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig/history` +In this example, if there is an `index_kafka` task for the `wikipedia` datasource with the tag `userProvidedTag: tag1`, +Druid selects the pod template `podSpecWithHighMemRequests.yaml`. ### Properties |Property| Possible Values | Description |Default|required| @@ -302,7 +630,8 @@ Similar to Overlord dynamic configuration, the following API endpoints are defin - All Druid Pods belonging to one Druid cluster must be inside the same Kubernetes namespace. - You must have a role binding for the overlord's service account that provides the needed permissions for interacting with Kubernetes. An example spec could be: -``` + +```yaml kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java index 1b8d57419d2..ae7869707fc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.fabric8.kubernetes.api.model.PodTemplate; import org.apache.druid.indexing.common.task.Task; +import javax.validation.constraints.NotNull; import java.util.Map; /** @@ -41,8 +42,7 @@ public interface PodTemplateSelectStrategy * allows for customized resource allocation and management tailored to the task's specific requirements. * * @param task The task for which the Pod template is determined. - * @return The selected Pod template. If no matching template is found, - * the method falls back to a base template. + * @return The pod template that should be used to run the task. 
*/ - PodTemplate getPodTemplateForTask(Task task, Map templates); + @NotNull PodTemplate getPodTemplateForTask(Task task, Map templates); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java index 938ed04e6a6..4c2d01b5218 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import io.fabric8.kubernetes.api.model.PodTemplate; import org.apache.druid.indexing.common.task.Task; -import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.Objects; @@ -36,19 +35,15 @@ import java.util.Objects; */ public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelectStrategy { - @Nullable - private String defaultKey; - private List selectors; + private final List selectors; @JsonCreator public SelectorBasedPodTemplateSelectStrategy( - @JsonProperty("selectors") List selectors, - @JsonProperty("defaultKey") @Nullable String defaultKey + @JsonProperty("selectors") List selectors ) { Preconditions.checkNotNull(selectors, "selectors"); this.selectors = selectors; - this.defaultKey = defaultKey; } /** @@ -64,7 +59,7 @@ public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelect .filter(selector -> selector.evaluate(task)) .findFirst() .map(Selector::getSelectionKey) - .orElse(defaultKey); + .orElse("base"); return templates.getOrDefault(templateKey, templates.get("base")); } @@ -75,13 +70,6 @@ public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelect return selectors; } - @Nullable - @JsonProperty - public String getDefaultKey() - { - return defaultKey; - } - @Override public boolean equals(Object o) { @@ -92,13 +80,13 @@ public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelect return false; } SelectorBasedPodTemplateSelectStrategy that = (SelectorBasedPodTemplateSelectStrategy) o; - return Objects.equals(defaultKey, that.defaultKey) && Objects.equals(selectors, that.selectors); + return Objects.equals(selectors, that.selectors); } @Override public int hashCode() { - return Objects.hash(defaultKey, selectors); + return Objects.hash(selectors); } @Override @@ -106,7 +94,6 @@ public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelect { return "SelectorBasedPodTemplateSelectStrategy{" + "selectors=" + selectors + - ", defaultKey=" + defaultKey + '}'; } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 8e3788e31e1..19cc788b3ee 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -39,7 +39,6 @@ import 
org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; @@ -141,10 +140,6 @@ public class PodTemplateTaskAdapter implements TaskAdapter PodTemplate podTemplate = podTemplateSelectStrategy.getPodTemplateForTask(task, templates); - if (podTemplate == null) { - throw new ISE("Pod template spec not found for task type [%s]", task.getType()); - } - return new JobBuilder() .withNewMetadata() .withName(new K8sTaskId(task).getK8sJobName()) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java index 6236794d366..77a819dde9c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java @@ -64,8 +64,7 @@ public class KubernetesTaskRunnerDynamicConfigTest + " \"selectionKey\": \"medium-throughput\",\n" + " \"type\": [\"index_kafka\"]\n" + " }\n" - + " ],\n" - + " \"defaultKey\": \"base\"\n" + + " ]\n" + " }\n" + "}"; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java index 9aa1376a515..a82bb076855 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java @@ -88,14 +88,14 @@ public class SelectorBasedPodTemplateSelectStrategyTest @Test(expected = NullPointerException.class) public void shouldThrowNullPointerExceptionWhenSelectorsAreNull() { - new SelectorBasedPodTemplateSelectStrategy(null, null); + new SelectorBasedPodTemplateSelectStrategy(null); } @Test public void testGetPodTemplate_ForTask_emptySelectorsFallbackToBaseTemplate() { List emptySelectors = Collections.emptyList(); - SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(emptySelectors, null); + SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(emptySelectors); Task task = NoopTask.create(); Assert.assertEquals("base", strategy.getPodTemplateForTask(task, templates).getMetadata().getName()); } @@ -105,21 +105,11 @@ public class SelectorBasedPodTemplateSelectStrategyTest { Selector noMatchSelector = new MockSelector(false, "mock"); List selectors = Collections.singletonList(noMatchSelector); - SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, null); + SelectorBasedPodTemplateSelectStrategy strategy = new 
SelectorBasedPodTemplateSelectStrategy(selectors); Task task = NoopTask.create(); Assert.assertEquals("base", strategy.getPodTemplateForTask(task, templates).getMetadata().getName()); } - @Test - public void testGetPodTemplate_ForTask_noMatchSelectorsFallbackToDefaultKeyTemplate() - { - Selector noMatchSelector = new MockSelector(false, "mock"); - List selectors = Collections.singletonList(noMatchSelector); - SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, "match"); - Task task = NoopTask.create(); - Assert.assertEquals("match", strategy.getPodTemplateForTask(task, templates).getMetadata().getName()); - } - @Test public void testGetPodTemplate_ForTask_withMatchSelectors() { @@ -132,7 +122,7 @@ public class SelectorBasedPodTemplateSelectStrategyTest noMatchSelector, matchSelector ); - SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, null); + SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors); Task task = NoopTask.create(); Assert.assertEquals("match", strategy.getPodTemplateForTask(task, templates).getMetadata().getName()); } @@ -152,7 +142,7 @@ public class SelectorBasedPodTemplateSelectStrategyTest ); SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy( - Collections.singletonList(selector), "default"); + Collections.singletonList(selector)); SelectorBasedPodTemplateSelectStrategy strategy2 = objectMapper.readValue( objectMapper.writeValueAsBytes(strategy), diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index 36fc77631a1..4aad419007e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -620,7 +620,7 @@ public class PodTemplateTaskAdapterTest dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(new SelectorBasedPodTemplateSelectStrategy( Collections.singletonList( new Selector("lowThrougput", null, null, Sets.newSet(dataSource) - )), null)); + )))); PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( taskRunnerConfig, From 4eced9b3c99ea818ecef15631662de6f274e70cc Mon Sep 17 00:00:00 2001 From: Rishabh Singh <6513075+findingrish@users.noreply.github.com> Date: Sat, 22 Jun 2024 04:10:12 +0530 Subject: [PATCH 07/72] Fix CentralizedDatasourceSchema group IT failure (#16636) * Fix build * Update datasource name in ITSystemTableBatchIndexTaskTest --- .../druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java | 2 +- .../resources/indexer/sys_segment_batch_index_queries.json | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java index f15081509a0..f98626ad9b5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java 
@@ -37,7 +37,7 @@ public class ITSystemTableBatchIndexTaskTest extends AbstractITBatchIndexTest private static final Logger LOG = new Logger(ITSystemTableBatchIndexTaskTest.class); private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String SYSTEM_QUERIES_RESOURCE = "/indexer/sys_segment_batch_index_queries.json"; - private static final String INDEX_DATASOURCE = "wikipedia_index_test"; + private static final String INDEX_DATASOURCE = "sys_segment_wikipedia_index_test"; @Test public void testIndexData() throws Exception diff --git a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json index 931ad895778..b0746e0cab4 100644 --- a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json +++ b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json @@ -1,7 +1,7 @@ [ { "query": { - "query": "SELECT count(*) FROM sys.segments WHERE datasource LIKE 'wikipedia_index_test%'" + "query": "SELECT count(*) FROM sys.segments WHERE datasource LIKE 'sys_segment_wikipedia_index_test%'" }, "expectedResults": [ { @@ -24,7 +24,7 @@ }, { "query": { - "query": "SELECT status AS status FROM sys.tasks WHERE datasource LIKE 'wikipedia_index_test%' GROUP BY 1" + "query": "SELECT status AS status FROM sys.tasks WHERE datasource LIKE 'sys_segment_wikipedia_index_test%' GROUP BY 1" }, "expectedResults": [ { From 51c73b5a4e3e5743986cf3f24401e8cd14b8169a Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Fri, 21 Jun 2024 18:33:15 -0700 Subject: [PATCH 08/72] Web console: show formatted JSON value (#16632) * show formatted json value * update snapshot * window functions * count star can also have a window * better edit query context --- licenses.yaml | 2 +- web-console/lib/keywords.js | 3 + web-console/package-lock.json | 14 +-- web-console/package.json | 2 +- .../record-table-pane/record-table-pane.tsx | 4 +- .../edit-context-dialog.spec.tsx.snap | 102 ++++++++++++++++-- .../edit-context-dialog.scss | 21 +--- .../edit-context-dialog.spec.tsx | 6 +- .../edit-context-dialog.tsx | 102 ++++++++---------- .../show-value-dialog.spec.tsx.snap | 90 +++++++++------- .../show-value-dialog/show-value-dialog.scss | 23 ++-- .../show-value-dialog/show-value-dialog.tsx | 62 +++++++++-- .../string-menu-items/string-menu-items.tsx | 2 +- .../result-table-pane/result-table-pane.tsx | 4 +- .../workbench-view/run-panel/run-panel.tsx | 2 +- 15 files changed, 282 insertions(+), 157 deletions(-) diff --git a/licenses.yaml b/licenses.yaml index 3c1f7b0b2d4..700dc891553 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -5094,7 +5094,7 @@ license_category: binary module: web-console license_name: Apache License version 2.0 copyright: Imply Data -version: 0.22.15 +version: 0.22.20 --- diff --git a/web-console/lib/keywords.js b/web-console/lib/keywords.js index bf7b9a03910..dbf10e12bac 100644 --- a/web-console/lib/keywords.js +++ b/web-console/lib/keywords.js @@ -47,6 +47,7 @@ exports.SQL_KEYWORDS = [ 'FULL', 'CROSS', 'USING', + 'NATURAL', 'FETCH', 'FIRST', 'NEXT', @@ -67,6 +68,8 @@ exports.SQL_KEYWORDS = [ 'RANGE', 'PRECEDING', 'FOLLOWING', + 'CURRENT', + 'UNBOUNDED', 'EXTEND', 'PIVOT', 'UNPIVOT', diff --git a/web-console/package-lock.json b/web-console/package-lock.json index 26999644f1e..2945955d9d6 100644 --- a/web-console/package-lock.json +++ b/web-console/package-lock.json @@ -15,7 +15,7 @@ "@blueprintjs/icons": "^4.16.0", 
"@blueprintjs/popover2": "^1.14.9", "@blueprintjs/select": "^4.9.24", - "@druid-toolkit/query": "^0.22.15", + "@druid-toolkit/query": "^0.22.20", "@druid-toolkit/visuals-core": "^0.3.3", "@druid-toolkit/visuals-react": "^0.3.3", "ace-builds": "~1.4.14", @@ -1005,9 +1005,9 @@ } }, "node_modules/@druid-toolkit/query": { - "version": "0.22.15", - "resolved": "https://registry.npmjs.org/@druid-toolkit/query/-/query-0.22.15.tgz", - "integrity": "sha512-LyQVIVkVNhduscf2wnBO/oGBvj353tS5ElIws20xQzApvEIwNNxmlkA+8npqwy77BkJj3nRQvlenbSEDHQdqow==", + "version": "0.22.20", + "resolved": "https://registry.npmjs.org/@druid-toolkit/query/-/query-0.22.20.tgz", + "integrity": "sha512-GmmSd27y7zLVTjgTBQy+XoGeSSGhSDNmwyiwWtSua7I5LX8XqHV7Chi8HIH25YQoVgTK1pLK4RS8eRXxthRAzg==", "dependencies": { "tslib": "^2.5.2" } @@ -19147,9 +19147,9 @@ "dev": true }, "@druid-toolkit/query": { - "version": "0.22.15", - "resolved": "https://registry.npmjs.org/@druid-toolkit/query/-/query-0.22.15.tgz", - "integrity": "sha512-LyQVIVkVNhduscf2wnBO/oGBvj353tS5ElIws20xQzApvEIwNNxmlkA+8npqwy77BkJj3nRQvlenbSEDHQdqow==", + "version": "0.22.20", + "resolved": "https://registry.npmjs.org/@druid-toolkit/query/-/query-0.22.20.tgz", + "integrity": "sha512-GmmSd27y7zLVTjgTBQy+XoGeSSGhSDNmwyiwWtSua7I5LX8XqHV7Chi8HIH25YQoVgTK1pLK4RS8eRXxthRAzg==", "requires": { "tslib": "^2.5.2" } diff --git a/web-console/package.json b/web-console/package.json index 44e6986c2bd..6e38ba24684 100644 --- a/web-console/package.json +++ b/web-console/package.json @@ -69,7 +69,7 @@ "@blueprintjs/icons": "^4.16.0", "@blueprintjs/popover2": "^1.14.9", "@blueprintjs/select": "^4.9.24", - "@druid-toolkit/query": "^0.22.15", + "@druid-toolkit/query": "^0.22.20", "@druid-toolkit/visuals-core": "^0.3.3", "@druid-toolkit/visuals-react": "^0.3.3", "ace-builds": "~1.4.14", diff --git a/web-console/src/components/record-table-pane/record-table-pane.tsx b/web-console/src/components/record-table-pane/record-table-pane.tsx index 29433b25ddc..6007559fb0d 100644 --- a/web-console/src/components/record-table-pane/record-table-pane.tsx +++ b/web-console/src/components/record-table-pane/record-table-pane.tsx @@ -174,7 +174,9 @@ export const RecordTablePane = React.memo(function RecordTablePane(props: Record })} /> )} - {showValue && setShowValue(undefined)} str={showValue} />} + {showValue && ( + setShowValue(undefined)} str={showValue} size="large" /> + )} ); }); diff --git a/web-console/src/dialogs/edit-context-dialog/__snapshots__/edit-context-dialog.spec.tsx.snap b/web-console/src/dialogs/edit-context-dialog/__snapshots__/edit-context-dialog.spec.tsx.snap index b9e2d072ea9..7a2ed054f3a 100644 --- a/web-console/src/dialogs/edit-context-dialog/__snapshots__/edit-context-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/edit-context-dialog/__snapshots__/edit-context-dialog.spec.tsx.snap @@ -57,18 +57,102 @@ exports[`EditContextDialog matches snapshot 1`] = ` - diff --git a/web-console/src/dialogs/show-value-dialog/show-value-dialog.scss b/web-console/src/dialogs/show-value-dialog/show-value-dialog.scss index a1da01e5931..f561f1bf99f 100644 --- a/web-console/src/dialogs/show-value-dialog/show-value-dialog.scss +++ b/web-console/src/dialogs/show-value-dialog/show-value-dialog.scss @@ -19,10 +19,6 @@ @import '../../variables'; .show-value-dialog { - &.#{$bp-ns}-dialog { - padding-bottom: 10px; - } - &.normal.#{$bp-ns}-dialog { height: 600px; } @@ -32,12 +28,21 @@ height: 90vh; } - .#{$bp-ns}-input { - margin: 10px; - flex: 1; + .#{$bp-ns}-dialog-body { + display: flex; + 
flex-direction: column; + + .ace-editor { + flex: 1; + } + + .#{$bp-ns}-input { + flex: 1; + resize: none; + } } - .#{$bp-ns}-dialog-footer-actions { - padding-right: 10px; + .#{$bp-ns}-dialog-footer { + margin-top: 0; } } diff --git a/web-console/src/dialogs/show-value-dialog/show-value-dialog.tsx b/web-console/src/dialogs/show-value-dialog/show-value-dialog.tsx index 8e1b0290865..4369a43bb76 100644 --- a/web-console/src/dialogs/show-value-dialog/show-value-dialog.tsx +++ b/web-console/src/dialogs/show-value-dialog/show-value-dialog.tsx @@ -16,11 +16,21 @@ * limitations under the License. */ -import { Button, Classes, Dialog, Intent, TextArea } from '@blueprintjs/core'; +import { + Button, + ButtonGroup, + Classes, + Dialog, + FormGroup, + Intent, + TextArea, +} from '@blueprintjs/core'; import { IconNames } from '@blueprintjs/icons'; import classNames from 'classnames'; import copy from 'copy-to-clipboard'; -import React from 'react'; +import * as JSONBig from 'json-bigint-native'; +import React, { useMemo, useState } from 'react'; +import AceEditor from 'react-ace'; import { AppToaster } from '../../singletons'; @@ -35,6 +45,15 @@ export interface ShowValueDialogProps { export const ShowValueDialog = React.memo(function ShowValueDialog(props: ShowValueDialogProps) { const { title, onClose, str, size } = props; + const [tab, setTab] = useState<'formatted' | 'raw'>('formatted'); + + const parsed = useMemo(() => { + try { + return JSONBig.parse(str); + } catch {} + }, [str]); + + const hasParsed = typeof parsed !== 'undefined'; function handleCopy() { copy(str, { format: 'text/plain' }); @@ -51,10 +70,41 @@ export const ShowValueDialog = React.memo(function ShowValueDialog(props: ShowVa onClose={onClose} title={title || 'Full value'} > -