diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java index 68e6c98342..f66e9ed583 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java @@ -21,6 +21,7 @@ package org.apache.nifi.processors.kite; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Scanner; @@ -43,6 +44,8 @@ public class AvroRecordConverter { private final Schema outputSchema; // Store this from output field to input field so we can look up by output. private final Map fieldMapping; + private final Locale locale; + private static final Locale DEFAULT_LOCALE = Locale.getDefault(); /** * @param inputSchema @@ -55,6 +58,22 @@ public class AvroRecordConverter { */ public AvroRecordConverter(Schema inputSchema, Schema outputSchema, Map fieldMapping) { + this(inputSchema, outputSchema, fieldMapping, DEFAULT_LOCALE); + } + + /** + * @param inputSchema + * Schema of input record objects + * @param outputSchema + * Schema of output record objects + * @param fieldMapping + * Map from field name in input record to field name in output + * record. + * @param locale + * Locale to use + */ + public AvroRecordConverter(Schema inputSchema, Schema outputSchema, + Map fieldMapping, Locale locale) { this.inputSchema = inputSchema; this.outputSchema = outputSchema; // Need to reverse this map. @@ -63,6 +82,7 @@ public class AvroRecordConverter { for (Map.Entry entry : fieldMapping.entrySet()) { this.fieldMapping.put(entry.getValue(), entry.getKey()); } + this.locale = locale; } /** @@ -224,6 +244,7 @@ public class AvroRecordConverter { // return questionable results when a String starts with a number // but then contains other content Scanner scanner = new Scanner(content.toString()); + scanner.useLocale(locale); switch (nonNillOutput.getType()) { case LONG: if (scanner.hasNextLong()) { diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java index d64f5df507..b0d3518249 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; @@ -34,6 +35,7 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; +import org.apache.commons.lang.LocaleUtils; import org.apache.hadoop.conf.Configuration; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -128,6 +130,27 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { } }; + public static final String DEFAULT_LOCALE_VALUE = "default"; + public static final Validator LOCALE_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + if (value.equals(DEFAULT_LOCALE_VALUE) == false) { + try { + final Locale locale = LocaleUtils.toLocale(value); + if (locale == null) { + reason = "null locale returned"; + } else if (LocaleUtils.isAvailableLocale(locale) == false) { + reason = "locale not available"; + } + } catch (final IllegalArgumentException e) { + reason = "invalid format for locale"; + } + } + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + @VisibleForTesting static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder() .name("Input Schema").description("Avro Schema of Input Flowfiles") @@ -141,10 +164,19 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { .addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true) .required(true).build(); + @VisibleForTesting + static final PropertyDescriptor LOCALE = new PropertyDescriptor.Builder() + .name("Locale") + .description("Locale to use for scanning data (see https://docs.oracle.com/javase/7/docs/api/java/util/Locale.html)" + + "or \" " + DEFAULT_LOCALE_VALUE + "\" for JVM default") + .addValidator(LOCALE_VALIDATOR) + .defaultValue(DEFAULT_LOCALE_VALUE).build(); + private static final List PROPERTIES = ImmutableList . builder() .add(INPUT_SCHEMA) - .add(OUTPUT_SCHEMA).build(); + .add(OUTPUT_SCHEMA) + .add(LOCALE).build(); private static final Set RELATIONSHIPS = ImmutableSet . builder().add(SUCCESS).add(FAILURE).build(); @@ -240,8 +272,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { fieldMapping.put(entry.getKey().getName(), entry.getValue()); } } + // Set locale + final String localeProperty = context.getProperty(LOCALE).getValue(); + final Locale locale = (localeProperty == DEFAULT_LOCALE_VALUE)?Locale.getDefault():LocaleUtils.toLocale(localeProperty); final AvroRecordConverter converter = new AvroRecordConverter( - inputSchema, outputSchema, fieldMapping); + inputSchema, outputSchema, fieldMapping, locale); final DataFileWriter writer = new DataFileWriter<>( AvroUtil.newDatumWriter(outputSchema, Record.class)); diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java index 1a4748f1b6..11e86bfa76 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData.Record; +import org.apache.commons.lang.LocaleUtils; import org.codehaus.jackson.map.ObjectMapper; import org.junit.Test; @@ -87,7 +88,7 @@ public class TestAvroRecordConverter { .endRecord(); AvroRecordConverter converter = new AvroRecordConverter(input, output, - EMPTY_MAPPING); + EMPTY_MAPPING, LocaleUtils.toLocale("en_US")); Record inputRecord = new Record(input); inputRecord.put("s1", null); diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java index 33f3a821f9..2da0513964 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java @@ -22,13 +22,17 @@ import static org.apache.nifi.processors.kite.TestUtil.streamFor; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.text.NumberFormat; +import java.text.ParseException; import java.util.List; +import java.util.Locale; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; +import org.apache.commons.lang.LocaleUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -60,12 +64,15 @@ public class TestConvertAvroSchema { INPUT_SCHEMA.toString()); runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString()); + Locale locale = Locale.getDefault(); runner.setProperty("primaryColor", "color"); runner.assertValid(); + NumberFormat format = NumberFormat.getInstance(locale); + // Two valid rows, and one invalid because "free" is not a double. Record goodRecord1 = dataBasic("1", "blue", null, null); - Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5"); + Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5)); Record badRecord = dataBasic("3", "red", "yellow", "free"); List input = Lists.newArrayList(goodRecord1, goodRecord2, badRecord); @@ -107,9 +114,83 @@ public class TestConvertAvroSchema { count = 0; for (Record r : successStream) { if (count == 0) { - Assert.assertEquals(convertBasic(goodRecord1), r); + Assert.assertEquals(convertBasic(goodRecord1, locale), r); } else { - Assert.assertEquals(convertBasic(goodRecord2), r); + Assert.assertEquals(convertBasic(goodRecord2, locale), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + + @Test + public void testBasicConversionWithLocales() throws IOException { + testBasicConversionWithLocale("en_US"); + testBasicConversionWithLocale("fr_FR"); + } + + public void testBasicConversionWithLocale(String localeString) throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, + INPUT_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, + OUTPUT_SCHEMA.toString()); + Locale locale = LocaleUtils.toLocale(localeString); + runner.setProperty(ConvertAvroSchema.LOCALE, localeString); + runner.setProperty("primaryColor", "color"); + runner.assertValid(); + + NumberFormat format = NumberFormat.getInstance(locale); + + // Two valid rows, and one invalid because "free" is not a double. + Record goodRecord1 = dataBasic("1", "blue", null, null); + Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5)); + Record badRecord = dataBasic("3", "red", "yellow", "free"); + List input = Lists.newArrayList(goodRecord1, goodRecord2, + badRecord); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 rows", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship( + "failure").get(0); + GenericDatumReader reader = new GenericDatumReader( + INPUT_SCHEMA); + DataFileStream stream = new DataFileStream( + new ByteArrayInputStream( + runner.getContentAsByteArray(incompatible)), reader); + int count = 0; + for (Record r : stream) { + Assert.assertEquals(badRecord, r); + count++; + } + stream.close(); + Assert.assertEquals(1, count); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + + GenericDatumReader successReader = new GenericDatumReader( + OUTPUT_SCHEMA); + DataFileStream successStream = new DataFileStream( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + count = 0; + for (Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertBasic(goodRecord1, locale), r); + } else { + Assert.assertEquals(convertBasic(goodRecord2, locale), r); } count++; } @@ -163,15 +244,22 @@ public class TestConvertAvroSchema { Assert.assertEquals(2, count); } - private Record convertBasic(Record inputRecord) { + private Record convertBasic(Record inputRecord, Locale locale) { Record result = new Record(OUTPUT_SCHEMA); result.put("id", Long.parseLong(inputRecord.get("id").toString())); result.put("color", inputRecord.get("primaryColor").toString()); if (inputRecord.get("price") == null) { result.put("price", null); } else { - result.put("price", - Double.parseDouble(inputRecord.get("price").toString())); + final NumberFormat format = NumberFormat.getInstance(locale); + double price; + try { + price = format.parse(inputRecord.get("price").toString()).doubleValue(); + } catch (ParseException e) { + // Shouldn't happen + throw new RuntimeException(e); + } + result.put("price", price); } return result; }