NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException

* Updated StandardRecordWriter, even though it is now deprecated to consider the encoding behavior of java.io.DataOutputStream.writeUTF() and truncate string values such that the UTF representation will not be longer than that DataOutputStream's 64K UTF format limit.
* Updated the new SchemaRecordWriter class to similarly truncate long Strings that will be written as UTF.
* Add tests to confirm handling of large UTF strings and various edge conditions of UTF string handling.

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #1469.
This commit is contained in:
Joe Skora 2017-02-02 18:24:56 +00:00 committed by Mike Moser
parent 2d6d7710c7
commit 376af83a3d
6 changed files with 285 additions and 11 deletions

View File

@ -21,6 +21,11 @@
</parent>
<artifactId>nifi-schema-utils</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,9 +17,13 @@
package org.apache.nifi.repository.schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
@ -27,6 +31,10 @@ import java.util.Map;
public class SchemaRecordWriter {
public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class);
public void writeRecord(final Record record, final OutputStream out) throws IOException {
// write sentinel value to indicate that there is a record. This allows the reader to then read one
// byte and check if -1. If so, the reader knows there are no more records. If not, then the reader
@ -105,7 +113,7 @@ public class SchemaRecordWriter {
out.writeLong((Long) value);
break;
case STRING:
out.writeUTF((String) value);
writeUTFLimited(out, (String) value);
break;
case LONG_STRING:
final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8);
@ -126,7 +134,7 @@ public class SchemaRecordWriter {
break;
case UNION:
final NamedValue namedValue = (NamedValue) value;
out.writeUTF(namedValue.getName());
writeUTFLimited(out, namedValue.getName());
final Record childRecord = (Record) namedValue.getValue();
writeRecordFields(childRecord, out);
break;
@ -136,4 +144,44 @@ public class SchemaRecordWriter {
break;
}
}
private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException {
try {
out.writeUTF(utfString);
} catch (UTFDataFormatException e) {
final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
logger.warn("Truncating repository record value! Attempted to write {} chars that encode to a UTF byte length greater than "
+ "supported maximum ({}), truncating to {} chars.",
utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
if (logger.isDebugEnabled()) {
logger.warn("String value was:\n{}", truncated);
}
out.writeUTF(truncated);
}
}
static int getCharsInUTFLength(final String str, final int utfLimit) {
// see java.io.DataOutputStream.writeUTF()
int strlen = str.length();
int utflen = 0;
int c;
/* use charAt instead of copying String to Char array */
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) & (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
if (utflen > utfLimit) {
return i;
}
}
return strlen;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.repository.schema;
import static org.apache.nifi.repository.schema.SchemaRecordWriter.MAX_ALLOWED_UTF_LENGTH;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -26,6 +27,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -33,10 +35,18 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
public class TestSchemaRecordReaderWriter {
private static Character utfCharOneByte = '$';
private static Character utfCharTwoByte = '¢';
private static Character utfCharThreeByte = '€';
private static String utfStringOneByte = utfCharOneByte.toString();
private static String utfStringTwoByte = utfCharTwoByte.toString();
private static String utfStringThreeByte = utfCharThreeByte.toString();
@Test
@SuppressWarnings("unchecked")
public void testRoundTrip() throws IOException {
@ -172,6 +182,126 @@ public class TestSchemaRecordReaderWriter {
}
}
@Test
@SuppressWarnings("unchecked")
public void testUTFLargerThan64k() throws IOException {
// Create a Record Schema
final List<RecordField> fields = new ArrayList<>();
fields.add(new SimpleRecordField("int present", FieldType.INT, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("string present", FieldType.STRING, Repetition.ZERO_OR_ONE));
final RecordSchema schema = new RecordSchema(fields);
// Create a Map of record fields to values, so that we can create a Record to write out
final Map<RecordField, Object> values = new LinkedHashMap<>();
values.put(createField("int present", FieldType.INT), 42);
final String utfString = utfStringOneByte + utfStringTwoByte + utfStringThreeByte; // 3 chars and 6 utf8 bytes
final String seventyK = StringUtils.repeat(utfString, 21845); // 65,535 chars and 131070 utf8 bytes
assertTrue(seventyK.length() == 65535);
assertTrue(seventyK.getBytes("UTF-8").length == 131070);
values.put(createField("string present", FieldType.STRING), seventyK);
final FieldMapRecord originalRecord = new FieldMapRecord(values, schema);
// Write out a record and read it back in.
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
// Write the schema to the stream
schema.writeTo(baos);
// Write the record twice, to make sure that we're able to read/write multiple sequential records
final SchemaRecordWriter writer = new SchemaRecordWriter();
writer.writeRecord(originalRecord, baos);
writer.writeRecord(originalRecord, baos);
try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) {
// Read the Schema from the stream and create a Record Reader for reading records, based on this schema
final RecordSchema readSchema = RecordSchema.readFrom(in);
final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema);
// Read the records and verify the values.
for (int i=0; i < 2; i++) {
final Record record = reader.readRecord(in);
assertNotNull(record);
assertEquals(42, record.getFieldValue("int present"));
assertTrue(MAX_ALLOWED_UTF_LENGTH - ((String)record.getFieldValue("string present")).getBytes("utf-8").length <= 3);
assertEquals(32768, ((String)record.getFieldValue("string present")).length());
}
// Ensure that there is no more data.
assertNull(reader.readRecord(in));
}
}
}
@Test
public void testSingleCharUTFLengths() {
// verify handling of single characters mapping to 1, 2, and 3 utf byte strings
assertEquals("test 1 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 0));
assertEquals("test 2 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 0));
assertEquals("test 3 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 0));
assertEquals("test 1 char string truncated to 1 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 1));
assertEquals("test 2 char string truncated to 1 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 1));
assertEquals("test 3 char string truncated to 1 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 1));
assertEquals("test 1 char string truncated to 2 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 2));
assertEquals("test 2 char string truncated to 2 utf bytes should be 2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 2));
assertEquals("test 3 char string truncated to 2 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 2));
assertEquals("test 1 char string truncated to 3 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringOneByte, 3));
assertEquals("test 2 char string truncated to 3 utf bytes should be 2", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringTwoByte, 3));
assertEquals("test 3 char string truncated to 3 utf bytes should be 3", 1, SchemaRecordWriter.getCharsInUTFLength(utfStringThreeByte, 3));
}
@Test
public void testMultiCharUTFLengths() {
// test boundary conditions as 1, 2, and 3 UTF byte chars are included into utf limit positions used by strings
final String testString1 = utfStringOneByte + utfStringTwoByte + utfStringThreeByte; // char 'abc' utf 'abbccc'
assertEquals("test 6 char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(testString1, 0)); // utf ''
assertEquals("test 6 char string truncated to 1 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 1)); // utf 'a'
assertEquals("test 6 char string truncated to 2 utf bytes should be 1", 1, SchemaRecordWriter.getCharsInUTFLength(testString1, 2)); // utf 'a'
assertEquals("test 6 char string truncated to 3 utf bytes should be 2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 3)); // utf 'abb'
assertEquals("test 6 char string truncated to 4 utf bytes should be 2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 4)); // utf 'abb'
assertEquals("test 6 char string truncated to 5 utf bytes should be 2", 2, SchemaRecordWriter.getCharsInUTFLength(testString1, 5)); // utf 'abb'
assertEquals("test 6 char string truncated to 6 utf bytes should be 3", 3, SchemaRecordWriter.getCharsInUTFLength(testString1, 6)); // utf 'abbccc'
}
@Test
public void testSmallCharUTFLengths() throws UnsupportedEncodingException {
final String string12b = StringUtils.repeat(utfStringOneByte + utfStringTwoByte + utfStringThreeByte, 2);
assertEquals("test multi-char string truncated to 0 utf bytes should be 0", 0, SchemaRecordWriter.getCharsInUTFLength(string12b, 0));
assertEquals("test multi-char string truncated to 1 utf bytes should be 0", 1, SchemaRecordWriter.getCharsInUTFLength(string12b, 1));
assertEquals("test multi-char string truncated to 2 utf bytes should be 0", 1, SchemaRecordWriter.getCharsInUTFLength(string12b, 2));
assertEquals("test multi-char string truncated to 3 utf bytes should be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 3));
assertEquals("test multi-char string truncated to 4 utf bytes should be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 4));
assertEquals("test multi-char string truncated to 5 utf bytes should be 0", 2, SchemaRecordWriter.getCharsInUTFLength(string12b, 5));
assertEquals("test multi-char string truncated to 6 utf bytes should be 0", 3, SchemaRecordWriter.getCharsInUTFLength(string12b, 6));
assertEquals("test multi-char string truncated to 7 utf bytes should be 0", 4, SchemaRecordWriter.getCharsInUTFLength(string12b, 7));
assertEquals("test multi-char string truncated to 8 utf bytes should be 0", 4, SchemaRecordWriter.getCharsInUTFLength(string12b, 8));
assertEquals("test multi-char string truncated to 9 utf bytes should be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 9));
assertEquals("test multi-char string truncated to 10 utf bytes should be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 10));
assertEquals("test multi-char string truncated to 11 utf bytes should be 0", 5, SchemaRecordWriter.getCharsInUTFLength(string12b, 11));
assertEquals("test multi-char string truncated to 12 utf bytes should be 0", 6, SchemaRecordWriter.getCharsInUTFLength(string12b, 12));
}
@Test
public void testLargeCharUTFLengths() {
final String string64k = StringUtils.repeat(utfStringOneByte + utfStringTwoByte + utfStringThreeByte, 21845);
assertEquals("test 64k char string should be 64k chars long", 65535, string64k.length());
// drop half the chars going to utf of 64k bytes -- (1+1+1) * 21845 = 65535 chars which converts to (1+2+3) * 21845 = 131070 utf bytes so 1/2 is truncated
assertEquals("test 64k char string truncated to 65,535 utf bytes should be 32768", 32768, SchemaRecordWriter.getCharsInUTFLength(string64k, 65535));
// dropping bytes off the end of utf length
assertEquals("test 64k char string truncated to 65,534 utf bytes should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 65534)); // lost 2 byte char
assertEquals("test 64k char string truncated to 65,533 utf bytes should be 32767", 32767, SchemaRecordWriter.getCharsInUTFLength(string64k, 65533));
assertEquals("test 64k char string truncated to 65,532 utf bytes should be 32766", 32766, SchemaRecordWriter.getCharsInUTFLength(string64k, 65532)); // lost 1 byte char
assertEquals("test 64k char string truncated to 65,531 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65531)); // lost 3 byte char
assertEquals("test 64k char string truncated to 65,530 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65530));
assertEquals("test 64k char string truncated to 65,529 utf bytes should be 32765", 32765, SchemaRecordWriter.getCharsInUTFLength(string64k, 65529));
assertEquals("test 64k char string truncated to 65,528 utf bytes should be 32764", 32764, SchemaRecordWriter.getCharsInUTFLength(string64k, 65528)); // lost 2 byte char (again)
}
private SimpleRecordField createField(final String fieldName, final FieldType type) {
return new SimpleRecordField(fieldName, type, Repetition.ZERO_OR_ONE);
}

View File

@ -58,5 +58,10 @@
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -19,6 +19,7 @@ package org.apache.nifi.provenance;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.util.Collection;
import java.util.Map;
@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
*/
@Deprecated
public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
public static final int SERIALIZATION_VERISON = 9;
public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository";
@ -72,7 +76,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re
final ProvenanceEventType recordType = record.getEventType();
out.writeLong(recordIdentifier);
out.writeUTF(record.getEventType().name());
writeUTFLimited(out, record.getEventType().name());
out.writeLong(record.getEventTime());
out.writeLong(record.getFlowFileEntryDate());
out.writeLong(record.getEventDuration());
@ -101,9 +105,9 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re
// If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
out.writeBoolean(true);
out.writeUTF(record.getContentClaimContainer());
out.writeUTF(record.getContentClaimSection());
out.writeUTF(record.getContentClaimIdentifier());
writeUTFLimited(out, record.getContentClaimContainer());
writeUTFLimited(out, record.getContentClaimSection());
writeUTFLimited(out, record.getContentClaimIdentifier());
if (record.getContentClaimOffset() == null) {
out.writeLong(0L);
} else {
@ -117,9 +121,9 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re
// If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
out.writeBoolean(true);
out.writeUTF(record.getPreviousContentClaimContainer());
out.writeUTF(record.getPreviousContentClaimSection());
out.writeUTF(record.getPreviousContentClaimIdentifier());
writeUTFLimited(out, record.getPreviousContentClaimContainer());
writeUTFLimited(out, record.getPreviousContentClaimSection());
writeUTFLimited(out, record.getPreviousContentClaimIdentifier());
if (record.getPreviousContentClaimOffset() == null) {
out.writeLong(0L);
} else {
@ -157,7 +161,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re
}
protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
out.writeUTF(uuid);
writeUTFLimited(out, uuid);
}
protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@ -176,7 +180,7 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(toWrite);
writeUTFLimited(out, toWrite);
}
}
@ -195,6 +199,45 @@ public class StandardRecordWriter extends CompressableRecordWriter implements Re
out.write(bytes);
}
private void writeUTFLimited(final java.io.DataOutputStream out, final String utfString) throws IOException {
try {
out.writeUTF(utfString);
} catch (UTFDataFormatException e) {
final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
logger.warn("Truncating repository record value! Attempted to write {} chars that encode to a UTF byte length greater than "
+ "supported maximum ({}), truncating to {} chars.",
utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
if (logger.isDebugEnabled()) {
logger.warn("String value was:\n{}", truncated);
}
out.writeUTF(truncated);
}
}
static int getCharsInUTFLength(final String str, final int utfLimit) {
// see java.io.DataOutputStream.writeUTF()
int strlen = str.length();
int utflen = 0;
int c;
/* use charAt instead of copying String to Char array */
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) & (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
if (utflen > utfLimit) {
return i;
}
}
return strlen;
}
@Override
public String toString() {

View File

@ -21,8 +21,12 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.NopTocWriter;
@ -33,6 +37,9 @@ import org.apache.nifi.stream.io.NullOutputStream;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.nifi.provenance.TestUtil.createFlowFile;
import static org.junit.Assert.assertTrue;
public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
@ -108,6 +115,42 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
System.out.println("Took " + millis + " millis to read " + numEvents + " events");
}
@Test
public void testWriteUtfLargerThan64k() throws IOException, InterruptedException {
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", "1.txt");
attributes.put("uuid", UUID.randomUUID().toString());
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
final String seventyK = StringUtils.repeat("X", 70000);
assertTrue(seventyK.length() > 65535);
assertTrue(seventyK.getBytes("UTF-8").length > 65535);
builder.setDetails(seventyK);
final ProvenanceEventRecord record = builder.build();
try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(headerOut)) {
out.writeUTF(PersistentProvenanceRepository.class.getName());
out.writeInt(9);
}
try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream();
final StandardRecordWriter writer = new StandardRecordWriter(recordOut, null, false, 0)) {
writer.writeHeader(1L);
recordOut.reset();
writer.writeRecord(record, 1L);
}
}
@Override
protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);