NIFI-1656 Added locale support to ConvertAvroSchema and fixed locale problems in unit tests

Reviewed by Joe Witt (joewitt@apache.org). This closes #292
This commit is contained in:
trkurc 2016-03-20 18:25:04 -04:00
parent 3921ab827b
commit 9cde92da16
4 changed files with 154 additions and 9 deletions

View File

@ -21,6 +21,7 @@ package org.apache.nifi.processors.kite;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Scanner;
@ -43,6 +44,8 @@ public class AvroRecordConverter {
private final Schema outputSchema;
// Store this from output field to input field so we can look up by output.
private final Map<String, String> fieldMapping;
private final Locale locale;
private static final Locale DEFAULT_LOCALE = Locale.getDefault();
/**
* @param inputSchema
@ -55,6 +58,22 @@ public class AvroRecordConverter {
*/
public AvroRecordConverter(
        Schema inputSchema,
        Schema outputSchema,
        Map<String, String> fieldMapping) {
    // No locale supplied by the caller: delegate to the four-argument
    // constructor with DEFAULT_LOCALE, the value Locale.getDefault()
    // returned when this class was initialized.
    this(inputSchema, outputSchema, fieldMapping, DEFAULT_LOCALE);
}
/**
* @param inputSchema
* Schema of input record objects
* @param outputSchema
* Schema of output record objects
* @param fieldMapping
* Map from field name in input record to field name in output
* record.
* @param locale
* Locale applied to the Scanner that reads values out of string content
*/
public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
Map<String, String> fieldMapping, Locale locale) {
this.inputSchema = inputSchema;
this.outputSchema = outputSchema;
// Need to reverse this map.
@ -63,6 +82,7 @@ public class AvroRecordConverter {
for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
this.fieldMapping.put(entry.getValue(), entry.getKey());
}
this.locale = locale;
}
/**
@ -224,6 +244,7 @@ public class AvroRecordConverter {
// return questionable results when a String starts with a number
// but then contains other content
Scanner scanner = new Scanner(content.toString());
scanner.useLocale(locale);
switch (nonNillOutput.getType()) {
case LONG:
if (scanner.hasNextLong()) {

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@ -34,6 +35,7 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.commons.lang.LocaleUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -128,6 +130,27 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
}
};
public static final String DEFAULT_LOCALE_VALUE = "default";
/**
 * Validates a locale property value. The value is accepted when it equals
 * {@link #DEFAULT_LOCALE_VALUE}, or when {@code LocaleUtils.toLocale} parses
 * it into a non-null locale that {@code LocaleUtils.isAvailableLocale}
 * reports as available. Otherwise the result is invalid and its explanation
 * names the reason.
 */
public static final Validator LOCALE_VALIDATOR = new Validator() {
    @Override
    public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
        String reason = null;
        // Constant-first equals so that a null value cannot throw here; a null
        // value is then handed to LocaleUtils.toLocale like any other input
        // (documented by Commons Lang to yield null for null input).
        if (!DEFAULT_LOCALE_VALUE.equals(value)) {
            try {
                final Locale locale = LocaleUtils.toLocale(value);
                if (locale == null) {
                    reason = "null locale returned";
                } else if (!LocaleUtils.isAvailableLocale(locale)) {
                    reason = "locale not available";
                }
            } catch (final IllegalArgumentException e) {
                // toLocale rejects strings that do not follow the locale format.
                reason = "invalid format for locale";
            }
        }
        // A null reason means no problem was found, so the value is valid.
        return new ValidationResult.Builder()
                .subject(subject)
                .input(value)
                .explanation(reason)
                .valid(reason == null)
                .build();
    }
};
@VisibleForTesting
static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
.name("Input Schema").description("Avro Schema of Input Flowfiles")
@ -141,10 +164,19 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
.addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true)
.required(true).build();
// Property selecting the locale used for scanning data. It defaults to
// DEFAULT_LOCALE_VALUE and is checked by LOCALE_VALIDATOR.
@VisibleForTesting
static final PropertyDescriptor LOCALE = new PropertyDescriptor.Builder()
        .name("Locale")
        // The pieces are joined so the text reads: ...Locale.html) or "default" for JVM default
        .description("Locale to use for scanning data (see https://docs.oracle.com/javase/7/docs/api/java/util/Locale.html) "
                + "or \"" + DEFAULT_LOCALE_VALUE + "\" for JVM default")
        .addValidator(LOCALE_VALIDATOR)
        .defaultValue(DEFAULT_LOCALE_VALUE).build();
private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
.<PropertyDescriptor> builder()
.add(INPUT_SCHEMA)
.add(OUTPUT_SCHEMA).build();
.add(OUTPUT_SCHEMA)
.add(LOCALE).build();
private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
.<Relationship> builder().add(SUCCESS).add(FAILURE).build();
@ -240,8 +272,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
fieldMapping.put(entry.getKey().getName(), entry.getValue());
}
}
// Resolve the locale for the converter: the sentinel DEFAULT_LOCALE_VALUE
// selects the JVM default, any other value is parsed as a locale string.
// The comparison must be by value (equals), not by reference (==), because
// the property value is not guaranteed to be the same String instance as
// the constant.
final String localeProperty = context.getProperty(LOCALE).getValue();
final Locale locale = DEFAULT_LOCALE_VALUE.equals(localeProperty)
        ? Locale.getDefault()
        : LocaleUtils.toLocale(localeProperty);
