NIFI-4656, NIFI-4680: This closes #2330. Fix error handling in consume/publish Kafka processors. Address issue with HortonworksSchemaRegistry throwing RuntimeException when it should be IOException. Fix bug in ConsumerLease/ConsumeKafkaRecord that caused it to report too many records received.

Signed-off-by: joewitt <joewitt@apache.org>
Author: Mark Payne (2017-12-08 11:20:32 -05:00), committed by joewitt
parent 1fc1d38fd8
commit c138987bb4
5 changed files with 162 additions and 93 deletions
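
The HortonworksSchemaRegistry portion of this change matters to callers because the two failure modes are handled differently: an IOException signals a transient communication problem that is safe to retry, while a SchemaNotFoundException is terminal. A minimal, hypothetical caller-side sketch of that distinction (not part of this commit; SchemaLookup, fetchSchemaText, and lookupWithRetry are illustrative names, and it assumes maxAttempts >= 1):

    import java.io.IOException;

    public class SchemaLookupExample {
        /** Terminal failure: the schema simply does not exist in the registry. */
        static class SchemaNotFoundException extends Exception {
            SchemaNotFoundException(final String message) { super(message); }
        }

        /** Hypothetical lookup API that separates transient IO failures from terminal ones. */
        interface SchemaLookup {
            String fetchSchemaText(String name) throws IOException, SchemaNotFoundException;
        }

        static String lookupWithRetry(final SchemaLookup lookup, final String name, final int maxAttempts)
                throws IOException, SchemaNotFoundException {
            IOException lastIoFailure = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    return lookup.fetchSchemaText(name);   // SchemaNotFoundException propagates immediately
                } catch (final IOException e) {
                    lastIoFailure = e;                     // transient comms failure: try again
                }
            }
            throw lastIoFailure;                           // still failing after maxAttempts attempts
        }
    }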

File: ConsumerLease.java (Kafka 0.11 processors)

@@ -519,6 +519,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
try {
reader = readerFactory.createRecordReader(attributes, in, logger);
+ } catch (final IOException e) {
+ yield();
+ rollback(topicPartition);
+ handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
+ closeWriter(writer);
+ return;
} catch (final Exception e) {
handleParseFailure(consumerRecord, session, e);
continue;
@@ -543,13 +549,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
- try {
rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
yield();
throw new ProcessException(e);
}
@@ -572,13 +574,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
}
tracker.incrementRecordCount(1L);
- session.adjustCounter("Records Received", records.size(), false);
+ session.adjustCounter("Records Received", 1L, false);
}
}
}
} catch (final Exception e) {
logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+ closeWriter(writer);
+ rollback(topicPartition);
+ throw new ProcessException(e);
}
}
+ private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {
writer.close();
@@ -586,26 +596,20 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
} catch (final Exception ioe) {
logger.warn("Failed to close Record Writer", ioe);
}
- try {
- rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
- throw new ProcessException(e);
}
}
private void rollback(final TopicPartition topicPartition) {
+ try {
OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
if (offsetAndMetadata == null) {
offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
- final long offset = offsetAndMetadata.offset();
+ final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
kafkaConsumer.seek(topicPartition, offset);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
}
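
Outside of NiFi's internals, the rollback above boils down to seeking the Kafka consumer back to the last committed offset so that uncommitted records are re-polled on the next cycle. A small standalone sketch against the plain kafka-clients API (not NiFi code; rewindToCommitted is an illustrative name, and the 0L fallback mirrors the change above):

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public final class OffsetRollbackSketch {
        /** Rewind the consumer so the records polled since the last commit are seen again. */
        public static void rewindToCommitted(final Consumer<byte[], byte[]> consumer, final TopicPartition partition) {
            final OffsetAndMetadata committed = consumer.committed(partition);
            // If nothing has ever been committed for this partition, fall back to offset 0
            // (the same fallback used in the change above).
            final long offset = committed == null ? 0L : committed.offset();
            consumer.seek(partition, offset);
        }
    }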

File: PublishKafkaRecord_0_11.java

@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -378,7 +379,10 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
}
// Send each FlowFile to Kafka asynchronously.
- for (final FlowFile flowFile : flowFiles) {
+ final Iterator<FlowFile> itr = flowFiles.iterator();
+ while (itr.hasNext()) {
+ final FlowFile flowFile = itr.next();
if (!isScheduled()) {
// If stopped, re-queue FlowFile instead of sending it
if (useTransactions) {
@@ -388,6 +392,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
}
session.transfer(flowFile);
+ itr.remove();
continue;
}

File: ConsumerLease.java (Kafka 1.0 processors)

@@ -519,6 +519,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
try {
reader = readerFactory.createRecordReader(attributes, in, logger);
+ } catch (final IOException e) {
+ yield();
+ rollback(topicPartition);
+ handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
+ closeWriter(writer);
+ return;
} catch (final Exception e) {
handleParseFailure(consumerRecord, session, e);
continue;
@@ -543,13 +549,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
- try {
rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
yield();
throw new ProcessException(e);
}
@@ -572,13 +574,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
}
tracker.incrementRecordCount(1L);
- session.adjustCounter("Records Received", records.size(), false);
+ session.adjustCounter("Records Received", 1L, false);
}
}
}
} catch (final Exception e) {
logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+ closeWriter(writer);
+ rollback(topicPartition);
+ throw new ProcessException(e);
}
}
+ private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {
writer.close();
@@ -586,26 +596,20 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
} catch (final Exception ioe) {
logger.warn("Failed to close Record Writer", ioe);
}
- try {
- rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
- throw new ProcessException(e);
}
}
private void rollback(final TopicPartition topicPartition) {
+ try {
OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
if (offsetAndMetadata == null) {
offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
- final long offset = offsetAndMetadata.offset();
+ final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
kafkaConsumer.seek(topicPartition, offset);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
}

File: PublishKafkaRecord_1_0.java

@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -378,7 +379,10 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
}
// Send each FlowFile to Kafka asynchronously.
- for (final FlowFile flowFile : flowFiles) {
+ final Iterator<FlowFile> itr = flowFiles.iterator();
+ while (itr.hasNext()) {
+ final FlowFile flowFile = itr.next();
if (!isScheduled()) {
// If stopped, re-queue FlowFile instead of sending it
if (useTransactions) {
@@ -388,6 +392,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
}
session.transfer(flowFile);
+ itr.remove();
continue;
}

File: HortonworksSchemaRegistry.java

@@ -16,6 +16,7 @@
*/
package org.apache.nifi.schemaregistry.hortonworks;
+ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -224,23 +225,33 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry
@Override
- public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+ public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
final SchemaRegistryClient client = getClient();
+ final SchemaVersionInfo versionInfo;
+ final Long schemaId;
+ final Integer version;
+ try {
final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName);
if (metadataInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
- final Long schemaId = metadataInfo.getId();
+ schemaId = metadataInfo.getId();
if (schemaId == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
- final SchemaVersionInfo versionInfo = getLatestSchemaVersionInfo(client, schemaName);
- final Integer version = versionInfo.getVersion();
+ versionInfo = getLatestSchemaVersionInfo(client, schemaName);
+ version = versionInfo.getVersion();
if (version == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
}
+ } catch (final Exception e) {
+ handleException("Failed to retrieve schema with name '" + schemaName + "'", e);
+ return null;
+ }
final String schemaText = versionInfo.getSchemaText();
final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version);
@@ -254,8 +265,10 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry
@Override
- public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+ public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
final SchemaRegistryClient client = getClient();
try {
final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
if (info == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
@@ -271,24 +284,36 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry
}
return versionInfo.getSchemaText();
} catch (final Exception e) {
+ handleException("Failed to retrieve schema with ID '" + schemaId + "' and version '" + version + "'", e);
+ return null;
}
}
@Override
- public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+ public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
final SchemaRegistryClient client = getClient();
+ final String schemaName;
+ final SchemaVersionInfo versionInfo;
+ try {
final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
if (info == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
}
final SchemaMetadata metadata = info.getSchemaMetadata();
- final String schemaName = metadata.getName();
+ schemaName = metadata.getName();
final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
- final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+ versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
if (versionInfo == null) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
}
+ } catch (final Exception e) {
+ handleException("Failed to retrieve schema with ID '" + schemaId + "' and version '" + version + "'", e);
+ return null;
+ }
final String schemaText = versionInfo.getSchemaText();
@@ -300,6 +325,32 @@ public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry
});
}
+ // The schema registry client wraps all IOExceptions in RuntimeException. So if an IOException occurs, we don't know
+ // that it was an IO problem. So we will look through the Exception's cause chain to see if there is an IOException present.
+ private void handleException(final String message, final Exception e) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
+ if (containsIOException(e)) {
+ throw new IOException(message, e);
+ }
+ throw new org.apache.nifi.schema.access.SchemaNotFoundException(message, e);
+ }
+ private boolean containsIOException(final Throwable t) {
+ if (t == null) {
+ return false;
+ }
+ if (t instanceof IOException) {
+ return true;
+ }
+ final Throwable cause = t.getCause();
+ if (cause == null) {
+ return false;
+ }
+ return containsIOException(cause);
+ }
@Override
public Set<SchemaField> getSuppliedSchemaFields() {