Problem
Currently, the delta input source only supports reading from the latest snapshot of the given Delta Lake table. This is a known documented limitation.
Description
Add support for reading Delta snapshot. By default, the Druid-Delta connector reads the latest snapshot of the Delta table in order to preserve compatibility. Users can specify a snapshotVersion to ingest change data events from Delta tables into Druid.
In the future, we can also add support for time-based snapshot reads. The Delta API to read time-based snapshots is not clear currently.
changes:
* Adds new `CompressedComplexColumn`, `CompressedComplexColumnSerializer`, `CompressedComplexColumnSupplier` based on `CompressedVariableSizedBlobColumn` used by JSON columns
* Adds `IndexSpec.complexMetricCompression` which can be used to specify compression for the generic compressed complex column. Defaults to uncompressed because compressed columns are not backwards compatible.
* Adds new definition of `ComplexMetricSerde.getSerializer` which accepts an `IndexSpec` argument when creating a serializer. The old signature has been marked `@Deprecated` and has a default implementation that returns `null`, but it will be used by the default implementation of the new version if it is implemented to return a non-null value. The default implementation of the new method will use a `CompressedComplexColumnSerializer` if `IndexSpec.complexMetricCompression` is not null/none/uncompressed, or will use `LargeColumnSupportedComplexColumnSerializer` otherwise.
* Removed all duplicate generic implementations of `ComplexMetricSerde.getSerializer` and `ComplexMetricSerde.deserializeColumn` into default implementations `ComplexMetricSerde` instead of being copied all over the place. The default implementation of `deserializeColumn` will check if the first byte indicates that the new compression was used, otherwise will use the `GenericIndexed` based supplier.
* Complex columns with custom serializers/deserializers are unaffected and may continue doing whatever it is they do, either with specialized compression or whatever else, this new stuff is just to provide generic implementations built around `ObjectStrategy`.
* add ObjectStrategy.readRetainsBufferReference so CompressedComplexColumn only copies on read if required
* add copyValueOnRead flag down to CompressedBlockReader to avoid buffer duplicate if the value needs copied anyway
* Linked back to query granularity docs
* Update ingestion-spec.md
clairfy about query granularities in the spec.
* Update docs/design/storage.md
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Update docs/ingestion/ingestion-spec.md
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Update docs/querying/granularities.md
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Apply suggestions from code review
---------
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Segments primarily sorted by non-time columns.
Currently, segments are always sorted by __time, followed by the sort
order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting
by __time enables efficient execution of queries involving time-ordering
or granularity. Time-ordering is a simple matter of reading the rows in
stored order, and granular cursors can be generated in streaming fashion.
However, for various workloads, it's better for storage footprint and
query performance to sort by arbitrary orders that do not start with __time.
With this patch, users can sort segments by such orders.
For spec-based ingestion, users add "useExplicitSegmentSortOrder: true" to
dimensionsSpec. The "dimensions" list determines the sort order. To
define a sort order that includes "__time", users explicitly
include a dimension named "__time".
For SQL-based ingestion, users set the context parameter
"useExplicitSegmentSortOrder: true". The CLUSTERED BY clause is then
used as the explicit segment sort order.
In both cases, when the new "useExplicitSegmentSortOrder" parameter is
false (the default), __time is implicitly prepended to the sort order,
as it always was prior to this patch.
The new parameter is experimental for two main reasons. First, such
segments can cause errors when loaded by older servers, due to violating
their expectations that timestamps are always monotonically increasing.
Second, even on newer servers, not all queries can run on non-time-sorted
segments. Scan queries involving time-ordering and any query involving
granularity will not run. (To partially mitigate this, a currently-undocumented
SQL feature "sqlUseGranularity" is provided. When set to false the SQL planner
avoids using "granularity".)
Changes on the write path:
1) DimensionsSpec can now optionally contain a __time dimension, which
controls the placement of __time in the sort order. If not present,
__time is considered to be first in the sort order, as it has always
been.
2) IncrementalIndex and IndexMerger are updated to sort facts more
flexibly; not always by time first.
3) Metadata (stored in metadata.drd) gains a "sortOrder" field.
4) MSQ can generate range-based shard specs even when not all columns are
singly-valued strings. It merely stops accepting new clustering key
fields when it encounters the first one that isn't a singly-valued
string. This is useful because it enables range shard specs on
"someDim" to be created for clauses like "CLUSTERED BY someDim, __time".
Changes on the read path:
1) Add StorageAdapter#getSortOrder so query engines can tell how a
segment is sorted.
2) Update QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter,
and VectorCursorGranularizer to throw errors when using granularities
on non-time-ordered segments.
3) Update ScanQueryEngine to throw an error when using the time-ordering
"order" parameter on non-time-ordered segments.
4) Update TimeBoundaryQueryRunnerFactory to perform a segment scan when
running on a non-time-ordered segment.
5) Add "sqlUseGranularity" context parameter that causes the SQL planner
to avoid using granularities other than ALL.
Other changes:
1) Rename DimensionsSpec "hasCustomDimensions" to "hasFixedDimensions"
and change the meaning subtly: it now returns true if the DimensionsSpec
represents an unchanging list of dimensions, or false if there is
some discovery happening. This is what call sites had expected anyway.
* Fixups from CI.
* Fixes.
* Fix missing arg.
* Additional changes.
* Fix logic.
* Fixes.
* Fix test.
* Adjust test.
* Remove throws.
* Fix styles.
* Fix javadocs.
* Cleanup.
* Smoother handling of null ordering.
* Fix tests.
* Missed a spot on the merge.
* Fixups.
* Avoid needless Filters.and.
* Add timeBoundaryInspector to test.
* Fix tests.
* Fix FrameStorageAdapterTest.
* Fix various tests.
* Use forceSegmentSortByTime instead of useExplicitSegmentSortOrder.
* Pom fix.
* Fix doc.
Previously, SeekableStreamIndexTaskRunner set ingestion state to
COMPLETED when it finished reading data from Kafka. This is incorrect.
After the changes in this patch, the transitions go:
1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka,
while it is building its final set of segments to publish.
2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing,
while waiting for handoff.
3) The task transitions to COMPLETED immediately before exiting, when
truly done.
* SQL syntax error should target USER persona
* * revert change to queryHandler and related tests, based on review comments
* * add test
* Docs for Kinesis input format
* * remove reference to kafka
* * fix spellcheck error
* Apply suggestions from code review
Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
---------
Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
changes:
* removed `Firehose` and `FirehoseFactory` and remaining implementations which were mostly no longer used after #16602
* Moved `IngestSegmentFirehose` which was still used internally by Hadoop ingestion to `DatasourceRecordReader.SegmentReader`
* Rename `SQLFirehoseFactoryDatabaseConnector` to `SQLInputSourceDatabaseConnector` and similar renames for sub-classes
* Moved anything remaining in a 'firehose' package somewhere else
* Clean up docs on firehose stuff
index_realtime tasks were removed from the documentation in #13107. Even
at that time, they weren't really documented per se— just mentioned. They
existed solely to support Tranquility, which is an obsolete ingestion
method that predates migration of Druid to ASF and is no longer being
maintained. Tranquility docs were also de-linked from the sidebars and
the other doc pages in #11134. Only a stub remains, so people with
links to the page can see that it's no longer recommended.
index_realtime_appenderator tasks existed in the code base, but were
never documented, nor as far as I am aware were they used for any purpose.
This patch removes both task types completely, as well as removes all
supporting code that was otherwise unused. It also updates the stub
doc for Tranquility to be firmer that it is not compatible. (Previously,
the stub doc said it wasn't recommended, and pointed out that it is
built against an ancient 0.9.2 version of Druid.)
ITUnionQueryTest has been migrated to the new integration tests framework and updated to use Kafka ingestion.
Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
* Delta Lake support for filters.
* Updates
* cleanup comments
* Docs
* Remmove Enclosed runner
* Rename
* Cleanup test
* Serde test for the Delta input source and fix jackson annotation.
* Updates and docs.
* Update error messages to be clearer
* Fixes
* Handle NumberFormatException to provide a nicer error message.
* Apply suggestions from code review
Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
* Doc fixes based on feedback
* Yes -> yes in docs; reword slightly.
* Update docs/ingestion/input-sources.md
Co-authored-by: Laksh Singla <lakshsingla@gmail.com>
* Update docs/ingestion/input-sources.md
Co-authored-by: Laksh Singla <lakshsingla@gmail.com>
* Documentation, javadoc and more updates.
* Not with an or expression end-to-end test.
* Break up =, >, >=, <, <= into its own types instead of sub-classing.
---------
Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
Co-authored-by: Laksh Singla <lakshsingla@gmail.com>
Changes:
- Add new config `lagAggregate` to `LagBasedAutoScalerConfig`
- Add field `aggregateForScaling` to `LagStats`
- Use the new field/config to determine which aggregate to use to compute lag
- Remove method `Supervisor.computeLagForAutoScaler()`
Changes:
- Add `TaskContextEnricher` interface to improve task management and monitoring
- Invoke `enrichContext` in `TaskQueue.add()` whenever a new task is submitted to the Overlord
- Add `TaskContextReport` to write out task context information in reports
Changes:
- Add visibility into number of segments read/published by each parallel compaction
- Add new fields `segmentsRead`, `segmentsPublished` to `IngestionStatsAndErrorsTaskReportData`
- Update `ParallelIndexSupervisorTask` to populate the new stats
Changes:
- Add visibility into number of records processed by each streaming task per partition
- Add field `recordsProcessed` to `IngestionStatsAndErrorsTaskReportData`
- Populate number of records processed per partition in `SeekableStreamIndexTaskRunner`
Merging the work so far. @ektravel , @vogievetsky if there are additional improvements, let's track them & make another pr.
* Refactor streaming ingestion docs
* Update property definition
* Update after review
* Update known issues
* Move kinesis and kafka topics to ingestion, add redirects
* Saving changes
* Saving
* Add input format text
* Update after review
* Minor text edit
* Update example syntax
* Revert back to colon
* Fix merge conflicts
* Fix broken links
* Fix spelling error
* something
* test commit
* compilation fix
* more compilation fixes (fixme placeholders)
* Comment out druid-kereberos build since it conflicts with newly added transitive deps from delta-lake
Will need to sort out the dependencies later.
* checkpoint
* remove snapshot schema since we can get schema from the row
* iterator bug fix
* json json json
* sampler flow
* empty impls for read(InputStats) and sample()
* conversion?
* conversion, without timestamp
* Web console changes to show Delta Lake
* Asset bug fix and tile load
* Add missing pieces to input source info, etc.
* fix stuff
* Use a different delta lake asset
* Delta lake extension dependencies
* Cleanup
* Add InputSource, module init and helper code to process delta files.
* Test init
* Checkpoint changes
* Test resources and updates
* some fixes
* move to the correct package
* More tests
* Test cleanup
* TODOs
* Test updates
* requirements and javadocs
* Adjust dependencies
* Update readme
* Bump up version
* fixup typo in deps
* forbidden api and checkstyle checks
* Trim down dependencies
* new lines
* Fixup Intellij inspections.
* Add equals() and hashCode()
* chain splits, intellij inspections
* review comments and todo placeholder
* fix up some docs
* null table path and test dependencies. Fixup broken link.
* run prettify
* Different test; fixes
* Upgrade pyspark and delta-spark to latest (3.5.0 and 3.0.0) and regenerate tests
* yank the old test resource.
* add a couple of sad path tests
* Updates to readme based on latest.
* Version support
* Extract Delta DateTime converstions to DeltaTimeUtils class and add test
* More comprehensive split tests.
* Some test renames.
* Cleanup and update instructions.
* add pruneSchema() optimization for table scans.
* Oops, missed the parquet files.
* Update default table and rename schema constants.
* Test setup and misc changes.
* Add class loader logic as the context class loader is unaware about extension classes
* change some table client creation logic.
* Add hadoop-aws, hadoop-common and related exclusions.
* Remove org.apache.hadoop:hadoop-common
* Apply suggestions from code review
Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
* Add entry to .spelling to fix docs static check
---------
Co-authored-by: abhishekagarwal87 <1477457+abhishekagarwal87@users.noreply.github.com>
Co-authored-by: Laksh Singla <lakshsingla@gmail.com>
Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
This patch introduces a param snapshotTime in the iceberg inputsource spec that allows the user to ingest data files associated with the most recent snapshot as of the given time. This helps the user ingest data based on older snapshots by specifying the associated snapshot time.
This patch also upgrades the iceberg core version to 1.4.1
* Add system fields to input sources.
Main changes:
1) The SystemField enum defines system fields "__file_uri", "__file_path",
and "__file_bucket". They are associated with each input entity.
2) The SystemFieldInputSource interface can be added to any InputSource
to make it system-field-capable. It sets up serialization of a list
of configured "systemFields" in the JSON form of the input source, and
provides a method getSystemFieldValue for computing the value of each
system field. Cloud object, HDFS, HTTP, and Local now have this.
* Fix various LocalInputSource calls.
* Fix style stuff.
* Fixups.
* Fix tests and coverage.