final AvroRecordConverter converter = new AvroRecordConverter(
inputSchema, outputSchema, fieldMapping);
inputSchema, outputSchema, fieldMapping, locale);
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(outputSchema, Record.class));

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.lang.LocaleUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
@ -87,7 +88,7 @@ public class TestAvroRecordConverter {
.endRecord();
AvroRecordConverter converter = new AvroRecordConverter(input, output,
EMPTY_MAPPING);
EMPTY_MAPPING, LocaleUtils.toLocale("en_US"));
Record inputRecord = new Record(input);
inputRecord.put("s1", null);

View File

@ -22,13 +22,17 @@ import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.List;
import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.commons.lang.LocaleUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -60,12 +64,15 @@ public class TestConvertAvroSchema {
INPUT_SCHEMA.toString());
runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
OUTPUT_SCHEMA.toString());
Locale locale = Locale.getDefault();
runner.setProperty("primaryColor", "color");
runner.assertValid();
NumberFormat format = NumberFormat.getInstance(locale);
// Two valid rows, and one invalid because "free" is not a double.
Record goodRecord1 = dataBasic("1", "blue", null, null);
Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
Record badRecord = dataBasic("3", "red", "yellow", "free");
List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
badRecord);
@ -107,9 +114,83 @@ public class TestConvertAvroSchema {
count = 0;
for (Record r : successStream) {
if (count == 0) {
Assert.assertEquals(convertBasic(goodRecord1), r);
Assert.assertEquals(convertBasic(goodRecord1, locale), r);
} else {
Assert.assertEquals(convertBasic(goodRecord2), r);
Assert.assertEquals(convertBasic(goodRecord2, locale), r);
}
count++;
}
successStream.close();
Assert.assertEquals(2, count);
}
// Runs the basic conversion scenario once per locale string, in order:
// en_US first, then fr_FR.
@Test
public void testBasicConversionWithLocales() throws IOException {
    final String[] localeStrings = {"en_US", "fr_FR"};
    for (final String localeString : localeStrings) {
        testBasicConversionWithLocale(localeString);
    }
}
public void testBasicConversionWithLocale(String localeString) throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
runner.assertNotValid();
runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
INPUT_SCHEMA.toString());
runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
OUTPUT_SCHEMA.toString());
Locale locale = LocaleUtils.toLocale(localeString);
runner.setProperty(ConvertAvroSchema.LOCALE, localeString);
runner.setProperty("primaryColor", "color");
runner.assertValid();
NumberFormat format = NumberFormat.getInstance(locale);
// Two valid rows, and one invalid because "free" is not a double.
Record goodRecord1 = dataBasic("1", "blue", null, null);
Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
Record badRecord = dataBasic("3", "red", "yellow", "free");
List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
badRecord);
runner.enqueue(streamFor(input));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 rows", 1, errors);
runner.assertTransferCount("success", 1);
runner.assertTransferCount("failure", 1);
MockFlowFile incompatible = runner.getFlowFilesForRelationship(
"failure").get(0);
GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
INPUT_SCHEMA);
DataFileStream<Record> stream = new DataFileStream<Record>(
new ByteArrayInputStream(
runner.getContentAsByteArray(incompatible)), reader);
int count = 0;
for (Record r : stream) {
Assert.assertEquals(badRecord, r);
count++;
}
stream.close();
Assert.assertEquals(1, count);
Assert.assertEquals("Should accumulate error messages",
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
OUTPUT_SCHEMA);
DataFileStream<Record> successStream = new DataFileStream<Record>(
new ByteArrayInputStream(runner.getContentAsByteArray(runner
.getFlowFilesForRelationship("success").get(0))),
successReader);
count = 0;
for (Record r : successStream) {
if (count == 0) {
Assert.assertEquals(convertBasic(goodRecord1, locale), r);
} else {
Assert.assertEquals(convertBasic(goodRecord2, locale), r);
}
count++;
}
@ -163,15 +244,22 @@ public class TestConvertAvroSchema {
Assert.assertEquals(2, count);
}
private Record convertBasic(Record inputRecord) {
private Record convertBasic(Record inputRecord, Locale locale) {
Record result = new Record(OUTPUT_SCHEMA);
result.put("id", Long.parseLong(inputRecord.get("id").toString()));
result.put("color", inputRecord.get("primaryColor").toString());
if (inputRecord.get("price") == null) {
result.put("price", null);
} else {
result.put("price",
Double.parseDouble(inputRecord.get("price").toString()));
final NumberFormat format = NumberFormat.getInstance(locale);
double price;
try {
price = format.parse(inputRecord.get("price").toString()).doubleValue();
} catch (ParseException e) {
// Shouldn't happen
throw new RuntimeException(e);
}
result.put("price", price);
}
return result;
}