The Netty pipeline set up by the client can deliver multiple exceptions,
and can deliver chunks even after delivering exceptions. This makes it
difficult to implement HttpResponseHandlers. Looking at existing handler
implementations, I do not see attempts to handle this case, so it's also
a potential source of bugs.
This patch updates the client to track whether an exception was
encountered, and if so, to not call any additional methods on the handler
after exceptionCaught. It also harmonizes exception handling between
exceptionCaught and channelDisconnected.
Refactors the DruidSchema and DruidTable abstractions to prepare for the Druid Catalog.
As we add the catalog, we’ll want to combine physical segment metadata information with “hints” provided by the catalog. This is best done if we tidy up the existing code to more clearly separate responsibilities.
This PR is purely a refactoring move: no functionality changed. There is no difference to user functionality or external APIs. Functionality changes will come later as we add the catalog itself.
DruidSchema
In the present code, DruidSchema does three tasks:
Holds the segment metadata cache
Interfaces with an external schema manager
Acts as a schema to Calcite
This PR splits those responsibilities.
DruidSchema holds the Calcite schema for the druid namespace, combining information fro the segment metadata cache, from the external schema manager and (later) from the catalog.
SegmentMetadataCache holds the segment metadata cache formerly in DruidSchema.
DruidTable
The present DruidTable class is a bit of a kitchen sink: it holds all the various kinds of tables which Druid supports, and uses if-statements to handle behavior that differs between types. Yet, any given DruidTable will handle only one such table type. To more clearly model the actual table types, we split DruidTable into several classes:
DruidTable becomes an abstract base class to hold Druid-specific methods.
DatasourceTable represents a datasource.
ExternalTable represents an external table, such as from EXTERN or (later) from the catalog.
InlineTable represents the internal case in which we attach data directly to a table.
LookupTable represents Druid’s lookup table mechanism.
The new subclasses are more focused: they can be selective about the data they hold and the various predicates since they represent just one table type. This will be important as the catalog information will differ depending on table type and the new structure makes adding that logic cleaner.
DatasourceMetadata
Previously, the DruidSchema segment cache would work with DruidTable objects. With the catalog, we need a layer between the segment metadata and the table as presented to Calcite. To fix this, the new SegmentMetadataCache class uses a new DatasourceMetadata class as its cache entry to hold only the “physical” segment metadata information: it is up to the DruidTable to combine this with the catalog information in a later PR.
More Efficient Table Resolution
Calcite provides a convenient base class for schema objects: AbstractSchema. However, this class is a bit too convenient: all we have to do is provide a map of tables and Calcite does the rest. This means that, to resolve any single datasource, say, foo, we need to cache segment metadata, external schema information, and catalog information for all tables. Just so Calcite can do a map lookup.
There is nothing special about AbstractSchema. We can handle table lookups ourselves. The new AbstractTableSchema does this. In fact, all the rest of Calcite wants is to resolve individual tables by name, and, for commands we don’t use, to provide a list of table names.
DruidSchema now extends AbstractTableSchema. SegmentMetadataCache resolves individual tables (and provides table names.)
DruidSchemaManager
DruidSchemaManager provides a way to specify table schemas externally. In this sense, it is similar to the catalog, but only for datasources. It originally followed the AbstractSchema pattern: it implements provide a map of tables. This PR provides new optional methods for the table lookup and table names operations. The default implementations work the same way that AbstractSchema works: we get the entire map and pick out the information we need. Extensions that use this API should be revised to support the individual operations instead. Druid code no longer calls the original getTables() method.
The PR has one breaking change: since the DruidSchemaManager map is read-only to the rest of Druid, we should return a Map, not a ConcurrentMap.
* change kafka lookups module to not commit offsets
The current behaviour of the Kafka lookup extractor is to not commit
offsets by assigning a unique ID to the consumer group and setting
auto.offset.reset to earliest. This does the job but also pollutes the
Kafka broker with a bunch of "ghost" consumer groups that will never again be
used.
To fix this, we now set enable.auto.commit to false, which prevents the
ghost consumer groups being created in the first place.
* update docs to include new enable.auto.commit setting behaviour
* update kafka-lookup-extractor documentation
Provide some additional detail on functionality and configuration.
Hopefully this will make it clearer how the extractor works for
developers who aren't so familiar with Kafka.
* add comments better explaining the logic of the code
* add spelling exceptions for kafka lookup docs
* remove kafka lookup records from factory when record tombstoned
* update kafka lookup docs to include tombstone behaviour
* change test wait time down to 10ms
Co-authored-by: David Palmer <david.palmer@adscale.co.nz>
* Adjust "in" filter null behavior to match "selector".
Now, both of them match numeric nulls if constructed with a "null" value.
This is consistent as far as native execution goes, but doesn't match
the behavior of SQL = and IN. So, to address that, this patch also
updates the docs to clarify that the native filters do match nulls.
This patch also updates the SQL docs to describe how Boolean logic is
handled in addition to how NULL values are handled.
Fixes#12856.
* Fix test.
Kinesis ingestion requires all shards to have at least 1 record at the required position in druid.
Even if this is satisified initially, resharding the stream can lead to empty intermediate shards. A significant delay in writing to newly created shards was also problematic.
Kinesis shard sequence numbers are big integers. Introduce two more custom sequence tokens UNREAD_TRIM_HORIZON and UNREAD_LATEST to indicate that a shard has not been read from and that it needs to be read from the start or the end respectively.
These values can be used to avoid the need to read at least one record to obtain a sequence number for ingesting a newly discovered shard.
If a record cannot be obtained immediately, use a marker to obtain the relevant shardIterator and use this shardIterator to obtain a valid sequence number. As long as a valid sequence number is not obtained, continue storing the token as the offset.
These tokens (UNREAD_TRIM_HORIZON and UNREAD_LATEST) are logically ordered to be earlier than any valid sequence number.
However, the ordering requires a few subtle changes to the existing mechanism for record sequence validation:
The sequence availability check ensures that the current offset is before the earliest available sequence in the shard. However, current token being an UNREAD token indicates that any sequence number in the shard is valid (despite the ordering)
Kinesis sequence numbers are inclusive i.e if current sequence == end sequence, there are more records left to read.
However, the equality check is exclusive when dealing with UNREAD tokens.
* Add check for eternity time segment to SqlSegmentsMetadataQuery
* Add check for half eternities
* Add multiple segments test
* Add failing test to document known issue
* Frame processing and channels.
Follow-up to #12745. This patch adds three new concepts:
1) Frame channels are interfaces for doing nonblocking reads and writes
of frames.
2) Frame processors are interfaces for doing nonblocking processing of
frames received from input channels and sent to output channels.
3) Cluster-by keys, which can be used for sorting or partitioning.
The patch also adds SuperSorter, a user of these concepts, both to
illustrate how they are used, and also because it is going to be useful
in future work.
Central classes:
- ReadableFrameChannel. Implementations include
BlockingQueueFrameChannel (in-memory channel that implements both interfaces),
ReadableFileFrameChannel (file-based channel),
ReadableByteChunksFrameChannel (byte-stream-based channel), and others.
- WritableFrameChannel. Implementations include BlockingQueueFrameChannel
and WritableStreamFrameChannel (byte-stream-based channel).
- ClusterBy, a sorting or partitioning key.
- FrameProcessor, nonblocking processor of frames. Implementations include
FrameChannelBatcher, FrameChannelMerger, and FrameChannelMuxer.
- FrameProcessorExecutor, an executor service that runs FrameProcessors.
- SuperSorter, a class that uses frame channels and processors to
do parallel external merge sort of any amount of data (as long as there
is enough disk space).
* Additional tests, fixes.
* Changes from review.
* Better implementation for ReadableInputStreamFrameChannel.
* Rename getFrameFileReference -> newFrameFileReference.
* Add InterruptedException to runIncrementally; add more tests.
* Cancellation adjustments.
* Review adjustments.
* Refactor BlockingQueueFrameChannel, rename doneReading and doneWriting to close.
* Additional changes from review.
* Additional changes.
* Fix test.
* Adjustments.
* Adjustments.
* Refactor Guice initialization
Builders for various module collections
Revise the extensions loader
Injector builders for server startup
Move Hadoop init to indexer
Clean up server node role filtering
Calcite test injector builder
* Revisions from review comments
* Build fixes
* Revisions from review comments
* Improved Java 17 support and Java runtime docs.
1) Add a "Java runtime" doc page with information about supported
Java versions, garbage collection, and strong encapsulation..
2) Update asm and equalsverifier to versions that support Java 17.
3) Add additional "--add-opens" lines to surefire configuration, so
tests can pass successfully under Java 17.
4) Switch openjdk15 tests to openjdk17.
5) Update FrameFile to specifically mention Java runtime incompatibility
as the cause of not being able to use Memory.map.
6) Update SegmentLoadDropHandler to log an error for Errors too, not
just Exceptions. This is important because an IllegalAccessError is
encountered when the correct "--add-opens" line is not provided,
which would otherwise be silently ignored.
7) Update example configs to use druid.indexer.runner.javaOptsArray
instead of druid.indexer.runner.javaOpts. (The latter is deprecated.)
* Adjustments.
* Use run-java in more places.
* Add run-java.
* Update .gitignore.
* Exclude hadoop-client-api.
Brought in when building on Java 17.
* Swap one more usage of java.
* Fix the run-java script.
* Fix flag.
* Include link to Temurin.
* Spelling.
* Update examples/bin/run-java
Co-authored-by: Xavier Léauté <xl+github@xvrl.net>
Co-authored-by: Xavier Léauté <xl+github@xvrl.net>
* Use nonzero default value of maxQueuedBytes.
The purpose of this parameter is to prevent the Broker from running out
of memory. The prior default is unlimited; this patch changes it to a
relatively conservative 25MB.
This may be too low for larger clusters. The risk is that throughput
can decrease for queries with large resultsets or large amounts of intermediate
data. However, I think this is better than the risk of the prior default, which
is that these queries can cause the Broker to go OOM.
* Alter calculation.
* Python 3 support for post-index-task.
Useful when running on macOS or any other system that
doesn't have Python 2.
* Encode JSON returned by read_task_file.
* Adjust.
* Skip needless loads.
* Add a decode.
* Additional decodes needed.
Historicals and middle managers crash with an `UnknownHostException` on trying
to load `druid-parquet-extensions` with an ephemeral Hadoop cluster. This happens
because the `fs.defaultFS` URI value cannot be resolved at start up time as the
hadoop cluster may not exist at startup time.
This commit fixes the error by performing initialization of the filesystem in
`ParquetInputFormat.createReader()` whenever a new reader is requested.
add NumericRangeIndex interface and BoundFilter support
changes:
* NumericRangeIndex interface, like LexicographicalRangeIndex but for numbers
* BoundFilter now uses NumericRangeIndex if comparator is numeric and there is no extractionFn
* NestedFieldLiteralColumnIndexSupplier.java now supports supplying NumericRangeIndex for single typed numeric nested literal columns
* better faster stronger and (ever so slightly) more understandable
* more tests, fix bug
* fix style
* Druid planner now makes only one pass through Calcite planner
Resolves the issue that required two parse/plan cycles: one
for validate, another for plan. Creates a clone of the Calcite
planner and validator to resolve the conflict that prevented
the merger.
* Fixes for the Avatica JDBC driver
Correctly implement regular and prepared statements
Correctly implement result sets
Fix race condition with contexts
Clarify when parameters are used
Prepare for single-pass through the planner
* Addressed review comments
* Addressed review comment
Some queries like `REPLACE INTO ... SELECT TIME_PARSE("__time") AS __time FROM ...`
fail at the Calcite layer because any column with name `__time` is considered to be of
type `SqlTypeName.TIMESTAMP`.
Changes:
- Modify `RowSignatures.toRelDataType()` so that the type of `__time` column
is determined by the RowSignature's type.
Sysmonitor stats (mem, fs, disk, net, cpu, swap, sys, tcp) are reported by all Druid processes, including Peons that are ephemeral in nature. Since Peons always run on the same host as the MiddleManager that spawned them and is unlikely to change, the SyMonitor metrics emitted by Peon are merely duplicates. This is often not a problem except when machines are super-beefy. Imagine a 64-core machine and 32 workers running on this machine. now you will have each Peon reporting metrics for each core. that's an increase of (32 * 64)x in the number of metrics. This leads to a metric explosion.
This PR updates MetricsModule to check node role running while registering SysMonitor and not to load any existing SysMonitor$Stats.