mirror of https://github.com/apache/nifi.git
NIFI-4456: Support multiple JSON objects in JSON record reader/writers
This closes #2640. Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
4ed6631edc
commit
0289ca7114
|
@ -115,6 +115,10 @@
|
|||
<exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-array.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-mixed.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-multiarray.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-oneline.json</exclude>
|
||||
<exclude>src/test/resources/json/json-with-unicode.json</exclude>
|
||||
<exclude>src/test/resources/json/primitive-type-array.json</exclude>
|
||||
<exclude>src/test/resources/json/single-bank-account.json</exclude>
|
||||
|
|
|
@ -84,10 +84,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
|
||||
@Override
|
||||
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
|
||||
if (firstObjectConsumed && !array) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final JsonNode nextNode = getNextJsonNode();
|
||||
final RecordSchema schema = getSchema();
|
||||
try {
|
||||
|
@ -197,7 +193,8 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
return jsonParser.readValueAsTree();
|
||||
case END_ARRAY:
|
||||
case START_ARRAY:
|
||||
return null;
|
||||
continue;
|
||||
|
||||
default:
|
||||
throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
|
||||
}
|
||||
|
|
|
@ -48,15 +48,16 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
|||
import com.jayway.jsonpath.JsonPath;
|
||||
|
||||
@Tags({"json", "jsonpath", "record", "reader", "parser"})
|
||||
@CapabilityDescription("Parses JSON records and evaluates user-defined JSON Path's against each JSON object. The root element may be either "
|
||||
+ "a single JSON object or a JSON array. If a JSON array is found, each JSON object within that array is treated as a separate record. "
|
||||
+ "User-defined properties define the fields that should be extracted from the JSON in order to form the fields of a Record. Any JSON field "
|
||||
+ "that is not extracted via a JSONPath will not be returned in the JSON Records.")
|
||||
@CapabilityDescription("Parses JSON records and evaluates user-defined JSON Path's against each JSON object. While the reader expects each record "
|
||||
+ "to be well-formed JSON, the content of a FlowFile may consist of many records, each as a well-formed JSON array or JSON object with "
|
||||
+ "optional whitespace between them, such as the common 'JSON-per-line' format. If an array is encountered, each element in that array will "
|
||||
+ "be treated as a separate record. User-defined properties define the fields that should be extracted from the JSON in order to form the "
|
||||
+ "fields of a Record. Any JSON field that is not extracted via a JSONPath will not be returned in the JSON Records.")
|
||||
@SeeAlso(JsonTreeReader.class)
|
||||
@DynamicProperty(name = "The field name for the record.",
|
||||
value="A JSONPath Expression that will be evaluated against each JSON record. The result of the JSONPath will be the value of the "
|
||||
+ "field whose name is the same as the property name.",
|
||||
description="User-defined properties identifiy how to extract specific fields from a JSON object in order to create a Record",
|
||||
description="User-defined properties identify how to extract specific fields from a JSON object in order to create a Record",
|
||||
expressionLanguageScope=ExpressionLanguageScope.NONE)
|
||||
public class JsonPathReader extends SchemaRegistryService implements RecordReaderFactory {
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.nifi.json;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
|
@ -27,6 +28,8 @@ import org.apache.nifi.annotation.documentation.Tags;
|
|||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
@ -37,8 +40,8 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
|
|||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
|
||||
@CapabilityDescription("Writes the results of a RecordSet as a JSON Array. Even if the RecordSet "
|
||||
+ "consists of a single row, it will be written as an array with a single element.")
|
||||
@CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
|
||||
+ "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
|
||||
public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
|
||||
|
||||
static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
|
||||
|
@ -48,6 +51,11 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
|
|||
static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
|
||||
"When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
|
||||
|
||||
static final AllowableValue OUTPUT_ARRAY = new AllowableValue("output-array", "Array",
|
||||
"Output records as a JSON array");
|
||||
static final AllowableValue OUTPUT_ONELINE = new AllowableValue("output-oneline", "One Line Per Object",
|
||||
"Output records with one JSON object per line, delimited by a newline character");
|
||||
|
||||
static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
|
||||
.name("suppress-nulls")
|
||||
.displayName("Suppress Null Values")
|
||||
|
@ -64,18 +72,40 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
|
|||
.defaultValue("false")
|
||||
.required(true)
|
||||
.build();
|
||||
static final PropertyDescriptor OUTPUT_GROUPING = new PropertyDescriptor.Builder()
|
||||
.name("output-grouping")
|
||||
.displayName("Output Grouping")
|
||||
.description("Specifies how the writer should output the JSON records (as an array or one object per line, e.g.) Note that if 'One Line Per Object' is "
|
||||
+ "selected, then Pretty Print JSON must be false.")
|
||||
.allowableValues(OUTPUT_ARRAY, OUTPUT_ONELINE)
|
||||
.defaultValue(OUTPUT_ARRAY.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
private volatile boolean prettyPrint;
|
||||
private volatile NullSuppression nullSuppression;
|
||||
private volatile OutputGrouping outputGrouping;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
properties.add(PRETTY_PRINT_JSON);
|
||||
properties.add(SUPPRESS_NULLS);
|
||||
properties.add(OUTPUT_GROUPING);
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
|
||||
// Don't allow Pretty Print if One Line Per Object is selected
|
||||
if (context.getProperty(PRETTY_PRINT_JSON).asBoolean() && context.getProperty(OUTPUT_GROUPING).getValue().equals(OUTPUT_ONELINE.getValue())) {
|
||||
problems.add(new ValidationResult.Builder().input("Pretty Print").valid(false)
|
||||
.explanation("Pretty Print JSON must be false when 'Output Grouping' is set to 'One Line Per Object'").build());
|
||||
}
|
||||
return problems;
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
prettyPrint = context.getProperty(PRETTY_PRINT_JSON).asBoolean();
|
||||
|
@ -90,11 +120,20 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
|
|||
suppression = NullSuppression.NEVER_SUPPRESS;
|
||||
}
|
||||
this.nullSuppression = suppression;
|
||||
|
||||
String outputGroupingValue = context.getProperty(OUTPUT_GROUPING).getValue();
|
||||
final OutputGrouping grouping;
|
||||
if(OUTPUT_ONELINE.getValue().equals(outputGroupingValue)) {
|
||||
grouping = OutputGrouping.OUTPUT_ONELINE;
|
||||
} else {
|
||||
grouping = OutputGrouping.OUTPUT_ARRAY;
|
||||
}
|
||||
this.outputGrouping = grouping;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
|
||||
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression,
|
||||
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, outputGrouping,
|
||||
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
|
||||
}
|
||||
|
||||
|
|
|
@ -38,12 +38,12 @@ import org.apache.nifi.serialization.RecordReaderFactory;
|
|||
import org.apache.nifi.serialization.SchemaRegistryService;
|
||||
|
||||
@Tags({"json", "tree", "record", "reader", "parser"})
|
||||
@CapabilityDescription("Parses JSON into individual Record objects. The Record that is produced will contain all top-level "
|
||||
+ "elements of the corresponding JSON Object. "
|
||||
+ "The root JSON element can be either a single element or an array of JSON elements, and each "
|
||||
+ "element in that array will be treated as a separate record. "
|
||||
+ "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains "
|
||||
+ "a field that is not present in the schema, that field will be skipped. "
|
||||
@CapabilityDescription("Parses JSON into individual Record objects. While the reader expects each record "
|
||||
+ "to be well-formed JSON, the content of a FlowFile may consist of many records, each as a well-formed "
|
||||
+ "JSON array or JSON object with optional whitespace between them, such as the common 'JSON-per-line' format. "
|
||||
+ "If an array is encountered, each element in that array will be treated as a separate record. "
|
||||
+ "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains "
|
||||
+ "a field that is not present in the schema, that field will be skipped. "
|
||||
+ "See the Usage of the Controller Service for more information and examples.")
|
||||
@SeeAlso(JsonPathReader.class)
|
||||
public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
public enum OutputGrouping {
|
||||
OUTPUT_ARRAY,
|
||||
OUTPUT_ONELINE
|
||||
}
|
|
@ -46,6 +46,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
|||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerationException;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.util.MinimalPrettyPrinter;
|
||||
|
||||
public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
|
||||
private final ComponentLog logger;
|
||||
|
@ -53,19 +54,23 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
private final RecordSchema recordSchema;
|
||||
private final JsonFactory factory = new JsonFactory();
|
||||
private final JsonGenerator generator;
|
||||
private final OutputStream out;
|
||||
private final NullSuppression nullSuppression;
|
||||
private final OutputGrouping outputGrouping;
|
||||
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||
|
||||
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
|
||||
final NullSuppression nullSuppression, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
||||
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
||||
|
||||
super(out);
|
||||
this.logger = logger;
|
||||
this.recordSchema = recordSchema;
|
||||
this.schemaAccess = schemaAccess;
|
||||
this.out = out;
|
||||
this.nullSuppression = nullSuppression;
|
||||
this.outputGrouping = outputGrouping;
|
||||
|
||||
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||
|
@ -78,6 +83,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
this.generator = factory.createJsonGenerator(out);
|
||||
if (prettyPrint) {
|
||||
generator.useDefaultPrettyPrinter();
|
||||
} else if (OutputGrouping.OUTPUT_ONELINE.equals(outputGrouping)) {
|
||||
// Use a minimal pretty printer with a newline object separator, will output one JSON object per line
|
||||
generator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,12 +95,16 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
final OutputStream out = getOutputStream();
|
||||
schemaAccess.writeHeader(recordSchema, out);
|
||||
|
||||
generator.writeStartArray();
|
||||
if (outputGrouping == OutputGrouping.OUTPUT_ARRAY) {
|
||||
generator.writeStartArray();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> onFinishRecordSet() throws IOException {
|
||||
generator.writeEndArray();
|
||||
if (outputGrouping == OutputGrouping.OUTPUT_ARRAY) {
|
||||
generator.writeEndArray();
|
||||
}
|
||||
return schemaAccess.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
|
@ -191,8 +203,6 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
endTask.apply(generator);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to write {} with schema {} as a JSON Object due to {}", new Object[] {record, record.getSchema(), e.toString(), e});
|
||||
|
|
|
@ -118,6 +118,32 @@ public class TestJsonPathRowRecordReader {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadOneLine() throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
|
||||
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
assertEquals(expectedFieldNames, fieldNames);
|
||||
|
||||
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
|
||||
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
|
||||
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
|
||||
assertEquals(expectedTypes, dataTypes);
|
||||
|
||||
final Object[] firstRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
|
||||
|
||||
final Object[] secondRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleJsonElement() throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
|
|
@ -161,6 +161,123 @@ public class TestJsonTreeRowRecordReader {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
assertEquals(expectedFieldNames, fieldNames);
|
||||
|
||||
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
|
||||
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
|
||||
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
|
||||
assertEquals(expectedTypes, dataTypes);
|
||||
|
||||
final Object[] firstRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
|
||||
|
||||
final Object[] secondRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadMultilineJSON() throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
assertEquals(expectedFieldNames, fieldNames);
|
||||
|
||||
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
|
||||
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
|
||||
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
|
||||
assertEquals(expectedTypes, dataTypes);
|
||||
|
||||
final Object[] firstRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
|
||||
|
||||
final Object[] secondRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadMultilineArrays() throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiarray.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
assertEquals(expectedFieldNames, fieldNames);
|
||||
|
||||
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
|
||||
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
|
||||
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
|
||||
assertEquals(expectedTypes, dataTypes);
|
||||
|
||||
final Object[] firstRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
|
||||
|
||||
final Object[] secondRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
|
||||
|
||||
final Object[] thirdRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
|
||||
|
||||
final Object[] fourthRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadMixedJSON() throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-mixed.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
assertEquals(expectedFieldNames, fieldNames);
|
||||
|
||||
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
|
||||
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
|
||||
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
|
||||
assertEquals(expectedTypes, dataTypes);
|
||||
|
||||
final Object[] firstRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
|
||||
|
||||
final Object[] secondRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
|
||||
|
||||
final Object[] thirdRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
|
||||
|
||||
final Object[] fourthRecordValues = reader.nextRecord().getValues();
|
||||
Assert.assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
|
||||
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
|
|
|
@ -104,7 +104,8 @@ public class TestWriteJsonResult {
|
|||
final RecordSet rs = RecordSet.of(schema, record);
|
||||
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true,
|
||||
NullSuppression.NEVER_SUPPRESS, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
|
||||
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
|
||||
|
||||
writer.write(rs);
|
||||
}
|
||||
|
@ -141,7 +142,8 @@ public class TestWriteJsonResult {
|
|||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true,
|
||||
NullSuppression.NEVER_SUPPRESS, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
|
||||
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
|
||||
|
||||
writer.write(rs);
|
||||
}
|
||||
|
@ -172,7 +174,8 @@ public class TestWriteJsonResult {
|
|||
final RecordSet rs = RecordSet.of(schema, record);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.write(rs);
|
||||
}
|
||||
|
||||
|
@ -196,7 +199,8 @@ public class TestWriteJsonResult {
|
|||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.writeRecord(record);
|
||||
writer.finishRecordSet();
|
||||
|
@ -222,7 +226,8 @@ public class TestWriteJsonResult {
|
|||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.writeRawRecord(record);
|
||||
writer.finishRecordSet();
|
||||
|
@ -248,7 +253,8 @@ public class TestWriteJsonResult {
|
|||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.writeRecord(record);
|
||||
writer.finishRecordSet();
|
||||
|
@ -274,7 +280,8 @@ public class TestWriteJsonResult {
|
|||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.writeRawRecord(record);
|
||||
writer.finishRecordSet();
|
||||
|
@ -301,7 +308,8 @@ public class TestWriteJsonResult {
|
|||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.writeRecord(record);
|
||||
writer.finishRecordSet();
|
||||
|
@ -328,7 +336,8 @@ public class TestWriteJsonResult {
|
|||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.writeRawRecord(record);
|
||||
writer.finishRecordSet();
|
||||
|
@ -354,7 +363,8 @@ public class TestWriteJsonResult {
|
|||
final Record recordWithMissingName = new MapRecord(schema, values);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.write(recordWithMissingName);
|
||||
writer.finishRecordSet();
|
||||
|
@ -364,7 +374,8 @@ public class TestWriteJsonResult {
|
|||
|
||||
baos.reset();
|
||||
try (
|
||||
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.ALWAYS_SUPPRESS, null, null, null)) {
|
||||
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.write(recordWithMissingName);
|
||||
writer.finishRecordSet();
|
||||
|
@ -373,7 +384,8 @@ public class TestWriteJsonResult {
|
|||
assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
|
||||
|
||||
baos.reset();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.SUPPRESS_MISSING, null, null,
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, null, null,
|
||||
null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.write(recordWithMissingName);
|
||||
|
@ -387,7 +399,8 @@ public class TestWriteJsonResult {
|
|||
final Record recordWithNullValue = new MapRecord(schema, values);
|
||||
|
||||
baos.reset();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.write(recordWithNullValue);
|
||||
writer.finishRecordSet();
|
||||
|
@ -397,7 +410,8 @@ public class TestWriteJsonResult {
|
|||
|
||||
baos.reset();
|
||||
try (
|
||||
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.ALWAYS_SUPPRESS, null, null, null)) {
|
||||
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.write(recordWithNullValue);
|
||||
writer.finishRecordSet();
|
||||
|
@ -406,7 +420,8 @@ public class TestWriteJsonResult {
|
|||
assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
|
||||
|
||||
baos.reset();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.SUPPRESS_MISSING, null, null,
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, null, null,
|
||||
null)) {
|
||||
writer.beginRecordSet();
|
||||
writer.write(recordWithNullValue);
|
||||
|
@ -416,4 +431,44 @@ public class TestWriteJsonResult {
|
|||
assertEquals("[{\"id\":\"1\",\"name\":null}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnelineOutput() throws IOException {
|
||||
final Map<String, Object> values1 = new HashMap<>();
|
||||
values1.put("timestamp", new java.sql.Timestamp(37293723L));
|
||||
values1.put("time", new java.sql.Time(37293723L));
|
||||
values1.put("date", new java.sql.Date(37293723L));
|
||||
|
||||
final List<RecordField> fields1 = new ArrayList<>();
|
||||
fields1.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
|
||||
fields1.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
|
||||
fields1.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields1);
|
||||
|
||||
final Record record1 = new MapRecord(schema, values1);
|
||||
|
||||
final Map<String, Object> values2 = new HashMap<>();
|
||||
values2.put("timestamp", new java.sql.Timestamp(37293999L));
|
||||
values2.put("time", new java.sql.Time(37293999L));
|
||||
values2.put("date", new java.sql.Date(37293999L));
|
||||
|
||||
|
||||
final Record record2 = new MapRecord(schema, values2);
|
||||
|
||||
final RecordSet rs = RecordSet.of(schema, record1, record2);
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
|
||||
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ONELINE, null, null, null)) {
|
||||
writer.write(rs);
|
||||
}
|
||||
|
||||
final byte[] data = baos.toByteArray();
|
||||
|
||||
final String expected = "{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}\n{\"timestamp\":37293999,\"time\":37293999,\"date\":37293999}";
|
||||
|
||||
final String output = new String(data, StandardCharsets.UTF_8);
|
||||
assertEquals(expected, output);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
[
|
||||
{
|
||||
"id": 1,
|
||||
"name": "John Doe",
|
||||
"balance": 4750.89,
|
||||
"address": "123 My Street",
|
||||
"city": "My City",
|
||||
"state": "MS",
|
||||
"zipCode": "11111",
|
||||
"country": "USA"
|
||||
}, {
|
||||
"id": 2,
|
||||
"name": "Jane Doe",
|
||||
"balance": 4820.09,
|
||||
"address": "321 Your Street",
|
||||
"city": "Your City",
|
||||
"state": "NY",
|
||||
"zipCode": "33333",
|
||||
"country": "USA"
|
||||
}
|
||||
]
|
||||
{ "id": 3, "name": "Maria Doe", "balance": 4750.89, "address": "123 My Street", "city": "My City", "state": "ME", "zipCode": "11111", "country": "USA" }
|
||||
{
|
||||
"id": 4,
|
||||
"name": "Xi Doe",
|
||||
"balance": 4820.09,
|
||||
"address": "321 Your Street",
|
||||
"city": "Your City",
|
||||
"state": "NV",
|
||||
"zipCode": "33333",
|
||||
"country": "USA"
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
[
|
||||
{
|
||||
"id": 1,
|
||||
"name": "John Doe",
|
||||
"balance": 4750.89,
|
||||
"address": "123 My Street",
|
||||
"city": "My City",
|
||||
"state": "MS",
|
||||
"zipCode": "11111",
|
||||
"country": "USA"
|
||||
}, {
|
||||
"id": 2,
|
||||
"name": "Jane Doe",
|
||||
"balance": 4820.09,
|
||||
"address": "321 Your Street",
|
||||
"city": "Your City",
|
||||
"state": "NY",
|
||||
"zipCode": "33333",
|
||||
"country": "USA"
|
||||
}
|
||||
]
|
||||
[
|
||||
{
|
||||
"id": 3,
|
||||
"name": "Maria Doe",
|
||||
"balance": 4750.89,
|
||||
"address": "123 My Street",
|
||||
"city": "My City",
|
||||
"state": "ME",
|
||||
"zipCode": "11111",
|
||||
"country": "USA"
|
||||
}, {
|
||||
"id": 4,
|
||||
"name": "Xi Doe",
|
||||
"balance": 4820.09,
|
||||
"address": "321 Your Street",
|
||||
"city": "Your City",
|
||||
"state": "NV",
|
||||
"zipCode": "33333",
|
||||
"country": "USA"
|
||||
}
|
||||
]
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"id": 1,
|
||||
"name": "John Doe",
|
||||
"balance": 4750.89,
|
||||
"address": "123 My Street",
|
||||
"city": "My City",
|
||||
"state": "MS",
|
||||
"zipCode": "11111",
|
||||
"country": "USA"
|
||||
} {
|
||||
"id": 2,
|
||||
"name": "Jane Doe",
|
||||
"balance": 4820.09,
|
||||
"address": "321 Your Street",
|
||||
"city": "Your City",
|
||||
"state": "NY",
|
||||
"zipCode": "33333",
|
||||
"country": "USA"
|
||||
}
|
|
@ -0,0 +1,2 @@
|
|||
{ "id": 1, "name": "John Doe", "balance": 4750.89, "address": "123 My Street", "city": "My City", "state": "MS", "zipCode": "11111", "country": "USA" }
|
||||
{ "id": 2, "name": "Jane Doe", "balance": 4820.09, "address": "321 Your Street", "city": "Your City", "state": "NY", "zipCode": "33333", "country": "USA"}
|
Loading…
Reference in New Issue