From b2a1f5217dd89e8133887d691e56483e9f0e4214 Mon Sep 17 00:00:00 2001
From: joewitt
Date: Sat, 25 Apr 2015 09:15:36 -0400
Subject: [PATCH] NIFI-271

---
 .../kite/AbstractKiteProcessor.java           |  10 +-
 .../apache/nifi/processors/kite/AvroUtil.java |  18 +-
 .../processors/kite/ConvertCSVToAvro.java     | 386 +++++++++---------
 .../processors/kite/ConvertJSONToAvro.java    |   2 +-
 .../processors/kite/StoreInKiteDataset.java   | 186 ++++-----
 .../kite/TestCSVToAvroProcessor.java          | 155 ++++---
 .../kite/TestConfigurationProperty.java       |  65 ++-
 .../nifi/processors/kite/TestGetSchema.java   |  97 +++--
 .../kite/TestJSONToAvroProcessor.java         |  48 +--
 .../kite/TestKiteProcessorsCluster.java       | 123 +++---
 .../kite/TestKiteStorageProcessor.java        | 221 +++++-----
 .../apache/nifi/processors/kite/TestUtil.java | 110 ++---
 12 files changed, 707 insertions(+), 714 deletions(-)

diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index 56418f4bae..fec823918e 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -81,7 +81,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
         public ValidationResult validate(String subject, String uri, ValidationContext context) {
             String message = "not set";
             boolean isValid = true;
-            
+
             if (uri.trim().isEmpty()) {
                 isValid = false;
             } else {
@@ -95,7 +95,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
                 }
             }
         }
-        
+
         return new ValidationResult.Builder()
                 .subject(subject)
                 .input(uri)
@@ -163,14 +163,14 @@
         public ValidationResult validate(String subject, String uri, ValidationContext context) {
             Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).getValue());
             String error = null;
-            
+
             final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
             if (!elPresent) {
                 try {
                     getSchema(uri, conf);
-                } catch (SchemaNotFoundException e) { 
+                } catch (SchemaNotFoundException e) {
                     error = e.getMessage();
-                } 
+                }
             }
             return new ValidationResult.Builder()
                     .subject(subject)
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
index 9ff0f73f9f..53075c7ab0 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import org.apache.avro.Schema;
@@ -24,17 +23,16 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 
-
 class AvroUtil {
-  @SuppressWarnings("unchecked")
-  public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
-    return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
-  }
+
+    @SuppressWarnings("unchecked")
+    public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
+        return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
+    }
 
