diff --git a/nifi-commons/nifi-schema-utils/pom.xml b/nifi-commons/nifi-schema-utils/pom.xml
index 11c62aa8e5..a1fecb3741 100644
--- a/nifi-commons/nifi-schema-utils/pom.xml
+++ b/nifi-commons/nifi-schema-utils/pom.xml
@@ -21,6 +21,11 @@
<artifactId>nifi-schema-utils</artifactId>
+ <dependency>
+     <groupId>org.apache.commons</groupId>
+     <artifactId>commons-lang3</artifactId>
+     <scope>test</scope>
+ </dependency>
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 469388962f..81043bc283 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -17,9 +17,13 @@
package org.apache.nifi.repository.schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.UTFDataFormatException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
@@ -27,6 +31,10 @@ import java.util.Map;
public class SchemaRecordWriter {
+ public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
+
+ private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class);
+
public void writeRecord(final Record record, final OutputStream out) throws IOException {
// write sentinel value to indicate that there is a record. This allows the reader to then read one
// byte and check if -1. If so, the reader knows there are no more records. If not, then the reader
@@ -105,7 +113,7 @@ public class SchemaRecordWriter {
out.writeLong((Long) value);
break;
case STRING:
- out.writeUTF((String) value);
+ writeUTFLimited(out, (String) value);
break;
case LONG_STRING:
final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8);
@@ -126,7 +134,7 @@ public class SchemaRecordWriter {
break;
case UNION:
final NamedValue namedValue = (NamedValue) value;
- out.writeUTF(namedValue.getName());
+ writeUTFLimited(out, namedValue.getName());
final Record childRecord = (Record) namedValue.getValue();
writeRecordFields(childRecord, out);
break;
@@ -136,4 +144,44 @@ public class SchemaRecordWriter {
break;
}
}
+
+ private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException {
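+ // DataOutputStream.writeUTF() can encode at most 65,535 bytes of modified UTF-8 and throws
+ // UTFDataFormatException for longer values; rather than failing the whole record, fall back
+ // to writing a prefix of the value that still fits within that limit.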
+ try {
+ out.writeUTF(utfString);
+ } catch (UTFDataFormatException e) {
+ final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
+ logger.warn("Truncating repository record value! Attempted to write {} chars that encode to a UTF byte length greater than "
+ + "supported maximum ({}), truncating to {} chars.",
+ utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
+ if (logger.isDebugEnabled()) {
+ logger.warn("String value was:\n{}", truncated);
+ }
+ out.writeUTF(truncated);
+ }
+ }
+
+
+ static int getCharsInUTFLength(final String str, final int utfLimit) {
+ // see java.io.DataOutputStream.writeUTF()
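+ // modified UTF-8 sizes per char: U+0001..U+007F -> 1 byte, U+0000 and U+0080..U+07FF -> 2 bytes,
+ // U+0800..U+FFFF -> 3 bytes; returns how many leading chars encode to no more than utfLimit bytes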
+ int strlen = str.length();
+ int utflen = 0;
+ int c;
+
+ /* use charAt instead of copying String to Char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ if (utflen > utfLimit) {
+ return i;
+ }
+ }
+ return strlen;
+ }
+
}
diff --git a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
index 18548fb2be..5eb815aa16 100644
--- a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
+++ b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
@@ -17,6 +17,7 @@
package org.apache.nifi.repository.schema;
+import static org.apache.nifi.repository.schema.SchemaRecordWriter.MAX_ALLOWED_UTF_LENGTH;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -26,6 +27,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -33,10 +35,18 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
public class TestSchemaRecordReaderWriter {
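+ // single characters whose UTF-8 encodings are 1, 2, and 3 bytes long, respectively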
+ private static Character utfCharOneByte = '$';
+ private static Character utfCharTwoByte = '¢';
+ private static Character utfCharThreeByte = '€';
+ private static String utfStringOneByte = utfCharOneByte.toString();
+ private static String utfStringTwoByte = utfCharTwoByte.toString();
+ private static String utfStringThreeByte = utfCharThreeByte.toString();
+
@Test
@SuppressWarnings("unchecked")
public void testRoundTrip() throws IOException {
@@ -172,6 +182,126 @@ public class TestSchemaRecordReaderWriter {
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testUTFLargerThan64k() throws IOException {
+ // Create a Record Schema
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new SimpleRecordField("int present", FieldType.INT, Repetition.ZERO_OR_ONE));
+ fields.add(new SimpleRecordField("string present", FieldType.STRING, Repetition.ZERO_OR_ONE));
+
+ final RecordSchema schema = new RecordSchema(fields);
+
+ // Create a Map of record fields to values, so that we can create a Record to write out
+ final Map<RecordField, Object> values = new LinkedHashMap<>();
+ values.put(createField("int present", FieldType.INT), 42);
+ final String utfString = utfStringOneByte + utfStringTwoByte + utfStringThreeByte; // 3 chars and 6 utf8 bytes
+ final String seventyK = StringUtils.repeat(utfString, 21845); // 65,535 chars and 131070 utf8 bytes
+ assertTrue(seventyK.length() == 65535);
+ assertTrue(seventyK.getBytes("UTF-8").length == 131070);
+ values.put(createField("string present", FieldType.STRING), seventyK);
+
+ final FieldMapRecord originalRecord = new FieldMapRecord(values, schema);
+
+ // Write out a record and read it back in.
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ // Write the schema to the stream
+ schema.writeTo(baos);
+
+ // Write the record twice, to make sure that we're able to read/write multiple sequential records
+ final SchemaRecordWriter writer = new SchemaRecordWriter();
+ writer.writeRecord(originalRecord, baos);
+ writer.writeRecord(originalRecord, baos);
+
+ try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) {
+ // Read the Schema from the stream and create a Record Reader for reading records, based on this schema
+ final RecordSchema readSchema = RecordSchema.readFrom(in);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema);
+
+ // Read the records and verify the values.
+ for (int i=0; i < 2; i++) {
+ final Record record = reader.readRecord(in);
+
+ assertNotNull(record);
+ assertEquals(42, record.getFieldValue("int present"));
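+ // the 131,070-byte value should be truncated to the longest prefix that encodes to at most
+ // 65,535 utf bytes: 10,922 full "$¢€" triples plus '$' and '¢', i.e. 32,768 chars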
+ assertTrue(MAX_ALLOWED_UTF_LENGTH - ((String)record.getFieldValue("string present")).getBytes("utf-8").length <= 3);
+ assertEquals(32768, ((String)record.getFieldValue("string present")).length());
+ }
+
+ // Ensure that there is no more data.
+ assertNull(reader.readRecord(in));
+ }
+ }
+ }
+
+ @Test
+ public void testSingleCharUTFLengths() {
+ // verify handling of single characters mapping to 1, 2, and 3 utf byte strings
+ assertEquals("test 1 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 0));
+ assertEquals("test 2 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 0));
+ assertEquals("test 3 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 0));
+ assertEquals("test 1 char string truncated to 1 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 1));
+ assertEquals("test 2 char string truncated to 1 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 1));
+ assertEquals("test 3 char string truncated to 1 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 1));
+ assertEquals("test 1 char string truncated to 2 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 2));
+ assertEquals("test 2 char string truncated to 2 utf bytes should be 2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 2));
+ assertEquals("test 3 char string truncated to 2 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 2));
+ assertEquals("test 1 char string truncated to 3 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 3));
+ assertEquals("test 2 char string truncated to 3 utf bytes should be 2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 3));
+ assertEquals("test 3 char string truncated to 3 utf bytes should be 3", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 3));
+ }
+
+ @Test
+ public void testMultiCharUTFLengths() {
+ // test boundary conditions as 1, 2, and 3 UTF byte chars are included into utf limit positions used by strings
+ final String testString1 = utfStringOneByte + utfStringTwoByte + utfStringThreeByte; // 3 chars 'abc' encoding to 6 utf bytes 'a' + 'bb' + 'ccc'
+ assertEquals("3-char (6 utf byte) string truncated to 0 utf bytes should yield 0 chars", 0, SchemaRecordWriter.getCharsInUTFLength(testString1, 0)); // utf ''
+ assertEquals("3-char (6 utf byte) string truncated to 1 utf byte should yield 1 char", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 1)); // utf 'a'
+ assertEquals("3-char (6 utf byte) string truncated to 2 utf bytes should yield 1 char", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 2)); // utf 'a'
+ assertEquals("3-char (6 utf byte) string truncated to 3 utf bytes should yield 2 chars", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 3)); // utf 'abb'
+ assertEquals("3-char (6 utf byte) string truncated to 4 utf bytes should yield 2 chars", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 4)); // utf 'abb'
+ assertEquals("3-char (6 utf byte) string truncated to 5 utf bytes should yield 2 chars", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 5)); // utf 'abb'
+ assertEquals("3-char (6 utf byte) string truncated to 6 utf bytes should yield 3 chars", 3, SchemaRecordWriter.getCharsInUTFLength(testString1, 6)); // utf 'abbccc'
+ }
+
+ @Test
+ public void testSmallCharUTFLengths() throws UnsupportedEncodingException {
+ final String string12b = StringUtils.repeat(utfStringOneByte + utfStringTwoByte + utfStringThreeByte, 2); // 6 chars encoding to 12 utf bytes
+
+ assertEquals("6-char (12 utf byte) string truncated to 0 utf bytes should yield 0 chars", 0, SchemaRecordWriter.getCharsInUTFLength(string12b, 0));
+ assertEquals("6-char (12 utf byte) string truncated to 1 utf byte should yield 1 char", 1, SchemaRecordWriter.getCharsInUTFLength(string12b, 1));
+ assertEquals("6-char (12 utf byte) string truncated to 2 utf bytes should yield 1 char", 1, SchemaRecordWriter.getCharsInUTFLength(string12b, 2));
+ assertEquals("6-char (12 utf byte) string truncated to 3 utf bytes should yield 2 chars", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 3));
+ assertEquals("6-char (12 utf byte) string truncated to 4 utf bytes should yield 2 chars", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 4));
+ assertEquals("6-char (12 utf byte) string truncated to 5 utf bytes should yield 2 chars", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 5));
+ assertEquals("6-char (12 utf byte) string truncated to 6 utf bytes should yield 3 chars", 3, SchemaRecordWriter.getCharsInUTFLength(string12b, 6));
+ assertEquals("6-char (12 utf byte) string truncated to 7 utf bytes should yield 4 chars", 4, SchemaRecordWriter.getCharsInUTFLength(string12b, 7));
+ assertEquals("6-char (12 utf byte) string truncated to 8 utf bytes should yield 4 chars", 4, SchemaRecordWriter.getCharsInUTFLength(string12b, 8));
+ assertEquals("6-char (12 utf byte) string truncated to 9 utf bytes should yield 5 chars", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 9));
+ assertEquals("6-char (12 utf byte) string truncated to 10 utf bytes should yield 5 chars", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 10));
+ assertEquals("6-char (12 utf byte) string truncated to 11 utf bytes should yield 5 chars", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 11));
+ assertEquals("6-char (12 utf byte) string truncated to 12 utf bytes should yield 6 chars", 6, SchemaRecordWriter.getCharsInUTFLength(string12b, 12));
+ }
+
+ @Test
+ public void testLargeCharUTFLengths() {
+ final String string64k = StringUtils.repeat(utfStringOneByte + utfStringTwoByte + utfStringThreeByte, 21845);
+
+ assertEquals("test 64k char string should be 64k chars long", 65535, string64k.length());
+
+ // the string is (1+1+1) * 21845 = 65,535 chars that encode to (1+2+3) * 21845 = 131,070 utf bytes, so a 65,535-byte limit drops roughly half the chars
+ assertEquals("test 64k char string truncated to 65,535 utf bytes should be 32768", 32768, SchemaRecordWriter.getCharsInUTFLength(string64k, 65535));
+
+ // dropping bytes off the end of utf length
+ assertEquals("test 64k char string truncated to 65,534 utf bytes should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 65534)); // lost 2 byte char
+ assertEquals("test 64k char string truncated to 65,533 utf bytes should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 65533));
+ assertEquals("test 64k char string truncated to 65,532 utf bytes should be 32766", 32766, SchemaRecordWriter.getCharsInUTFLength(string64k, 65532)); // lost 1 byte char
+ assertEquals("test 64k char string truncated to 65,531 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65531)); // lost 3 byte char
+ assertEquals("test 64k char string truncated to 65,530 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65530));
+ assertEquals("test 64k char string truncated to 65,529 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65529));
+ assertEquals("test 64k char string truncated to 65,528 utf bytes should be 32764", 32764, SchemaRecordWriter.getCharsInUTFLength(string64k, 65528)); // lost 2 byte char (again)
+ }
+
private SimpleRecordField createField(final String fieldName, final FieldType type) {
return new SimpleRecordField(fieldName, type, Repetition.ZERO_OR_ONE);
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
index a56296f40d..4db4169ff0 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
@@ -58,5 +58,10 @@
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
+ <dependency>
+     <groupId>org.apache.commons</groupId>
+     <artifactId>commons-lang3</artifactId>
+     <scope>test</scope>
+ </dependency>
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index a95bd4f960..076e507e52 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -19,6 +19,7 @@ package org.apache.nifi.provenance;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.UTFDataFormatException;
import java.util.Collection;
import java.util.Map;
@@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
*/
@Deprecated
public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
+
+ public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
+
private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
public static final int SERIALIZATION_VERISON = 9;
public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository";
@@ -72,7 +76,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
final ProvenanceEventType recordType = record.getEventType();
out.writeLong(recordIdentifier);
- out.writeUTF(record.getEventType().name());
+ writeUTFLimited(out, record.getEventType().name());
out.writeLong(record.getEventTime());
out.writeLong(record.getFlowFileEntryDate());
out.writeLong(record.getEventDuration());
@@ -101,9 +105,9 @@ public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
// If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
out.writeBoolean(true);
- out.writeUTF(record.getContentClaimContainer());
- out.writeUTF(record.getContentClaimSection());
- out.writeUTF(record.getContentClaimIdentifier());
+ writeUTFLimited(out, record.getContentClaimContainer());
+ writeUTFLimited(out, record.getContentClaimSection());
+ writeUTFLimited(out, record.getContentClaimIdentifier());
if (record.getContentClaimOffset() == null) {
out.writeLong(0L);
} else {
@@ -117,9 +121,9 @@ public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
// If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
out.writeBoolean(true);
- out.writeUTF(record.getPreviousContentClaimContainer());
- out.writeUTF(record.getPreviousContentClaimSection());
- out.writeUTF(record.getPreviousContentClaimIdentifier());
+ writeUTFLimited(out, record.getPreviousContentClaimContainer());
+ writeUTFLimited(out, record.getPreviousContentClaimSection());
+ writeUTFLimited(out, record.getPreviousContentClaimIdentifier());
if (record.getPreviousContentClaimOffset() == null) {
out.writeLong(0L);
} else {
@@ -157,7 +161,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
}
protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
- out.writeUTF(uuid);
+ writeUTFLimited(out, uuid);
}
protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -176,7 +180,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
- out.writeUTF(toWrite);
+ writeUTFLimited(out, toWrite);
}
}
@@ -195,6 +199,45 @@ public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
out.write(bytes);
}
+ private void writeUTFLimited(final java.io.DataOutputStream out, final String utfString) throws IOException {
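+ // DataOutputStream.writeUTF() can encode at most 65,535 bytes of modified UTF-8 and throws
+ // UTFDataFormatException for longer values, so truncate the value instead of failing the record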
+ try {
+ out.writeUTF(utfString);
+ } catch (UTFDataFormatException e) {
+ final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
+ logger.warn("Truncating repository record value! Attempted to write {} chars that encode to a UTF byte length greater than "
+ + "supported maximum ({}), truncating to {} chars.",
+ utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
+ if (logger.isDebugEnabled()) {
+ logger.warn("String value was:\n{}", truncated);
+ }
+ out.writeUTF(truncated);
+ }
+ }
+
+ static int getCharsInUTFLength(final String str, final int utfLimit) {
+ // see java.io.DataOutputStream.writeUTF()
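+ // modified UTF-8 sizes per char: U+0001..U+007F -> 1 byte, U+0000 and U+0080..U+07FF -> 2 bytes,
+ // U+0800..U+FFFF -> 3 bytes; returns how many leading chars encode to no more than utfLimit bytes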
+ int strlen = str.length();
+ int utflen = 0;
+ int c;
+
+ /* use charAt instead of copying String to Char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ if (utflen > utfLimit) {
+ return i;
+ }
+ }
+ return strlen;
+ }
+
+
@Override
public String toString() {
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index cc69b186fa..dfa37e4798 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -21,8 +21,12 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.NopTocWriter;
@@ -33,6 +37,9 @@ import org.apache.nifi.stream.io.NullOutputStream;
import org.junit.Ignore;
import org.junit.Test;
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertTrue;
+
public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
@@ -108,6 +115,42 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
System.out.println("Took " + millis + " millis to read " + numEvents + " events");
}
+ @Test
+ public void testWriteUtfLargerThan64k() throws IOException, InterruptedException {
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", "1.txt");
+ attributes.put("uuid", UUID.randomUUID().toString());
+
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+ final String seventyK = StringUtils.repeat("X", 70000);
+ assertTrue(seventyK.length() > 65535);
+ assertTrue(seventyK.getBytes("UTF-8").length > 65535);
+ builder.setDetails(seventyK);
+ final ProvenanceEventRecord record = builder.build();
+
+ try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(headerOut)) {
+ out.writeUTF(PersistentProvenanceRepository.class.getName());
+ out.writeInt(9);
+ }
+
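+ // writing the record should succeed: the oversized 'details' value is truncated by
+ // writeUTFLimited() rather than surfacing a UTFDataFormatException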
+ try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream();
+ final StandardRecordWriter writer = new StandardRecordWriter(recordOut, null, false, 0)) {
+
+ writer.writeHeader(1L);
+ recordOut.reset();
+
+ writer.writeRecord(record, 1L);
+ }
+ }
+
@Override
protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);