From 6f32e6e9776548b746f6388d25067cd17aa1898c Mon Sep 17 00:00:00 2001 From: ricky Date: Tue, 28 Apr 2015 15:58:09 -0400 Subject: [PATCH] NIFI-551 - ConvertJSONToAvro improve error message - Report failure counts as an log error message - Send record parsing errors to a separate flowfile which is transfered down the failure relationship Signed-off-by: joewitt --- .../processors/kite/ConvertJSONToAvro.java | 38 +++++++++++++------ .../kite/TestJSONToAvroProcessor.java | 23 +++++++++-- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java index 78f80b97be..d4cc760354 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; import org.kitesdk.data.DatasetException; import org.kitesdk.data.DatasetIOException; @@ -97,22 +98,22 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { } @Override - public void onTrigger(ProcessContext context, final ProcessSession session) + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { + FlowFile successfulRecords = session.get(); + if (successfulRecords == null) { return; } String schemaProperty = context.getProperty(SCHEMA) - .evaluateAttributeExpressions(flowFile) + .evaluateAttributeExpressions(successfulRecords) .getValue(); final Schema schema; try { schema = getSchema(schemaProperty, DefaultConfiguration.get()); } catch (SchemaNotFoundException e) { getLogger().error("Cannot find schema: " + schemaProperty); - session.transfer(flowFile, FAILURE); + session.transfer(successfulRecords, FAILURE); return; } @@ -121,21 +122,31 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { writer.setCodec(CodecFactory.snappyCodec()); try { - flowFile = session.write(flowFile, new StreamCallback() { + successfulRecords = session.write(successfulRecords, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { + FlowFile failedRecords = session.create(); long written = 0L; long errors = 0L; + long total = 0L; try (JSONFileReader reader = new JSONFileReader<>( in, schema, Record.class)) { reader.initialize(); try (DataFileWriter w = writer.create(schema, out)) { while (reader.hasNext()) { + total += 1; try { Record record = reader.next(); w.append(record); written += 1; - } catch (DatasetRecordException e) { + } catch (final DatasetRecordException e) { + failedRecords = session.append(failedRecords, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write((e.getMessage() + " [" + + e.getCause().getMessage() + "]\n").getBytes()); + } + }); errors += 1; } } @@ -143,21 +154,26 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { session.adjustCounter("Converted records", written, false /* update only if file transfer is successful */); session.adjustCounter("Conversion errors", errors, - false /* update only if file transfer is successful */); + false /* update only if file transfer is successful */); + + if (errors > 0L) { + getLogger().warn("Failed to convert " + errors + '/' + total + " records from JSON to Avro"); + } } + session.transfer(failedRecords, FAILURE); } }); - session.transfer(flowFile, SUCCESS); + session.transfer(successfulRecords, SUCCESS); //session.getProvenanceReporter().send(flowFile, target.getUri().toString()); } catch (ProcessException | DatasetIOException e) { getLogger().error("Failed reading or writing", e); - session.transfer(flowFile, FAILURE); + session.transfer(successfulRecords, FAILURE); } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); - session.transfer(flowFile, FAILURE); + session.transfer(successfulRecords, FAILURE); } } diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java index d50e7f9c0b..0b53bc75ef 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java @@ -18,9 +18,17 @@ */ package org.apache.nifi.processors.kite; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; + import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -38,9 +46,13 @@ public class TestJSONToAvroProcessor { public static final String JSON_CONTENT = "" + "{\"id\": 1,\"color\": \"green\"}" - + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string + + "{\"id\": \"120V\", \"color\": \"blue\"}\n" // invalid, ID is a string + + "{\"id\": 10, \"color\": 15.23}\n" + // invalid, color as double "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }"; + public static final String FAILURE_CONTENT = "Cannot convert field id [Cannot convert to long: \"120V\"]\n" + + "Cannot convert field color [Cannot convert to string: 15.23]\n"; + @Test public void testBasicConversion() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); @@ -54,8 +66,13 @@ public class TestJSONToAvroProcessor { long converted = runner.getCounterValue("Converted records"); long errors = runner.getCounterValue("Conversion errors"); Assert.assertEquals("Should convert 2 rows", 2, converted); - Assert.assertEquals("Should reject 1 row", 1, errors); + Assert.assertEquals("Should reject 2 rows", 2, errors); - runner.assertAllFlowFilesTransferred("success", 1); + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 1); + + String failureContent = Bytes.toString(runner.getContentAsByteArray( + runner.getFlowFilesForRelationship("failure").get(0))); + Assert.assertEquals("Should reject an invalid string and double", FAILURE_CONTENT, failureContent); } }