From c138987bb46b975379daa00e57c42f498b6ef207 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Fri, 8 Dec 2017 11:20:32 -0500
Subject: [PATCH] NIFI-4656, NIFI-4680: This closes #2330. Fix error handling
 in consume/publish Kafka processors. Address issue with
 HortonworksSchemaRegistry throwing RuntimeException when it should be
 IOException. Fix bug in ConsumerLease/ConsumeKafkaRecord that caused it to
 report too many records received

Signed-off-by: joewitt
---
 .../kafka/pubsub/ConsumerLease.java           |  56 ++++----
 .../kafka/pubsub/PublishKafkaRecord_0_11.java |   7 +-
 .../kafka/pubsub/ConsumerLease.java           |  56 ++++----
 .../kafka/pubsub/PublishKafkaRecord_1_0.java  |   7 +-
 .../HortonworksSchemaRegistry.java            | 129 ++++++++++++------
 5 files changed, 162 insertions(+), 93 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index eed797e70a..4d9a5b6d53 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -519,6 +519,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
                 try {
                     reader = readerFactory.createRecordReader(attributes, in, logger);
+                } catch (final IOException e) {
+                    yield();
+                    rollback(topicPartition);
+                    handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
+                    closeWriter(writer);
+                    return;
                 } catch (final Exception e) {
                     handleParseFailure(consumerRecord, session, e);
                     continue;
@@ -543,13 +549,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     } catch (final Exception e) {
                         logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
 
-                        try {
-                            rollback(topicPartition);
-                        } catch (final Exception rollbackException) {
-                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                        }
-
+                        rollback(topicPartition);
                         yield();
+                        throw new ProcessException(e);
                     }
 
@@ -572,40 +574,42 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         }
 
                         tracker.incrementRecordCount(1L);
-                        session.adjustCounter("Records Received", records.size(), false);
+                        session.adjustCounter("Records Received", 1L, false);
                     }
                 }
             }
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
 
-            try {
-                if (writer != null) {
-                    writer.close();
-                }
-            } catch (final Exception ioe) {
-                logger.warn("Failed to close Record Writer", ioe);
-            }
-
-            try {
-                rollback(topicPartition);
-            } catch (final Exception rollbackException) {
-                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-            }
+            closeWriter(writer);
+            rollback(topicPartition);
 
             throw new ProcessException(e);
         }
     }
 
+    private void closeWriter(final RecordSetWriter writer) {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (final Exception ioe) {
+            logger.warn("Failed to close Record Writer", ioe);
+        }
+    }
 
     private void rollback(final TopicPartition topicPartition) {
-        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
-        if (offsetAndMetadata == null) {
-            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
-        }
+        try {
+            OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+            if (offsetAndMetadata == null) {
+                offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+            }
 
-        final long offset = offsetAndMetadata.offset();
-        kafkaConsumer.seek(topicPartition, offset);
+            final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
+            kafkaConsumer.seek(topicPartition, offset);
+        } catch (final Exception rollbackException) {
+            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+        }
     }

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
index 093375bf46..d42df15dd4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -378,7 +379,10 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
             }
 
             // Send each FlowFile to Kafka asynchronously.
-            for (final FlowFile flowFile : flowFiles) {
+            final Iterator<FlowFile> itr = flowFiles.iterator();
+            while (itr.hasNext()) {
+                final FlowFile flowFile = itr.next();
+
                 if (!isScheduled()) {
                     // If stopped, re-queue FlowFile instead of sending it
                     if (useTransactions) {
@@ -388,6 +392,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
                     }
 
                     session.transfer(flowFile);
+                    itr.remove();
                     continue;
                 }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index a2a449c09c..2e7e2d465e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -519,6 +519,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
                 try {
                     reader = readerFactory.createRecordReader(attributes, in, logger);
+                } catch (final IOException e) {
+                    yield();
+                    rollback(topicPartition);
+                    handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
+                    closeWriter(writer);
+                    return;
                 } catch (final Exception e) {
                     handleParseFailure(consumerRecord, session, e);
                     continue;
@@ -543,13 +549,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     } catch (final Exception e) {
                         logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
 
-                        try {
-                            rollback(topicPartition);
-                        } catch (final Exception rollbackException) {
-                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                        }
-
+                        rollback(topicPartition);
                         yield();
+                        throw new ProcessException(e);
                     }
 
@@ -572,40 +574,42 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         }
 
                         tracker.incrementRecordCount(1L);
-                        session.adjustCounter("Records Received", records.size(), false);
+                        session.adjustCounter("Records Received", 1L, false);
                     }
                 }
             }
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
 
-            try {
-                if (writer != null) {
-                    writer.close();
-                }
-            } catch (final Exception ioe) {
-                logger.warn("Failed to close Record Writer", ioe);
-            }
-
-            try {
-                rollback(topicPartition);
-            } catch (final Exception rollbackException) {
-                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-            }
+            closeWriter(writer);
+            rollback(topicPartition);
 
             throw new ProcessException(e);
         }
     }
 
+    private void closeWriter(final RecordSetWriter writer) {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (final Exception ioe) {
+            logger.warn("Failed to close Record Writer", ioe);
+        }
+    }
 
     private void rollback(final TopicPartition topicPartition) {
-        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
-        if (offsetAndMetadata == null) {
-            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
-        }
+        try {
+            OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+            if (offsetAndMetadata == null) {
+                offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+            }
 
-        final long offset = offsetAndMetadata.offset();
-        kafkaConsumer.seek(topicPartition, offset);
+            final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
+            kafkaConsumer.seek(topicPartition, offset);
+        } catch (final Exception rollbackException) {
+            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+        }
     }

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index c125d622ba..517cb0ce20 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -378,7 +379,10 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
             }
 
             // Send each FlowFile to Kafka asynchronously.
-            for (final FlowFile flowFile : flowFiles) {
+            final Iterator<FlowFile> itr = flowFiles.iterator();
+            while (itr.hasNext()) {
+                final FlowFile flowFile = itr.next();
+
                 if (!isScheduled()) {
                     // If stopped, re-queue FlowFile instead of sending it
                     if (useTransactions) {
@@ -388,6 +392,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
                     }
 
                     session.transfer(flowFile);
+                    itr.remove();
                     continue;
                 }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index ccb54b03b9..f37c9278a1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.schemaregistry.hortonworks;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -224,22 +225,32 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
     @Override
-    public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+    public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
-        final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName);
-        if (metadataInfo == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
-        }
 
-        final Long schemaId = metadataInfo.getId();
-        if (schemaId == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
-        }
+        final SchemaVersionInfo versionInfo;
+        final Long schemaId;
+        final Integer version;
 
-        final SchemaVersionInfo versionInfo = getLatestSchemaVersionInfo(client, schemaName);
-        final Integer version = versionInfo.getVersion();
-        if (version == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
+        try {
+            final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName);
+            if (metadataInfo == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
+            }
+
+            schemaId = metadataInfo.getId();
+            if (schemaId == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
+            }
+
+            versionInfo = getLatestSchemaVersionInfo(client, schemaName);
+            version = versionInfo.getVersion();
+            if (version == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
+            }
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with name '" + schemaName + "'", e);
+            return null;
         }
 
         final String schemaText = versionInfo.getSchemaText();
@@ -254,40 +265,54 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
     @Override
-    public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+    public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
-        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-        if (info == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+
+        try {
+            final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
+            if (info == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+            }
+
+            final SchemaMetadata metadata = info.getSchemaMetadata();
+            final String schemaName = metadata.getName();
+
+            final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+            final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+            if (versionInfo == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+            }
+
+            return versionInfo.getSchemaText();
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with ID '" + schemaId + "' and version '" + version + "'", e);
+            return null;
         }
-
-        final SchemaMetadata metadata = info.getSchemaMetadata();
-        final String schemaName = metadata.getName();
-
-        final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
-        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
-        if (versionInfo == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-        }
-
-        return versionInfo.getSchemaText();
     }
 
     @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+    public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
-        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-        if (info == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-        }
 
-        final SchemaMetadata metadata = info.getSchemaMetadata();
-        final String schemaName = metadata.getName();
+        final String schemaName;
+        final SchemaVersionInfo versionInfo;
+        try {
+            final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
+            if (info == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+            }
 
-        final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
-        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
-        if (versionInfo == null) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+            final SchemaMetadata metadata = info.getSchemaMetadata();
+            schemaName = metadata.getName();
+
+            final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+            versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+            if (versionInfo == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+            }
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with ID '" + schemaId + "' and version '" + version + "'", e);
+            return null;
        }
 
         final String schemaText = versionInfo.getSchemaText();
@@ -300,6 +325,32 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         });
     }
 
+    // The schema registry client wraps all IOExceptions in RuntimeException. So if an IOException occurs, we don't know
+    // that it was an IO problem. So we will look through the Exception's cause chain to see if there is an IOException present.
+    private void handleException(final String message, final Exception e) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
+        if (containsIOException(e)) {
+            throw new IOException(message, e);
+        }
+
+        throw new org.apache.nifi.schema.access.SchemaNotFoundException(message, e);
+    }
+
+    private boolean containsIOException(final Throwable t) {
+        if (t == null) {
+            return false;
+        }
+
+        if (t instanceof IOException) {
+            return true;
+        }
+
+        final Throwable cause = t.getCause();
+        if (cause == null) {
+            return false;
+        }
+
+        return containsIOException(cause);
+    }
+
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {
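
Note on the last hunk: the fix exists because the Hortonworks schema registry client surfaces I/O failures wrapped in RuntimeException, so handleException() walks the getCause() chain before deciding whether to rethrow as IOException (a transient comms problem the caller may retry) or SchemaNotFoundException (a genuine miss). The standalone sketch below exercises that cause-chain scan in isolation; the CauseChainDemo class and the sample exceptions are illustrative only and are not part of the patch.

    import java.io.IOException;
    import java.net.SocketTimeoutException;

    public class CauseChainDemo {

        // Same recursive walk as the patch's containsIOException(Throwable):
        // returns true if any link in the cause chain is an IOException.
        static boolean containsIOException(final Throwable t) {
            if (t == null) {
                return false;
            }
            if (t instanceof IOException) {
                return true;
            }
            return containsIOException(t.getCause());
        }

        public static void main(final String[] args) {
            // A RuntimeException hiding a network timeout two levels down,
            // roughly how the registry client wraps I/O trouble.
            final RuntimeException wrapped = new RuntimeException("request failed",
                    new RuntimeException("transport error", new SocketTimeoutException("read timed out")));

            System.out.println(containsIOException(wrapped));                          // true  -> rethrow as IOException
            System.out.println(containsIOException(new IllegalStateException("bad"))); // false -> SchemaNotFoundException
        }
    }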