NIFI-7234 Standardized on Avro 1.11.0

NIFI-7234 Replaced Jackson 1.X references with Jackson 2.X references in various classes.
NIFI-7234 Added jackson-annotations to nifi-hdfs-processors.
NIFI-7234 Various updates to bring our test cases into better alignment with the Avro specification as of 1.11.
Fixed a checkstyle issue.

NIFI-7234 Made changes requested in a review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5900
Mike Thomsen 2022-03-24 20:13:08 -04:00 committed by Matthew Burgess
parent fc2f539c7d
commit 7271e8cea7
27 changed files with 289 additions and 260 deletions
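Note: the bulk of this change is mechanical. Jackson 1.x classes under org.codehaus.jackson are replaced one-for-one by their Jackson 2.x counterparts under com.fasterxml.jackson; a minimal before/after sketch (standalone example, not NiFi code):

    // Jackson 1.x: import org.codehaus.jackson.map.ObjectMapper;
    // Jackson 2.x: the same class now lives in the databind package
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class JacksonMigrationSketch {
        static void demo() throws Exception {
            ObjectMapper mapper = new ObjectMapper();       // same name, new package
            JsonNode node = mapper.readTree("{\"a\":1}");   // core parsing API is unchanged
            System.out.println(mapper.writeValueAsString(node));
        }
    }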


@@ -794,7 +794,6 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>


@@ -38,7 +38,6 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.8.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>


@@ -58,7 +58,6 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>


@@ -28,13 +28,13 @@ import com.datastax.driver.core.SniEndPoint;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;


@@ -57,7 +57,6 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.8.1</version>
</dependency>
<!-- Other modules using nifi-database-utils are expected to have these APIs available, typically through a NAR dependency -->
<dependency>


@@ -32,7 +32,6 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.8.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>


@@ -326,7 +326,7 @@ public class AvroTypeUtil {
return Schema.createUnion(unionTypes);
}
-return Schema.createUnion(Schema.create(Type.NULL), schema);
+return Schema.createUnion(schema, Schema.create(Type.NULL));
}
/**
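Context for the reordering above: the Avro spec validates a union field's default value against the first branch of the union, so emitting [type, "null"] instead of ["null", type] lets fields with non-null defaults pass Avro 1.11's stricter schema validation. A minimal standalone sketch (record and field names are hypothetical):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    class UnionOrderingSketch {
        static void demo() {
            // The first branch governs default-value validation
            Schema nullableString = Schema.createUnion(
                    Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL));
            // Valid: "hello" matches the first branch (string).
            // With ["null", "string"] only a null default would validate.
            Schema record = SchemaBuilder.record("example").fields()
                    .name("greeting").type(nullableString).withDefault("hello")
                    .endRecord();
            System.out.println(record.toString(true));
        }
    }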


@@ -17,12 +17,12 @@
package org.apache.nifi.schema.access;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
-import org.codehaus.jackson.map.ObjectMapper;
import org.apache.commons.io.IOUtils;
import java.io.IOException;


@@ -143,7 +143,7 @@ public class TestAvroTypeUtil {
assertEquals("hello", avroSchema.getField("string").defaultVal());
assertEquals(17, avroSchema.getField("int").defaultVal());
assertEquals(42L, avroSchema.getField("long").defaultVal());
-assertEquals(2.4D, (double) avroSchema.getField("float").defaultVal(), 0.002D); // Even though we provide a Float, avro converts it into a Double value.
+assertEquals(2.4D, (float) avroSchema.getField("float").defaultVal(), 0.002D); // Even though we provide a Float, avro converts it into a Double value.
assertEquals(28.1D, (double) avroSchema.getField("double").defaultVal(), 0.002D);
assertEquals(new ArrayList<String>(), avroSchema.getField("stringArray").defaultVal());
assertEquals(new ArrayList<Integer>(), avroSchema.getField("intArray").defaultVal());
@@ -265,12 +265,12 @@ public class TestAvroTypeUtil {
Record r = builder.build();
@SuppressWarnings("unchecked")
GenericData.Array<Integer> values = (GenericData.Array<Integer>) r.get("listOfInt");
-assertEquals(values.size(), 0);
+assertEquals(values.size(), 1);
RecordSchema record = AvroTypeUtil.createSchema(avroSchema);
RecordField field = record.getField("listOfInt").get();
assertEquals(RecordFieldType.ARRAY, field.getDataType().getFieldType());
assertTrue(field.getDefaultValue() instanceof Object[]);
-assertEquals(0, ((Object[]) field.getDefaultValue()).length);
+assertEquals(1, ((Object[]) field.getDefaultValue()).length);
}
/**
@@ -313,7 +313,7 @@ public class TestAvroTypeUtil {
@SuppressWarnings("unchecked")
GenericData.Array<Integer> values = (GenericData.Array<Integer>) ((GenericRecord) r.get("field1"))
.get("listOfInt");
-assertArrayEquals(new Object[] {}, values.toArray());
+assertArrayEquals(new Object[] { 0 }, values.toArray());
RecordSchema record = AvroTypeUtil.createSchema(avroSchema);
RecordField field = record.getField("field1").get();
assertEquals(RecordFieldType.RECORD, field.getDataType().getFieldType());
@@ -322,7 +322,7 @@ public class TestAvroTypeUtil {
RecordField childField = childSchema.getField("listOfInt").get();
assertEquals(RecordFieldType.ARRAY, childField.getDataType().getFieldType());
assertTrue(childField.getDefaultValue() instanceof Object[]);
-assertArrayEquals(new Object[] {}, ((Object[]) childField.getDefaultValue()));
+assertArrayEquals(new Object[] { 0 }, ((Object[]) childField.getDefaultValue()));
}
/**
@@ -464,7 +464,7 @@ public class TestAvroTypeUtil {
@Test
public void testToDecimalConversion() {
-final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
+final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(26, 8);
final Schema fieldSchema = Schema.create(Type.BYTES);
decimalType.addToSchema(fieldSchema);
@@ -866,47 +866,46 @@ public class TestAvroTypeUtil {
new RecordField("childRecordField2", RecordFieldType.STRING.getDataType())
);
-String expected = "{" +
-"\"type\":\"record\"," +
-"\"name\":\"nifiRecord\"," +
-"\"namespace\":\"org.apache.nifi\"," +
-"\"fields\":[{" +
-"\"name\":\"record1\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record1Type\"," +
-"\"fields\":[{" +
-"\"name\":\"reoccurringFieldNameWithDifferentChildSchema\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record1_reoccurringFieldNameWithDifferentChildSchemaType\"," +
-"\"fields\":[{" +
-"\"name\":\"childRecordField1\"," +
-"\"type\":[\"null\",\"string\"]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}," +
-"{" +
-"\"name\":\"record2\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record2Type\"," +
-"\"fields\":[{" +
-"\"name\":\"reoccurringFieldNameWithDifferentChildSchema\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record2_reoccurringFieldNameWithDifferentChildSchemaType\"," +
-"\"fields\":[{" +
-"\"name\":\"childRecordField2\"," +
-"\"type\":[\"null\",\"string\"]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}";
+String expected = "{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"nifiRecord\",\n" +
+" \"namespace\": \"org.apache.nifi\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"record1\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record1Type\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"reoccurringFieldNameWithDifferentChildSchema\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record1_reoccurringFieldNameWithDifferentChildSchemaType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"childRecordField1\",\n" +
+" \"type\": [\"string\", \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }, {\n" +
+" \"name\": \"record2\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record2Type\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"reoccurringFieldNameWithDifferentChildSchema\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record2_reoccurringFieldNameWithDifferentChildSchemaType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"childRecordField2\",\n" +
+" \"type\": [\"string\", \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+"}";
// WHEN
// THEN
@@ -926,47 +925,46 @@ public class TestAvroTypeUtil {
new RecordField(reoccurringFieldNameInChildSchema, RecordFieldType.BOOLEAN.getDataType())
);
-String expected = "{" +
-"\"type\":\"record\"," +
-"\"name\":\"nifiRecord\"," +
-"\"namespace\":\"org.apache.nifi\"," +
-"\"fields\":[{" +
-"\"name\":\"record1\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record1Type\"," +
-"\"fields\":[{" +
-"\"name\":\"reoccurringFieldNameWithDifferentChildSchema\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record1_reoccurringFieldNameWithDifferentChildSchemaType\"," +
-"\"fields\":[{" +
-"\"name\":\"childRecordField\"," +
-"\"type\":[\"null\",\"string\"]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}," +
-"{" +
-"\"name\":\"record2\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record2Type\"," +
-"\"fields\":[{" +
-"\"name\":\"reoccurringFieldNameWithDifferentChildSchema\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"record2_reoccurringFieldNameWithDifferentChildSchemaType\"," +
-"\"fields\":[{" +
-"\"name\":\"childRecordField\"," +
-"\"type\":[\"null\",\"boolean\"]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}";
+String expected = "{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"nifiRecord\",\n" +
+" \"namespace\": \"org.apache.nifi\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"record1\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record1Type\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"reoccurringFieldNameWithDifferentChildSchema\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record1_reoccurringFieldNameWithDifferentChildSchemaType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"childRecordField\",\n" +
+" \"type\": [\"string\", \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }, {\n" +
+" \"name\": \"record2\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record2Type\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"reoccurringFieldNameWithDifferentChildSchema\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"record2_reoccurringFieldNameWithDifferentChildSchemaType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"childRecordField\",\n" +
+" \"type\": [\"boolean\",\"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+"}";
// WHEN
// THEN
@@ -999,73 +997,72 @@ public class TestAvroTypeUtil {
)));
String expected = "{\n" +
-" \"type\": \"record\",\n" +
-" \"name\": \"nifiRecord\",\n" +
-" \"namespace\": \"org.apache.nifi\",\n" +
-" \"fields\": [\n" +
-" {\n" +
-" \"name\": \"dataCollection\",\n" +
-" \"type\": [\n" +
-" \"null\",\n" +
-" {\n" +
-" \"type\": \"array\",\n" +
-" \"items\": {\n" +
-" \"type\": \"record\",\n" +
-" \"name\": \"dataCollectionType\",\n" +
-" \"fields\": [\n" +
-" {\n" +
-" \"name\": \"record\",\n" +
-" \"type\": [\n" +
-" {\n" +
-" \"type\": \"record\",\n" +
-" \"name\": \"dataCollection_recordType\",\n" +
-" \"fields\": [\n" +
-" {\n" +
-" \"name\": \"integer\",\n" +
-" \"type\": [\n" +
-" \"null\",\n" +
-" \"int\"\n" +
-" ]\n" +
-" },\n" +
-" {\n" +
-" \"name\": \"boolean\",\n" +
-" \"type\": [\n" +
-" \"null\",\n" +
-" \"boolean\"\n" +
-" ]\n" +
-" }\n" +
-" ]\n" +
-" },\n" +
-" {\n" +
-" \"type\": \"record\",\n" +
-" \"name\": \"dataCollection_record2Type\",\n" +
-" \"fields\": [\n" +
-" {\n" +
-" \"name\": \"integer\",\n" +
-" \"type\": [\n" +
-" \"null\",\n" +
-" \"int\"\n" +
-" ]\n" +
-" },\n" +
-" {\n" +
-" \"name\": \"string\",\n" +
-" \"type\": [\n" +
-" \"null\",\n" +
-" \"string\"\n" +
-" ]\n" +
-" }\n" +
-" ]\n" +
-" },\n" +
-" \"null\"\n" +
-" ]\n" +
-" }\n" +
-" ]\n" +
-" }\n" +
-" }\n" +
-" ]\n" +
-" }\n" +
-" ]\n" +
-"}";
+" \"type\": \"record\",\n" +
+" \"name\": \"nifiRecord\",\n" +
+" \"namespace\": \"org.apache.nifi\",\n" +
+" \"fields\": [\n" +
+" {\n" +
+" \"name\": \"dataCollection\",\n" +
+" \"type\": [\n" +
+" {\n" +
+" \"type\": \"array\",\n" +
+" \"items\": {\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"dataCollectionType\",\n" +
+" \"fields\": [\n" +
+" {\n" +
+" \"name\": \"record\",\n" +
+" \"type\": [\n" +
+" {\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"dataCollection_recordType\",\n" +
+" \"fields\": [\n" +
+" {\n" +
+" \"name\": \"integer\",\n" +
+" \"type\": [\n" +
+" \"int\",\n" +
+" \"null\"\n" +
+" ]\n" +
+" },\n" +
+" {\n" +
+" \"name\": \"boolean\",\n" +
+" \"type\": [\n" +
+" \"boolean\",\n" +
+" \"null\"\n" +
+" ]\n" +
+" }\n" +
+" ]\n" +
+" },\n" +
+" {\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"dataCollection_record2Type\",\n" +
+" \"fields\": [\n" +
+" {\n" +
+" \"name\": \"integer\",\n" +
+" \"type\": [\n" +
+" \"int\",\n" +
+" \"null\"\n" +
+" ]\n" +
+" },\n" +
+" {\n" +
+" \"name\": \"string\",\n" +
+" \"type\": [\n" +
+" \"string\",\n" +
+" \"null\"\n" +
+" ]\n" +
+" }\n" +
+" ]\n" +
+" },\n" +
+" \"null\"\n" +
+" ]\n" +
+" }\n" +
+" ]\n" +
+" }\n" +
+" }, \"null\"\n" +
+" ]\n" +
+" }\n" +
+" ]\n" +
+"}";
// WHEN
Schema actual = AvroTypeUtil.extractAvroSchema(schema);
@@ -1098,7 +1095,7 @@ public class TestAvroTypeUtil {
Schema actual = AvroTypeUtil.extractAvroSchema(recordSchema);
// THEN
-assertEquals(expected, actual.toString());
+assertEquals(new Schema.Parser().parse(expected), actual);
}
@Test
@@ -1121,42 +1118,42 @@ public class TestAvroTypeUtil {
RecordSchema recordSchema = new SimpleRecordSchema(fields);
-String expected = "{" +
-"\"type\":\"record\"," +
-"\"name\":\"nifiRecord\"," +
-"\"namespace\":\"org.apache.nifi\"," +
-"\"fields\":[{" +
-"\"name\":\"multiLevelRecord\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"multiLevelRecordType\"," +
-"\"fields\":[{" +
-"\"name\":\"level2Record\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"multiLevelRecord_level2RecordType\"," +
-"\"fields\":[{" +
-"\"name\":\"level3Record\"," +
-"\"type\":[\"null\",{" +
-"\"type\":\"record\"," +
-"\"name\":\"multiLevelRecord_level2Record_level3RecordType\"," +
-"\"fields\":[{" +
-"\"name\":\"stringField\"," +
-"\"type\":[\"null\",\"string\"]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}]" +
-"}";
+String expected = "{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"nifiRecord\",\n" +
+" \"namespace\": \"org.apache.nifi\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"multiLevelRecord\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"multiLevelRecordType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"level2Record\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"multiLevelRecord_level2RecordType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"level3Record\",\n" +
+" \"type\": [{\n" +
+" \"type\": \"record\",\n" +
+" \"name\": \"multiLevelRecord_level2Record_level3RecordType\",\n" +
+" \"fields\": [{\n" +
+" \"name\": \"stringField\",\n" +
+" \"type\": [ \"string\", \"null\" ]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+" }, \"null\"]\n" +
+" }]\n" +
+"}";
// WHEN
Schema actual = AvroTypeUtil.extractAvroSchema(recordSchema);
// THEN
-assertEquals(expected, actual.toString());
+assertEquals(new Schema.Parser().parse(expected), actual);
}
@Test
@@ -1197,9 +1194,10 @@ public class TestAvroTypeUtil {
}
private Schema givenAvroSchemaContainingNumericMap() {
+Map<String, Long> defaultLongMap = new HashMap<>();
final List<Field> avroFields = Arrays.asList(
-new Field("id", Schema.create(Type.INT), "", ""),
-new Field("numbers", Schema.createMap(Schema.create(Type.LONG)), "", "")
+new Field("id", Schema.create(Type.INT), "", 0),
+new Field("numbers", Schema.createMap(Schema.create(Type.LONG)), "", defaultLongMap)
);
return Schema.createRecord(avroFields);
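Most of these test changes follow from Avro 1.9+ validating field defaults at schema-construction time: an array field can no longer default to a bare 0, an int field can no longer default to "", and map/array defaults must be actual collections. A small sketch of the stricter constructor behavior (standalone example; field names are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.avro.AvroTypeException;
    import org.apache.avro.Schema;
    import org.apache.avro.Schema.Field;
    import org.apache.avro.Schema.Type;

    class DefaultValidationSketch {
        static void demo() {
            // Valid under 1.11: defaults whose Java types match the field schemas
            Map<String, Long> defaultLongMap = new HashMap<>();
            new Field("id", Schema.create(Type.INT), "", 0);
            new Field("numbers", Schema.createMap(Schema.create(Type.LONG)), "", defaultLongMap);

            try {
                // Rejected under 1.11: a String default for an INT field
                new Field("id", Schema.create(Type.INT), "", "");
            } catch (AvroTypeException expected) {
                System.out.println("invalid default rejected: " + expected.getMessage());
            }
        }
    }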


@@ -16,7 +16,7 @@
"items":"int"
},
"doc":"array of ints",
-"default": 0
+"default": [ 0 ]
}
],
"logicalType":"BigDecimal"


@@ -10,7 +10,7 @@
"items": "int"
},
"doc": "array of ints",
-"default": 0
+"default": [ 0 ]
}
]
}


@@ -1,20 +1,20 @@
[ {
"namespace" : "net.a",
"type" : "record",
-"name" : "O",
-"fields" : [ {
-"name" : "hash",
-"type" : [ "null", {
-"type" : "map",
-"values" : "string"
-} ]
-} ]
-}, {
-"namespace" : "net.a",
-"type" : "record",
"name" : "A",
"fields" : [ {
"name" : "o",
-"type" : [ "null", "O" ]
+"type" : [ {
+"namespace" : "net.a",
+"type" : "record",
+"name" : "O",
+"fields" : [ {
+"name" : "hash",
+"type" : [ {
+"type" : "map",
+"values" : "string"
+}, "null" ]
+} ]
+}, "null" ]
}]
} ]


@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.hadoop.inotify;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
@@ -50,7 +51,6 @@ import org.apache.nifi.processors.hadoop.FetchHDFS;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
-import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;


@@ -20,7 +20,6 @@
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
-<avro.version>1.8.1</avro.version>
</properties>
<dependencies>
<dependency>
@@ -59,12 +58,10 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
-<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>


@@ -68,6 +68,10 @@
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>provided</scope>
</dependency>
+<dependency>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
@@ -214,5 +218,9 @@
<version>1.17.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>org.xerial.snappy</groupId>
+<artifactId>snappy-java</artifactId>
+</dependency>
</dependencies>
</project>


@@ -328,8 +328,12 @@ public class NiFiOrcUtils {
String type = schema.getProp("logicalType");
switch (type){
case "decimal":
-int precision = schema.getJsonProp("precision").asInt(10);
-int scale = schema.getJsonProp("scale").asInt(2);
+int precision = schema.getObjectProp("precision") != null
+? Integer.valueOf(schema.getObjectProp("precision").toString())
+: 10;
+int scale = schema.getObjectProp("scale") != null
+? Integer.valueOf(schema.getObjectProp("scale").toString())
+: 2;
return new DecimalTypeInfo(precision, scale);
}
throw new IllegalArgumentException("Logical type " + type + " is not supported!");
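Avro 1.9 removed the Jackson-1-based getJsonProp(); getObjectProp() returns plain Java objects (Integer, String, Map, ...) rather than JsonNode, so the old asInt(default) shorthand has to be spelled out. A small sketch of the same pattern (standalone example, not the NiFi code itself):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    class ObjectPropSketch {
        static int intProp(Schema schema, String name, int fallback) {
            // getObjectProp returns null when the property is absent
            Object value = schema.getObjectProp(name);
            return value != null ? Integer.parseInt(value.toString()) : fallback;
        }

        static void demo() {
            Schema decimal = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
            System.out.println(intProp(decimal, "precision", 10)); // 10
            System.out.println(intProp(decimal, "scale", 2));      // 2
        }
    }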


@@ -17,6 +17,7 @@
package org.apache.nifi.processors.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
@@ -776,6 +777,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
throw s;
} catch (IllegalArgumentException
+| AvroRuntimeException
| HiveWriter.WriteFailure
| SerializationError inputError) {


@@ -377,10 +377,11 @@ public class TestNiFiOrcUtils {
public static GenericData.Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray, ByteBuffer decimal) {
Schema schema = buildComplexAvroSchema();
+Schema enumSchema = schema.getField("myEnum").schema();
GenericData.Record row = new GenericData.Record(schema);
row.put("myInt", i);
row.put("myMap", m);
-row.put("myEnum", e);
+row.put("myEnum", new GenericData.EnumSymbol(enumSchema, e));
row.put("myLongOrFloat", unionVal);
row.put("myIntList", intArray);
row.put("myDecimal", decimal);


@@ -103,7 +103,7 @@
<properties>
<hive11.version>1.1.1</hive11.version>
<hive11.hadoop.version>2.6.2</hive11.hadoop.version>
-<hive12.version>1.2.1</hive12.version>
+<hive12.version>1.2.2</hive12.version>
<hive12.hadoop.version>2.6.2</hive12.hadoop.version>
<hive3.version>3.1.2</hive3.version>
<hive.version>${hive3.version}</hive.version>


@@ -16,6 +16,13 @@
*/
package org.apache.nifi.reporting;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
@@ -47,13 +54,6 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
import javax.json.JsonArray;
import javax.json.JsonObjectBuilder;
@@ -224,7 +224,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException {
this.recordSchema = recordSchema;
try {
-jsonParser = new JsonFactory().createJsonParser(in);
+jsonParser = new JsonFactory().createParser(in);
jsonParser.setCodec(new ObjectMapper());
JsonToken token = jsonParser.nextToken();
if (token == JsonToken.START_ARRAY) {
@@ -325,7 +325,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
values.put(fieldName, value);
}
} else {
-final Iterator<String> fieldNames = jsonNode.getFieldNames();
+final Iterator<String> fieldNames = jsonNode.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final JsonNode childNode = jsonNode.get(fieldName);
@@ -388,7 +388,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
final DataType valueType = ((MapDataType) desiredType).getValueType();
final Map<String, Object> map = new HashMap<>();
-final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
+final Iterator<String> fieldNameItr = fieldNode.fieldNames();
while (fieldNameItr.hasNext()) {
final String childName = fieldNameItr.next();
final JsonNode childNode = fieldNode.get(childName);
@@ -422,7 +422,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
if (childSchema == null) {
final List<RecordField> fields = new ArrayList<>();
-final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
+final Iterator<String> fieldNameItr = fieldNode.fieldNames();
while (fieldNameItr.hasNext()) {
fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType()));
}
@@ -449,19 +449,19 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
}
if (fieldNode.isNumber()) {
-return fieldNode.getNumberValue();
+return fieldNode.numberValue();
}
if (fieldNode.isBinary()) {
-return fieldNode.getBinaryValue();
+return fieldNode.binaryValue();
}
if (fieldNode.isBoolean()) {
-return fieldNode.getBooleanValue();
+return fieldNode.booleanValue();
}
if (fieldNode.isTextual()) {
-return fieldNode.getTextValue();
+return fieldNode.textValue();
}
if (fieldNode.isArray()) {
@@ -499,7 +499,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
childSchema = new SimpleRecordSchema(Collections.emptyList());
}
-final Iterator<String> fieldNames = fieldNode.getFieldNames();
+final Iterator<String> fieldNames = fieldNode.fieldNames();
final Map<String, Object> childValues = new HashMap<>();
while (fieldNames.hasNext()) {
final String childFieldName = fieldNames.next();
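The JsonNode accessor renames in this class follow a uniform Jackson 2 pattern: the get prefix is dropped (getFieldNames() -> fieldNames(), getTextValue() -> textValue(), ...) and JsonFactory.createJsonParser() becomes createParser(). A compact sketch of the renamed calls (standalone example with hypothetical JSON input):

    import java.util.Iterator;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class JsonNodeRenameSketch {
        static void demo() throws Exception {
            JsonParser parser = new JsonFactory().createParser("{\"name\":\"nifi\",\"count\":2}"); // was createJsonParser
            parser.setCodec(new ObjectMapper());
            JsonNode node = parser.readValueAsTree();

            Iterator<String> names = node.fieldNames();        // was getFieldNames()
            while (names.hasNext()) {
                JsonNode child = node.get(names.next());
                Object value = child.isNumber()
                        ? child.numberValue()                  // was getNumberValue()
                        : child.textValue();                   // was getTextValue()
                System.out.println(value);
            }
        }
    }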


@@ -41,6 +41,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.Set;
public class StandardContentViewerController extends HttpServlet {
@@ -107,19 +108,23 @@ public class StandardContentViewerController extends HttpServlet {
// Use Avro conversions to display logical type values in human readable way.
final GenericData genericData = new GenericData(){
@Override
-protected void toString(Object datum, StringBuilder buffer) {
+protected void toString(Object datum, StringBuilder buffer, IdentityHashMap<Object, Object> seenObjects) {
// Since these types are not quoted and produce a malformed JSON string, quote it here.
if (datum instanceof LocalDate || datum instanceof LocalTime || datum instanceof DateTime) {
buffer.append("\"").append(datum).append("\"");
return;
}
-super.toString(datum, buffer);
+super.toString(datum, buffer, seenObjects);
}
};
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
-genericData.addLogicalTypeConversion(new TimeConversions.TimeConversion());
-genericData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
+genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
final DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(null, null, genericData);
try (final DataFileStream<GenericData.Record> dataFileReader = new DataFileStream<>(content.getContentStream(), datumReader)) {
while (dataFileReader.hasNext()) {
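For context, the conversions registered above are what let GenericData render logical types (decimals, dates, timestamps) as readable values instead of raw ints/bytes; Avro 1.11 splits the old joda-based TimeConversion/TimestampConversion into millis/micros/local variants. A minimal reader sketch using the same pattern (standalone; the file path is hypothetical):

    import java.io.File;

    import org.apache.avro.Conversions;
    import org.apache.avro.data.TimeConversions;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;

    class LogicalTypeViewSketch {
        static void demo() throws Exception {
            GenericData genericData = new GenericData();
            genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
            genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
            genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

            GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>(null, null, genericData);
            try (DataFileReader<GenericData.Record> records = new DataFileReader<>(new File("example.avro"), reader)) {
                while (records.hasNext()) {
                    // Render with the configured GenericData so registered conversions apply
                    System.out.println(genericData.toString(records.next()));
                }
            }
        }
    }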


@@ -37,6 +37,10 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -63,10 +67,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
@@ -435,7 +435,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
-final Iterator<String> fieldNameItr = node.getFieldNames();
+final Iterator<String> fieldNameItr = node.fieldNames();
while (fieldNameItr.hasNext()) {
normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
}
@@ -476,7 +476,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
// columns that we are inserting into
-final Iterator<String> fieldNames = rootNode.getFieldNames();
+final Iterator<String> fieldNames = rootNode.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
@@ -639,7 +639,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
// columns that we are inserting into
-Iterator<String> fieldNames = rootNode.getFieldNames();
+Iterator<String> fieldNames = rootNode.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
@@ -687,7 +687,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
-fieldNames = rootNode.getFieldNames();
+fieldNames = rootNode.fieldNames();
int whereFieldCount = 0;
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
@@ -764,7 +764,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
// columns that we are inserting into
-final Iterator<String> fieldNames = rootNode.getFieldNames();
+final Iterator<String> fieldNames = rootNode.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();


@@ -32,6 +32,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
@@ -39,10 +43,6 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;


@@ -310,11 +310,6 @@
</exclusion>
</exclusions>
</dependency>
-<dependency>
-<groupId>org.apache.avro</groupId>
-<artifactId>avro</artifactId>
-<version>1.8.1</version>
-</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>


@@ -66,7 +66,6 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.8.1</version>
</dependency>
<!-- Schema Registry Client-->


@@ -87,7 +87,6 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
-<version>1.8.1</version>
</dependency>
<dependency>
<groupId>net.java.dev.stax-utils</groupId>
@@ -137,11 +136,26 @@
<artifactId>bval-jsr</artifactId>
<version>2.0.5</version>
</dependency>
+<dependency>
+<groupId>org.codehaus.jackson</groupId>
+<artifactId>jackson-mapper-asl</artifactId>
+<version>1.9.13</version>
+</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>org.tukaani</groupId>
+<artifactId>xz</artifactId>
+<version>1.9</version>
+<scope>compile</scope>
+</dependency>
+<dependency>
+<groupId>org.xerial.snappy</groupId>
+<artifactId>snappy-java</artifactId>
+</dependency>
</dependencies>
<build>
<plugins>

pom.xml (13 changes)

@@ -112,7 +112,8 @@
<org.slf4j.version>1.7.36</org.slf4j.version>
<ranger.version>2.1.0</ranger.version>
<jetty.version>9.4.46.v20220331</jetty.version>
-<jackson.bom.version>2.13.2.20220328</jackson.bom.version>
+<jackson.bom.version>2.13.2</jackson.bom.version>
+<avro.version>1.11.0</avro.version>
<jaxb.runtime.version>2.3.5</jaxb.runtime.version>
<jakarta.xml.bind-api.version>2.3.3</jakarta.xml.bind-api.version>
<json.smart.version>2.4.8</json.smart.version>
@@ -274,6 +275,16 @@
<artifactId>bcpg-jdk15on</artifactId>
<version>${org.bouncycastle.version}</version>
</dependency>
+<dependency>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+<version>${avro.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro-ipc</artifactId>
+<version>${avro.version}</version>
+</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>