Kinesis docs and logs improvements (#12886)

Going ahead with the merge. CI is failing only because of the code coverage change caused by the updated log lines.
AmatyaAvadhanula 2022-08-22 14:49:42 +05:30 committed by GitHub
parent a879d91a20
commit 379df5f103
2 changed files with 5 additions and 4 deletions


@@ -649,7 +649,5 @@ Before you deploy the Kinesis extension to production, consider the following kn
- Avoid running more than one Kinesis supervisor that reads from the same Kinesis stream. Kinesis has a per-shard read throughput limit, and multiple supervisors on the same stream reduce the read throughput available to an individual supervisor's tasks. Additionally, multiple supervisors ingesting into the same Druid datasource can cause increased contention for locks on the datasource.
- The only way to change the stream reset policy is to submit a new ingestion spec and set up a new supervisor.
- Timeouts when retrieving the earliest sequence number will cause a reset of the supervisor. The job will resume on its own eventually, but the reset can trigger alerts.
- The Kinesis supervisor will not make progress if you have empty shards. Make sure each shard contains at least one record (one way to write a seed record is sketched after this list).
- If ingestion tasks get stuck, the supervisor does not automatically recover. You should monitor ingestion tasks and investigate if your ingestion falls behind.
- A Kinesis supervisor can sometimes compare the checkpoint offset to the retention window of the stream to see whether it has fallen behind. These checks fetch the earliest sequence number from Kinesis, which can cause `IteratorAgeMilliseconds` to become very high in AWS CloudWatch.
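For the empty-shard item above, one way to make sure a shard has at least one record is to write a small seed record to the stream. The sketch below uses the AWS SDK for Java v1 and is independent of Druid; the region, stream name, partition key, and payload are placeholders for illustration, not values taken from this change.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class SeedKinesisStream
{
  public static void main(String[] args)
  {
    // Placeholder region; adjust for your environment.
    AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
        .withRegion("us-east-1")
        .build();

    // Write one small record so the shard is no longer empty.
    PutRecordRequest request = new PutRecordRequest()
        .withStreamName("my-stream")
        .withPartitionKey("seed-key")
        .withData(ByteBuffer.wrap("{\"seed\": true}".getBytes(StandardCharsets.UTF_8)));

    PutRecordResult result = kinesis.putRecord(request);
    System.out.println("Wrote seed record to shard " + result.getShardId()
                       + " at sequence number " + result.getSequenceNumber());
  }
}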


@@ -228,7 +228,9 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
      recordsResult = null;
      if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
-       log.warn("OrderedPartitionableRecord buffer full, retrying in [%,dms]", recordBufferFullWait);
+       log.warn("Kinesis records are being processed slower than they are fetched. "
+                + "OrderedPartitionableRecord buffer full, retrying in [%,dms].",
+                recordBufferFullWait);
        scheduleBackgroundFetch(recordBufferFullWait);
      }
@@ -293,7 +295,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
      // from this message and back off for a bit to let the buffer drain before retrying.
      if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
        log.warn(
-           "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms]",
+           "Kinesis records are being processed slower than they are fetched. "
+           + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
            recordBufferFullWait
        );
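Both warnings come from the same backpressure path visible in the diff: a fetched record is offered to a bounded buffer with a timeout, and when the offer times out the background fetch is rescheduled after `recordBufferFullWait`. Below is a minimal, self-contained sketch of that offer-with-timeout-and-backoff pattern; the class, queue, and constants are illustrative stand-ins, not the actual KinesisRecordSupplier internals.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BoundedBufferBackoffSketch
{
  private static final long OFFER_TIMEOUT_MS = 5_000;     // analogous to recordBufferOfferTimeout
  private static final long BUFFER_FULL_WAIT_MS = 5_000;  // analogous to recordBufferFullWait

  private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(10_000);
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  // Offer one record to the bounded buffer. If consumers drain the buffer too
  // slowly and the offer times out, log a warning and retry the fetch later
  // instead of blocking the fetch thread indefinitely.
  void offerOrBackOff(String record) throws InterruptedException
  {
    if (!buffer.offer(record, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
      System.err.printf(
          "Records are consumed slower than they are fetched; buffer full, retrying in [%,dms]%n",
          BUFFER_FULL_WAIT_MS
      );
      scheduler.schedule(this::fetchRecords, BUFFER_FULL_WAIT_MS, TimeUnit.MILLISECONDS);
    }
  }

  private void fetchRecords()
  {
    // Placeholder for the background fetch, which would eventually call offerOrBackOff again.
  }
}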