-  @SuppressWarnings("unchecked")
-  public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
-    return (DatumReader<D>) GenericData.get().createDatumReader(schema);
-  }
+    @SuppressWarnings("unchecked")
+    public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
+        return (DatumReader<D>) GenericData.get().createDatumReader(schema);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index c6f58c7bed..564a203a6f 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -57,204 +56,205 @@ import static org.apache.nifi.processor.util.StandardValidators.createLongValida
 @Tags({"kite", "csv", "avro"})
 @CapabilityDescription(
-    "Converts CSV files to Avro according to an Avro Schema")
+        "Converts CSV files to Avro according to an Avro Schema")
 public class ConvertCSVToAvro extends AbstractKiteProcessor {
-  private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
-  private static final Validator CHAR_VALIDATOR = new Validator() {
-    @Override
-    public ValidationResult validate(String subject, String input,
-        ValidationContext context) {
-      return new ValidationResult.Builder()
-          .subject(subject)
-          .input(input)
-          .explanation("Only single characters are supported")
-          .valid(input.length() == 1)
-          .build();
-    }
-  };
+    private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
 
-  private static final Relationship SUCCESS = new Relationship.Builder()
-      .name("success")
-      .description("FlowFile content has been successfully saved")
-      .build();
-
-  private static final Relationship FAILURE = new Relationship.Builder()
-      .name("failure")
-      .description("FlowFile content could not be processed")
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor SCHEMA =
-      new PropertyDescriptor.Builder()
-      .name("Record schema")
-      .description("Outgoing Avro schema for each record created from a CSV row")
-      .addValidator(SCHEMA_VALIDATOR)
-      .expressionLanguageSupported(true)
-      .required(true)
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor CHARSET =
-      new PropertyDescriptor.Builder()
-      .name("CSV charset")
-      .description("Character set for CSV files")
-      .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-      .defaultValue(DEFAULTS.charset)
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor DELIMITER =
-      new PropertyDescriptor.Builder()
-      .name("CSV delimiter")
-      .description("Delimiter character for CSV records")
-      .addValidator(CHAR_VALIDATOR)
-      .defaultValue(DEFAULTS.delimiter)
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor QUOTE =
-      new PropertyDescriptor.Builder()
-      .name("CSV quote character")
-      .description("Quote character for CSV values")
-      .addValidator(CHAR_VALIDATOR)
-      .defaultValue(DEFAULTS.quote)
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor ESCAPE =
-      new PropertyDescriptor.Builder()
-      .name("CSV escape character")
-      .description("Escape character for CSV values")
-      .addValidator(CHAR_VALIDATOR)
-      .defaultValue(DEFAULTS.escape)
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor HAS_HEADER =
-      new PropertyDescriptor.Builder()
-      .name("Use CSV header line")
-      .description("Whether to use the first line as a header")
-      .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-      .defaultValue(String.valueOf(DEFAULTS.useHeader))
-      .build();
-
-  @VisibleForTesting
-  static final PropertyDescriptor LINES_TO_SKIP =
-      new PropertyDescriptor.Builder()
-      .name("Lines to skip")
-      .description("Number of lines to skip before reading header or data")
-      .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
-      .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
-      .build();
-
-  private static final List<PropertyDescriptor> PROPERTIES =
-      ImmutableList.<PropertyDescriptor>builder()
-      .addAll(AbstractKiteProcessor.getProperties())
-      .add(SCHEMA)
-      .add(CHARSET)
-      .add(DELIMITER)
-      .add(QUOTE)
-      .add(ESCAPE)
-      .add(HAS_HEADER)
-      .add(LINES_TO_SKIP)
-      .build();
-
-  private static final Set<Relationship> RELATIONSHIPS =
-      ImmutableSet.<Relationship>builder()
-      .add(SUCCESS)
-      .add(FAILURE)
-      .build();
-
-  // Immutable configuration
-  @VisibleForTesting
-  volatile CSVProperties props;
-
-  @Override
-  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-    return PROPERTIES;
-  }
-
-  @Override
-  public Set<Relationship> getRelationships() {
-    return RELATIONSHIPS;
-  }
-
-  @OnScheduled
-  public void createCSVProperties(ProcessContext context) throws IOException {
-    super.setDefaultConfiguration(context);
-
-    this.props = new CSVProperties.Builder()
-        .charset(context.getProperty(CHARSET).getValue())
-        .delimiter(context.getProperty(DELIMITER).getValue())
-        .quote(context.getProperty(QUOTE).getValue())
-        .escape(context.getProperty(ESCAPE).getValue())
-        .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
-        .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
-        .build();
-  }
-
-  @Override
-  public void onTrigger(ProcessContext context, final ProcessSession session)
-      throws ProcessException {
-    FlowFile flowFile = session.get();
-    if (flowFile == null) {
-      return;
-    }
-
-    String schemaProperty = context.getProperty(SCHEMA)
-        .evaluateAttributeExpressions(flowFile)
-        .getValue();
-    final Schema schema;
-    try {
-      schema = getSchema(schemaProperty, DefaultConfiguration.get());
-    } catch (SchemaNotFoundException e) {
-      getLogger().error("Cannot find schema: " + schemaProperty);
-      session.transfer(flowFile, FAILURE);
-      return;
-    }
-
-    final DataFileWriter<Record> writer = new DataFileWriter<>(
-        AvroUtil.newDatumWriter(schema, Record.class));
-    writer.setCodec(CodecFactory.snappyCodec());
-
-    try {
-      flowFile = session.write(flowFile, new StreamCallback() {
+    private static final Validator CHAR_VALIDATOR = new Validator() {
         @Override
-      public void process(InputStream in, OutputStream out) throws IOException {
-        long written = 0L;
-        long errors = 0L;
-        try (CSVFileReader<Record> reader = new CSVFileReader<>(
-            in, props, schema, Record.class)) {
-          reader.initialize();
-          try (DataFileWriter<Record> w = writer.create(schema, out)) {
-            while (reader.hasNext()) {
-              try {
-                Record record = reader.next();
-                w.append(record);
-                written += 1;
-              } catch (DatasetRecordException e) {
-                errors += 1;
-              }
-            }
-          }
-        }
-        session.adjustCounter("Converted records", written,
-            false /* update only if file transfer is successful */);
-        session.adjustCounter("Conversion errors", errors,
-            false /* update only if file transfer is successful */);
+        public ValidationResult validate(String subject, String input,
+                ValidationContext context) {
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(input)
+                    .explanation("Only single characters are supported")
+                    .valid(input.length() == 1)
+                    .build();
         }
-      });
+    };
 
-      session.transfer(flowFile, SUCCESS);
+    private static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFile content has been successfully saved")
+            .build();
 
-      //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
-    } catch (ProcessException | DatasetIOException e) {
-      getLogger().error("Failed reading or writing", e);
-      session.transfer(flowFile, FAILURE);
-    } catch (DatasetException e) {
-      getLogger().error("Failed to read FlowFile", e);
-      session.transfer(flowFile, FAILURE);
+    private static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFile content could not be processed")
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor SCHEMA
+            = new PropertyDescriptor.Builder()
+            .name("Record schema")
+            .description("Outgoing Avro schema for each record created from a CSV row")
+            .addValidator(SCHEMA_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor CHARSET
+            = new PropertyDescriptor.Builder()
+            .name("CSV charset")
+            .description("Character set for CSV files")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(DEFAULTS.charset)
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor DELIMITER
+            = new PropertyDescriptor.Builder()
+            .name("CSV delimiter")
+            .description("Delimiter character for CSV records")
+            .addValidator(CHAR_VALIDATOR)
+            .defaultValue(DEFAULTS.delimiter)
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor QUOTE
+            = new PropertyDescriptor.Builder()
+            .name("CSV quote character")
+            .description("Quote character for CSV values")
+            .addValidator(CHAR_VALIDATOR)
+            .defaultValue(DEFAULTS.quote)
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor ESCAPE
+            = new PropertyDescriptor.Builder()
+            .name("CSV escape character")
+            .description("Escape character for CSV values")
+            .addValidator(CHAR_VALIDATOR)
+            .defaultValue(DEFAULTS.escape)
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor HAS_HEADER
+            = new PropertyDescriptor.Builder()
+            .name("Use CSV header line")
+            .description("Whether to use the first line as a header")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(String.valueOf(DEFAULTS.useHeader))
+            .build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor LINES_TO_SKIP
+            = new PropertyDescriptor.Builder()
+            .name("Lines to skip")
+            .description("Number of lines to skip before reading header or data")
+            .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
+            .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES
+            = ImmutableList.<PropertyDescriptor>builder()
+            .addAll(AbstractKiteProcessor.getProperties())
+            .add(SCHEMA)
+            .add(CHARSET)
+            .add(DELIMITER)
+            .add(QUOTE)
+            .add(ESCAPE)
+            .add(HAS_HEADER)
+            .add(LINES_TO_SKIP)
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS
+            = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(FAILURE)
+            .build();
+
+    // Immutable configuration
+    @VisibleForTesting
+    volatile CSVProperties props;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void createCSVProperties(ProcessContext context) throws IOException {
+        super.setDefaultConfiguration(context);
+
+        this.props = new CSVProperties.Builder()
+                .charset(context.getProperty(CHARSET).getValue())
+                .delimiter(context.getProperty(DELIMITER).getValue())
+                .quote(context.getProperty(QUOTE).getValue())
+                .escape(context.getProperty(ESCAPE).getValue())
+                .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
+                .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
+                .build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String schemaProperty = context.getProperty(SCHEMA)
+                .evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final Schema schema;
+        try {
+            schema = getSchema(schemaProperty, DefaultConfiguration.get());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Cannot find schema: " + schemaProperty);
+            session.transfer(flowFile, FAILURE);
+            return;
+        }
+
+        final DataFileWriter<Record> writer = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(schema, Record.class));
+        writer.setCodec(CodecFactory.snappyCodec());
+
+        try {
+            flowFile = session.write(flowFile, new StreamCallback() {
+                @Override
+                public void process(InputStream in, OutputStream out) throws IOException {
+                    long written = 0L;
+                    long errors = 0L;
+                    try (CSVFileReader<Record> reader = new CSVFileReader<>(
+                            in, props, schema, Record.class)) {
+                        reader.initialize();
+                        try (DataFileWriter<Record> w = writer.create(schema, out)) {
+                            while (reader.hasNext()) {
+                                try {
+                                    Record record = reader.next();
+                                    w.append(record);
+                                    written += 1;
+                                } catch (DatasetRecordException e) {
+                                    errors += 1;
+                                }
+                            }
+                        }
+                    }
+                    session.adjustCounter("Converted records", written,
+                            false /* update only if file transfer is successful */);
+                    session.adjustCounter("Conversion errors", errors,
+                            false /* update only if file transfer is successful */);
+                }
+            });
+
+            session.transfer(flowFile, SUCCESS);
+
+            //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
+        } catch (ProcessException | DatasetIOException e) {
+            getLogger().error("Failed reading or writing", e);
+            session.transfer(flowFile, FAILURE);
+        } catch (DatasetException e) {
+            getLogger().error("Failed to read FlowFile", e);
+            session.transfer(flowFile, FAILURE);
+        }
     }
-  }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 7a35e31393..78f80b97be 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -105,7 +105,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
         }
 
         String schemaProperty = context.getProperty(SCHEMA)
-            .evaluateAttributeExpressions(flowFile)
+                .evaluateAttributeExpressions(flowFile)
                 .getValue();
         final Schema schema;
         try {
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 5586de1c5f..7a30db1374 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.collect.ImmutableList;
@@ -50,113 +49,114 @@ import org.kitesdk.data.spi.SchemaValidationUtil;
 @Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
 @CapabilityDescription("Stores Avro records in a Kite dataset")
 public class StoreInKiteDataset extends AbstractKiteProcessor {
-  private static final Relationship SUCCESS = new Relationship.Builder()
-      .name("success")
-      .description("FlowFile content has been successfully saved")
-      .build();
 
-  private static final Relationship INCOMPATIBLE = new Relationship.Builder()
-      .name("incompatible")
-      .description("FlowFile content is not compatible with the target dataset")
-      .build();
+    private static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFile content has been successfully saved")
+            .build();
 
-  private static final Relationship FAILURE = new Relationship.Builder()
-      .name("failure")
-      .description("FlowFile content could not be processed")
-      .build();
+    private static final Relationship INCOMPATIBLE = new Relationship.Builder()
+            .name("incompatible")
+            .description("FlowFile content is not compatible with the target dataset")
+            .build();
 
-  public static final PropertyDescriptor KITE_DATASET_URI =
-      new PropertyDescriptor.Builder()
-      .name("Target dataset URI")
-      .description("URI that identifies a Kite dataset where data will be stored")
-      .addValidator(RECOGNIZED_URI)
-      .expressionLanguageSupported(true)
-      .required(true)
-      .build();
+    private static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFile content could not be processed")
+            .build();
 
-  private static final List<PropertyDescriptor> PROPERTIES =
-      ImmutableList.<PropertyDescriptor>builder()
-      .addAll(AbstractKiteProcessor.getProperties())
-      .add(KITE_DATASET_URI)
-      .build();
+    public static final PropertyDescriptor KITE_DATASET_URI
+            = new PropertyDescriptor.Builder()
+            .name("Target dataset URI")
+            .description("URI that identifies a Kite dataset where data will be stored")
+            .addValidator(RECOGNIZED_URI)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
 
-  private static final Set<Relationship> RELATIONSHIPS =
-      ImmutableSet.<Relationship>builder()
-      .add(SUCCESS)
-      .add(INCOMPATIBLE)
-      .add(FAILURE)
-      .build();
+    private static final List<PropertyDescriptor> PROPERTIES
+            = ImmutableList.<PropertyDescriptor>builder()
+            .addAll(AbstractKiteProcessor.getProperties())
+            .add(KITE_DATASET_URI)
+            .build();
 
-  @Override
-  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-    return PROPERTIES;
-  }
+    private static final Set<Relationship> RELATIONSHIPS
+            = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(INCOMPATIBLE)
+            .add(FAILURE)
+            .build();
 
-  @Override
-  public Set<Relationship> getRelationships() {
-    return RELATIONSHIPS;
-  }
-
-  @Override
-  public void onTrigger(ProcessContext context, final ProcessSession session)
-      throws ProcessException {
-    FlowFile flowFile = session.get();
-    if (flowFile == null) {
-      return;
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
     }
 
-    final View<Record> target = load(context, flowFile);
-    final Schema schema = target.getDataset().getDescriptor().getSchema();
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
 
-    try {
-      StopWatch timer = new StopWatch(true);
-      session.read(flowFile, new InputStreamCallback() {
-        @Override
-        public void process(InputStream in) throws IOException {
-          try (DataFileStream<Record> stream = new DataFileStream<>(
-              in, AvroUtil.newDatumReader(schema, Record.class))) {
-            IncompatibleSchemaException.check(
-                SchemaValidationUtil.canRead(stream.getSchema(), schema),
-                "Incompatible file schema %s, expected %s",
-                stream.getSchema(), schema);
-
-            long written = 0L;
-            try (DatasetWriter<Record> writer = target.newWriter()) {
-              for (Record record : stream) {
-                writer.write(record);
-                written += 1;
-              }
-            } finally {
-              session.adjustCounter("Stored records", written,
-                  true /* cannot roll back the write */);
-            }
-          }
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
         }
-      });
-      timer.stop();
+        final View<Record> target = load(context, flowFile);
+        final Schema schema = target.getDataset().getDescriptor().getSchema();
 
-      session.getProvenanceReporter().send(flowFile,
-          target.getUri().toString(),
-          timer.getDuration(TimeUnit.MILLISECONDS),
-          true /* cannot roll back the write */ );
+        try {
+            StopWatch timer = new StopWatch(true);
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    try (DataFileStream<Record> stream = new DataFileStream<>(
+                            in, AvroUtil.newDatumReader(schema, Record.class))) {
+                        IncompatibleSchemaException.check(
+                                SchemaValidationUtil.canRead(stream.getSchema(), schema),
+                                "Incompatible file schema %s, expected %s",
+                                stream.getSchema(), schema);
 
-      session.transfer(flowFile, SUCCESS);
+                        long written = 0L;
+                        try (DatasetWriter<Record> writer = target.newWriter()) {
+                            for (Record record : stream) {
+                                writer.write(record);
+                                written += 1;
+                            }
+                        } finally {
+                            session.adjustCounter("Stored records", written,
+                                    true /* cannot roll back the write */);
+                        }
+                    }
+                }
+            });
+            timer.stop();
 
-    } catch (ProcessException | DatasetIOException e) {
-      getLogger().error("Failed to read FlowFile", e);
-      session.transfer(flowFile, FAILURE);
+            session.getProvenanceReporter().send(flowFile,
+                    target.getUri().toString(),
+                    timer.getDuration(TimeUnit.MILLISECONDS),
+                    true /* cannot roll back the write */);
 
-    } catch (ValidationException e) {
-      getLogger().error(e.getMessage());
-      getLogger().debug("Incompatible schema error", e);
-      session.transfer(flowFile, INCOMPATIBLE);
+            session.transfer(flowFile, SUCCESS);
+
+        } catch (ProcessException | DatasetIOException e) {
+            getLogger().error("Failed to read FlowFile", e);
+            session.transfer(flowFile, FAILURE);
+
+        } catch (ValidationException e) {
+            getLogger().error(e.getMessage());
+            getLogger().debug("Incompatible schema error", e);
+            session.transfer(flowFile, INCOMPATIBLE);
+        }
     }
-  }
 
-  private View<Record> load(ProcessContext context, FlowFile file) {
-    String uri = context.getProperty(KITE_DATASET_URI)
-        .evaluateAttributeExpressions(file)
-        .getValue();
-    return Datasets.load(uri, Record.class);
-  }
+    private View<Record> load(ProcessContext context, FlowFile file) {
+        String uri = context.getProperty(KITE_DATASET_URI)
+                .evaluateAttributeExpressions(file)
+                .getValue();
+        return Datasets.load(uri, Record.class);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index dbe3b81e7b..753b72bfae 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.IOException;
@@ -33,94 +32,94 @@ import static org.apache.nifi.processors.kite.TestUtil.streamFor;
 
 public class TestCSVToAvroProcessor {
-  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
-      .requiredLong("id")
-      .requiredString("color")
-      .optionalDouble("price")
-      .endRecord();
+    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
 
-  public static final String CSV_CONTENT = "" +
-      "1,green\n" +
-      ",blue,\n" + // invalid, ID is missing
-      "2,grey,12.95";
+    public static final String CSV_CONTENT = ""
+            + "1,green\n"
+            + ",blue,\n" + // invalid, ID is missing
+            "2,grey,12.95";
 
-  @Test
-  public void testBasicConversion() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-    runner.assertNotValid();
-    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
-    runner.assertValid();
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
 
-    runner.enqueue(streamFor(CSV_CONTENT));
-    runner.run();
+        runner.enqueue(streamFor(CSV_CONTENT));
+        runner.run();
 
-    long converted = runner.getCounterValue("Converted records");
-    long errors = runner.getCounterValue("Conversion errors");
-    Assert.assertEquals("Should convert 2 rows", 2, converted);
-    Assert.assertEquals("Should reject 1 row", 1, errors);
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
 
-  @Test
-  public void testAlternateCharset() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
-    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
-    runner.assertValid();
+    @Test
+    public void testAlternateCharset() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
+        runner.assertValid();
 
-    runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
-    runner.run();
+        runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
+        runner.run();
 
-    long converted = runner.getCounterValue("Converted records");
-    long errors = runner.getCounterValue("Conversion errors");
-    Assert.assertEquals("Should convert 2 rows", 2, converted);
-    Assert.assertEquals("Should reject 1 row", 1, errors);
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
 
-  @Test
-  public void testCSVProperties() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-    ConvertCSVToAvro processor = new ConvertCSVToAvro();
-    ProcessContext context = runner.getProcessContext();
+    @Test
+    public void testCSVProperties() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        ConvertCSVToAvro processor = new ConvertCSVToAvro();
+        ProcessContext context = runner.getProcessContext();
 
-    // check defaults
-    processor.createCSVProperties(context);
-    Assert.assertEquals("Charset should match",
-        "utf8", processor.props.charset);
-    Assert.assertEquals("Delimiter should match",
-        ",", processor.props.delimiter);
-    Assert.assertEquals("Quote should match",
-        "\"", processor.props.quote);
-    Assert.assertEquals("Escape should match",
-        "\\", processor.props.escape);
-    Assert.assertEquals("Header flag should match",
-        false, processor.props.useHeader);
-    Assert.assertEquals("Lines to skip should match",
-        0, processor.props.linesToSkip);
+        // check defaults
+        processor.createCSVProperties(context);
+        Assert.assertEquals("Charset should match",
+                "utf8", processor.props.charset);
+        Assert.assertEquals("Delimiter should match",
+                ",", processor.props.delimiter);
+        Assert.assertEquals("Quote should match",
+                "\"", processor.props.quote);
+        Assert.assertEquals("Escape should match",
+                "\\", processor.props.escape);
+        Assert.assertEquals("Header flag should match",
+                false, processor.props.useHeader);
+        Assert.assertEquals("Lines to skip should match",
+                0, processor.props.linesToSkip);
 
-    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
-    runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
-    runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
-    runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
-    runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
-    runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
+        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
+        runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
+        runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
+        runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
+        runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
+        runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
 
-    // check updates
-    processor.createCSVProperties(context);
-    Assert.assertEquals("Charset should match",
-        "utf16", processor.props.charset);
-    Assert.assertEquals("Delimiter should match",
-        "|", processor.props.delimiter);
-    Assert.assertEquals("Quote should match",
-        "'", processor.props.quote);
-    Assert.assertEquals("Escape should match",
-        "\u2603", processor.props.escape);
-    Assert.assertEquals("Header flag should match",
-        true, processor.props.useHeader);
-    Assert.assertEquals("Lines to skip should match",
-        2, processor.props.linesToSkip);
-  }
+        // check updates
+        processor.createCSVProperties(context);
+        Assert.assertEquals("Charset should match",
+                "utf16", processor.props.charset);
+        Assert.assertEquals("Delimiter should match",
+                "|", processor.props.delimiter);
+        Assert.assertEquals("Quote should match",
+                "'", processor.props.quote);
+        Assert.assertEquals("Escape should match",
+                "\u2603", processor.props.escape);
+        Assert.assertEquals("Header flag should match",
+                true, processor.props.useHeader);
+        Assert.assertEquals("Lines to skip should match",
+                2, processor.props.linesToSkip);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
index 7b1019de6f..724a4c64da 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.File;
@@ -35,43 +34,43 @@ import org.kitesdk.data.spi.DefaultConfiguration;
 
 public class TestConfigurationProperty {
-  @Rule
-  public final TemporaryFolder temp = new TemporaryFolder();
-  public File confLocation;
+    @Rule
+    public final TemporaryFolder temp = new TemporaryFolder();
+    public File confLocation;
 
-  @Before
-  public void saveConfiguration() throws IOException {
-    Configuration conf = new Configuration(false);
-    conf.setBoolean("nifi.config.canary", true);
+    @Before
+    public void saveConfiguration() throws IOException {
+        Configuration conf = new Configuration(false);
+        conf.setBoolean("nifi.config.canary", true);
 
-    confLocation = temp.newFile("nifi-conf.xml");
-    FileOutputStream out = new FileOutputStream(confLocation);
-    conf.writeXml(out);
-    out.close();
-  }
+        confLocation = temp.newFile("nifi-conf.xml");
+        FileOutputStream out = new FileOutputStream(confLocation);
+        conf.writeXml(out);
+        out.close();
+    }
 
-  @Test
-  public void testConfigurationCanary() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
+    @Test
+    public void testConfigurationCanary() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
 
-    Assert.assertFalse("Should not contain canary value",
-        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
+        Assert.assertFalse("Should not contain canary value",
+                DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
 
-    AbstractKiteProcessor processor = new StoreInKiteDataset();
-    ProcessContext context = runner.getProcessContext();
-    processor.setDefaultConfiguration(context);
+        AbstractKiteProcessor processor = new StoreInKiteDataset();
+        ProcessContext context = runner.getProcessContext();
+        processor.setDefaultConfiguration(context);
 
-    Assert.assertTrue("Should contain canary value",
-        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
-  }
+        Assert.assertTrue("Should contain canary value",
+                DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
+    }
 
-  @Test
-  public void testFilesMustExist() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
-    runner.assertNotValid();
-  }
+    @Test
+    public void testFilesMustExist() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
+        runner.assertNotValid();
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
index a3489ec1ab..9354e8fa9a 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.File;
@@ -39,63 +38,63 @@ import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
 
 public class TestGetSchema {
-  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
-      .requiredLong("id")
-      .requiredString("color")
-      .optionalDouble("price")
-      .endRecord();
+    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
 
-  @Test
-  @Ignore("Does not work on windows")
-  public void testSchemaFromFileSystem() throws IOException {
-    File schemaFile = temp.newFile("schema.avsc");
-    FileOutputStream out = new FileOutputStream(schemaFile);
-    out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
-    out.close();
+    @Test
+    @Ignore("Does not work on windows")
+    public void testSchemaFromFileSystem() throws IOException {
+        File schemaFile = temp.newFile("schema.avsc");
+        FileOutputStream out = new FileOutputStream(schemaFile);
+        out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
+        out.close();
 
-    Schema schema = AbstractKiteProcessor.getSchema(
-        schemaFile.toString(), DefaultConfiguration.get());
+        Schema schema = AbstractKiteProcessor.getSchema(
+                schemaFile.toString(), DefaultConfiguration.get());
 
-    Assert.assertEquals("Schema from file should match", SCHEMA, schema);
-  }
-
-  @Test
-  @Ignore("Does not work on windows")
-  public void testSchemaFromKiteURIs() throws IOException {
-    String location = temp.newFolder("ns", "temp").toString();
-    if (location.endsWith("/")) {
-      location = location.substring(0, location.length() - 1);
+        Assert.assertEquals("Schema from file should match", SCHEMA, schema);
     }
-    String datasetUri = "dataset:" + location;
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schema(SCHEMA)
-        .build();
-
-    Datasets.create(datasetUri, descriptor);
+    @Test
+    @Ignore("Does not work on windows")
+    public void testSchemaFromKiteURIs() throws IOException {
+        String location = temp.newFolder("ns", "temp").toString();
+        if (location.endsWith("/")) {
+            location = location.substring(0, location.length() - 1);
+        }
+        String datasetUri = "dataset:" + location;
+        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+                .schema(SCHEMA)
+                .build();
 
-    Schema schema = AbstractKiteProcessor.getSchema(
-        datasetUri, DefaultConfiguration.get());
-    Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);
+        Datasets.create(datasetUri, descriptor);
 
-    schema = AbstractKiteProcessor.getSchema(
-        "view:file:" + location + "?color=orange", DefaultConfiguration.get());
-    Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
-  }
+        Schema schema = AbstractKiteProcessor.getSchema(
+                datasetUri, DefaultConfiguration.get());
+        Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);
 
-  @Test
-  public void testSchemaFromResourceURI() throws IOException {
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
-        .build();
-    Schema expected = descriptor.getSchema();
+        schema = AbstractKiteProcessor.getSchema(
+                "view:file:" + location + "?color=orange", DefaultConfiguration.get());
+        Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
+    }
 
-    Schema schema = AbstractKiteProcessor.getSchema(
-        "resource:schema/user.avsc", DefaultConfiguration.get());
+    @Test
+    public void testSchemaFromResourceURI() throws IOException {
+        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+                .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
+                .build();
+        Schema expected = descriptor.getSchema();
 
-    Assert.assertEquals("Schema from resource URI should match",
-        expected, schema);
-  }
+        Schema schema = AbstractKiteProcessor.getSchema(
+                "resource:schema/user.avsc", DefaultConfiguration.get());
+
+        Assert.assertEquals("Schema from resource URI should match",
+                expected, schema);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
index 434b969879..d50e7f9c0b 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.IOException;
@@ -30,32 +29,33 @@ import static org.apache.nifi.processors.kite.TestUtil.streamFor;
 
 public class TestJSONToAvroProcessor {
-  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
-      .requiredLong("id")
-      .requiredString("color")
-      .optionalDouble("price")
-      .endRecord();
-  public static final String JSON_CONTENT = "" +
-      "{\"id\": 1,\"color\": \"green\"}" +
-      "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
-      "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
+    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
 
-  @Test
-  public void testBasicConversion() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
-    runner.assertNotValid();
-    runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
-    runner.assertValid();
+    public static final String JSON_CONTENT = ""
+            + "{\"id\": 1,\"color\": \"green\"}"
+            + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
+            "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
 
-    runner.enqueue(streamFor(JSON_CONTENT));
-    runner.run();
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
 
-    long converted = runner.getCounterValue("Converted records");
-    long errors = runner.getCounterValue("Conversion errors");
-    Assert.assertEquals("Should convert 2 rows", 2, converted);
-    Assert.assertEquals("Should reject 1 row", 1, errors);
+        runner.enqueue(streamFor(JSON_CONTENT));
+        runner.run();
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
index 00a98dbb44..087e1cb620 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.collect.Lists;
@@ -54,79 +53,79 @@ import static org.apache.nifi.processors.kite.TestUtil.user;
 
 @Ignore("Does not work on windows")
 public class TestKiteProcessorsCluster {
-  public static MiniCluster cluster = null;
-  public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-      .schema(USER_SCHEMA)
-      .build();
+    public static MiniCluster cluster = null;
+    public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+            .schema(USER_SCHEMA)
+            .build();
 
-  @BeforeClass
-  public static void startCluster() throws IOException, InterruptedException {
-    long rand = Math.abs((long) (Math.random() * 1000000));
-    cluster = new MiniCluster.Builder()
-        .workDir("/tmp/minicluster-" + rand)
-        .clean(true)
-        .addService(HdfsService.class)
-        .addService(HiveService.class)
-        .bindIP("127.0.0.1")
-        .hiveMetastorePort(9083)
-        .build();
-    cluster.start();
-  }
-
-  @AfterClass
-  public static void stopCluster() throws IOException, InterruptedException {
-    if (cluster != null) {
-      cluster.stop();
-      cluster = null;
+    @BeforeClass
+    public static void startCluster() throws IOException, InterruptedException {
+        long rand = Math.abs((long) (Math.random() * 1000000));
+        cluster = new MiniCluster.Builder()
+                .workDir("/tmp/minicluster-" + rand)
+                .clean(true)
+                .addService(HdfsService.class)
+                .addService(HiveService.class)
+                .bindIP("127.0.0.1")
+                .hiveMetastorePort(9083)
+                .build();
+        cluster.start();
     }
-  }
 
-  @Test
-  public void testBasicStoreToHive() throws IOException {
-    String datasetUri = "dataset:hive:ns/test";
+    @AfterClass
+    public static void stopCluster() throws IOException, InterruptedException {
+        if (cluster != null) {
+            cluster.stop();
+            cluster = null;
+        }
+    }
 
-    Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);
+    @Test
+    public void testBasicStoreToHive() throws IOException {
+        String datasetUri = "dataset:hive:ns/test";
 
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.assertNotValid();
+        Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);
 
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.assertNotValid();
 
-    List<Record> users = Lists.newArrayList(
-        user("a", "a@example.com"),
-        user("b", "b@example.com"),
-        user("c", "c@example.com")
-    );
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
 
-    runner.enqueue(streamFor(users));
-    runner.run();
+        List<Record> users = Lists.newArrayList(
+                user("a", "a@example.com"),
+                user("b", "b@example.com"),
+                user("c", "c@example.com")
+        );
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-    List<Record> stored = Lists.newArrayList(
-        (Iterable<Record>) dataset.newReader());
-    Assert.assertEquals("Records should match", users, stored);
+        runner.enqueue(streamFor(users));
+        runner.run();
 
-    Datasets.delete(datasetUri);
-  }
+        runner.assertAllFlowFilesTransferred("success", 1);
+        List<Record> stored = Lists.newArrayList(
+                (Iterable<Record>) dataset.newReader());
+        Assert.assertEquals("Records should match", users, stored);
 
-  @Test
-  public void testSchemaFromDistributedFileSystem() throws IOException {
-    Schema expected = SchemaBuilder.record("Test").fields()
-        .requiredLong("id")
-        .requiredString("color")
-        .optionalDouble("price")
-        .endRecord();
+        Datasets.delete(datasetUri);
+    }
 
-    Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
-    FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
-    OutputStream out = fs.create(schemaPath);
-    out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
-    out.close();
+    @Test
+    public void testSchemaFromDistributedFileSystem() throws IOException {
+        Schema expected = SchemaBuilder.record("Test").fields()
+                .requiredLong("id")
+                .requiredString("color")
+                .optionalDouble("price")
+                .endRecord();
 
-    Schema schema = AbstractKiteProcessor.getSchema(
-        schemaPath.toString(), DefaultConfiguration.get());
+        Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
+        FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
+        OutputStream out = fs.create(schemaPath);
+        out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
+        out.close();
 
-    Assert.assertEquals("Schema from file should match", expected, schema);
-  }
+        Schema schema = AbstractKiteProcessor.getSchema(
+                schemaPath.toString(), DefaultConfiguration.get());
+
+        Assert.assertEquals("Schema from file should match", expected, schema);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
index 5063f5d81c..3fcae4f844 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.collect.Lists;
@@ -47,125 +46,125 @@ import static org.apache.nifi.processors.kite.TestUtil.user;
 
 @Ignore("Does not work on windows")
 public class TestKiteStorageProcessor {
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
 
-  private String datasetUri = null;
-  private Dataset<Record> dataset = null;
+    private String datasetUri = null;
+    private Dataset<Record> dataset = null;
 
-  @Before
-  public void createDataset() throws Exception {
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schema(TestUtil.USER_SCHEMA)
-        .build();
-    this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
-    this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
-  }
-
-  @After
-  public void deleteDataset() throws Exception {
-    Datasets.delete(datasetUri);
-  }
-
-  @Test
-  public void testBasicStore() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.assertNotValid();
-
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
-
-    List<Record> users = Lists.newArrayList(
-        user("a", "a@example.com"),
-        user("b", "b@example.com"),
-        user("c", "c@example.com")
-    );
-
-    runner.enqueue(streamFor(users));
-    runner.run();
-
-    runner.assertAllFlowFilesTransferred("success", 1);
-    runner.assertQueueEmpty();
-    Assert.assertEquals("Should store 3 values",
-        3, (long) runner.getCounterValue("Stored records"));
-
-    List<Record> stored = Lists.newArrayList(
-        (Iterable<Record>) dataset.newReader());
-    Assert.assertEquals("Records should match", users, stored);
-  }
-
-  @Test
-  public void testViewURI() {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
-    runner.assertValid();
-  }
-
-  @Test
-  public void testInvalidURI() {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
-    runner.assertNotValid();
-  }
-
-  @Test
-  public void testUnreadableContent() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
-
-    runner.enqueue(invalidStreamFor(user("a", "a@example.com")));
-    runner.run();
-
-    runner.assertAllFlowFilesTransferred("failure", 1);
-  }
-
-  @Test
-  public void testCorruptedBlocks() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
-
-    List<Record> records = Lists.newArrayList();
-    for (int i = 0; i < 10000; i += 1) {
-      String num = String.valueOf(i);
-      records.add(user(num, num + "@example.com"));
+    @Before
+    public void createDataset() throws Exception {
+        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+                .schema(TestUtil.USER_SCHEMA)
+                .build();
+        this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
+        this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
    }
 
-    runner.enqueue(invalidStreamFor(records));
-    runner.run();
+    @After
+    public void deleteDataset() throws Exception {
+        Datasets.delete(datasetUri);
+    }
 
-    long stored = runner.getCounterValue("Stored records");
-    Assert.assertTrue("Should store some readable values",
-        0 < stored && stored < 10000);
+    @Test
+    public void testBasicStore() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.assertNotValid();
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
 
-  @Test
-  public void testIncompatibleSchema() throws IOException {
-    Schema incompatible = SchemaBuilder.record("User").fields()
-        .requiredLong("id")
-        .requiredString("username")
-        .optionalString("email") // the dataset requires this field
-        .endRecord();
+        List<Record> users = Lists.newArrayList(
+                user("a", "a@example.com"),
+                user("b", "b@example.com"),
+                user("c", "c@example.com")
+        );
 
-    // this user has the email field and could be stored, but the schema is
-    // still incompatible so the entire stream is rejected
-    Record incompatibleUser = new Record(incompatible);
-    incompatibleUser.put("id", 1L);
-    incompatibleUser.put("username", "a");
-    incompatibleUser.put("email", "a@example.com");
+        runner.enqueue(streamFor(users));
+        runner.run();
 
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
+        runner.assertAllFlowFilesTransferred("success", 1);
+        runner.assertQueueEmpty();
+        Assert.assertEquals("Should store 3 values",
+                3, (long) runner.getCounterValue("Stored records"));
 
-    runner.enqueue(streamFor(incompatibleUser));
-    runner.run();
+        List<Record> stored = Lists.newArrayList(
+                (Iterable<Record>) dataset.newReader());
+        Assert.assertEquals("Records should match", users, stored);
+    }
 
-    runner.assertAllFlowFilesTransferred("incompatible", 1);
-  }
+    @Test
+    public void testViewURI() {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidURI() {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testUnreadableContent() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
+
+        runner.enqueue(invalidStreamFor(user("a", "a@example.com")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred("failure", 1);
+    }
+
+    @Test
+    public void testCorruptedBlocks() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
+
+        List<Record> records = Lists.newArrayList();
+        for (int i = 0; i < 10000; i += 1) {
+            String num = String.valueOf(i);
+            records.add(user(num, num + "@example.com"));
+        }
+
+        runner.enqueue(invalidStreamFor(records));
+        runner.run();
+
+        long stored = runner.getCounterValue("Stored records");
+        Assert.assertTrue("Should store some readable values",
+                0 < stored && stored < 10000);
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
+
+    @Test
+    public void testIncompatibleSchema() throws IOException {
+        Schema incompatible = SchemaBuilder.record("User").fields()
+                .requiredLong("id")
+                .requiredString("username")
+                .optionalString("email") // the dataset requires this field
+                .endRecord();
+
+        // this user has the email field and could be stored, but the schema is
+        // still incompatible so the entire stream is rejected
+        Record incompatibleUser = new Record(incompatible);
+        incompatibleUser.put("id", 1L);
+        incompatibleUser.put("username", "a");
+        incompatibleUser.put("email", "a@example.com");
+
+        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
+
+        runner.enqueue(streamFor(incompatibleUser));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred("incompatible", 1);
+    }
 }
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
index 2eb30af13e..37ddbec42f 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.ByteArrayInputStream;
@@ -37,67 +36,68 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
 
 public class TestUtil {
-  public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
-      .requiredString("username")
-      .requiredString("email")
-      .endRecord();
-  public static Record user(String username, String email) {
-    Record user = new Record(USER_SCHEMA);
-    user.put("username", username);
-    user.put("email", email);
-    return user;
-  }
+    public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
+            .requiredString("username")
+            .requiredString("email")
+            .endRecord();
 
-  public static InputStream streamFor(Record... records) throws IOException {
-    return streamFor(Arrays.asList(records));
-  }
-
-  public static InputStream streamFor(List<Record> records) throws IOException {
-    return new ByteArrayInputStream(bytesFor(records));
-  }
-
-  public static InputStream invalidStreamFor(Record... records) throws IOException {
-    return invalidStreamFor(Arrays.asList(records));
-  }
-
-  public static InputStream invalidStreamFor(List<Record> records) throws IOException {
-    // purposely truncate the content
-    byte[] bytes = bytesFor(records);
-    return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
-  }
-
-  private static byte[] bytesFor(List<Record> records) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    DataFileWriter<Record> writer = new DataFileWriter<>(
-        AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
-    writer.setCodec(CodecFactory.snappyCodec());
-    writer = writer.create(records.get(0).getSchema(), out);
-
-    for (Record record : records) {
-      writer.append(record);
+    public static Record user(String username, String email) {
+        Record user = new Record(USER_SCHEMA);
+        user.put("username", username);
+        user.put("email", email);
+        return user;
    }
 
-    writer.flush();
+    public static InputStream streamFor(Record... records) throws IOException {
+        return streamFor(Arrays.asList(records));
+    }
 
-    return out.toByteArray();
-  }
+    public static InputStream streamFor(List<Record> records) throws IOException {
+        return new ByteArrayInputStream(bytesFor(records));
+    }
 
-  public static InputStream streamFor(String content) throws CharacterCodingException {
-    return streamFor(content, Charset.forName("utf8"));
-  }
+    public static InputStream invalidStreamFor(Record... records) throws IOException {
+        return invalidStreamFor(Arrays.asList(records));
+    }
 
-  public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
-    return new ByteArrayInputStream(bytesFor(content, charset));
-  }
+    public static InputStream invalidStreamFor(List<Record> records) throws IOException {
+        // purposely truncate the content
+        byte[] bytes = bytesFor(records);
+        return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
+    }
 
-  public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
-    CharBuffer chars = CharBuffer.wrap(content);
-    CharsetEncoder encoder = charset.newEncoder();
-    ByteBuffer buffer = encoder.encode(chars);
-    byte[] bytes = new byte[buffer.remaining()];
-    buffer.get(bytes);
-    return bytes;
-  }
+    private static byte[] bytesFor(List<Record> records) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataFileWriter<Record> writer = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
+        writer.setCodec(CodecFactory.snappyCodec());
+        writer = writer.create(records.get(0).getSchema(), out);
+
+        for (Record record : records) {
+            writer.append(record);
+        }
+
+        writer.flush();
+
+        return out.toByteArray();
+    }
+
+    public static InputStream streamFor(String content) throws CharacterCodingException {
+        return streamFor(content, Charset.forName("utf8"));
+    }
+
+    public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
+        return new ByteArrayInputStream(bytesFor(content, charset));
+    }
+
+    public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
+        CharBuffer chars = CharBuffer.wrap(content);
+        CharsetEncoder encoder = charset.newEncoder();
+        ByteBuffer buffer = encoder.encode(chars);
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return bytes;
+    }
 }