diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java index ba99058fcf..f47466134e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java @@ -74,7 +74,7 @@ public class IndexingAction { } for (final SearchableField searchableField : attributeSearchableFields) { - addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); + addField(doc, searchableField, LuceneUtil.truncateIndexField(attributes.get(searchableField.getSearchableFieldName())), Store.NO); } final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), "."); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java index 08a99d61ea..56e871fc11 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java @@ -17,6 +17,12 @@ package org.apache.nifi.provenance.lucene; import java.io.File; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import 
java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,6 +36,7 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.search.SearchTerm;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
@@ -194,4 +201,43 @@ public class LuceneUtil {
         }
         return documentGroups;
     }
+
+    /**
+     * Truncates a single field so that it does not exceed Lucene's byte size limit on indexed terms.
+     * <p>
+     * Lucene measures that limit on the UTF-8 encoding of a term, so the length is checked (and the value is cut)
+     * in UTF-8 bytes regardless of the platform default charset. A multi-byte character that would straddle the
+     * limit is dropped entirely, so the result is always a prefix of the given string.
+     *
+     * @param field the string to be indexed; may be null
+     * @return the given string if it already fits, otherwise the longest prefix of it whose UTF-8 encoding is
+     *         within Lucene's byte size limit; null only if the given string is null
+     * @throws IllegalStateException if the truncated bytes cannot be decoded, which is not expected to happen
+     */
+    public static String truncateIndexField(String field) {
+        // UTF-8 needs at most 3 bytes per UTF-16 char, so short values are known to fit without encoding them
+        if (field == null || field.length() <= IndexWriter.MAX_TERM_LENGTH / 3) {
+            return field;
+        }
+
+        final Charset utf8 = Charset.forName("UTF-8");
+        final byte[] bytes = field.getBytes(utf8);
+        if (bytes.length <= IndexWriter.MAX_TERM_LENGTH) {
+            return field;
+        }
+
+        // chop the field to the maximum allowed byte length
+        final ByteBuffer truncated = ByteBuffer.wrap(bytes, 0, IndexWriter.MAX_TERM_LENGTH);
+
+        // a character cut in half by the chop shows up as malformed trailing input; IGNORE drops those bytes
+        final CharsetDecoder decoder = utf8.newDecoder()
+                .onMalformedInput(CodingErrorAction.IGNORE)
+                .onUnmappableCharacter(CodingErrorAction.IGNORE);
+        try {
+            final CharBuffer chars = decoder.decode(truncated);
+            return chars.toString();
+        } catch (final CharacterCodingException e) {
+            // not expected while both error actions are IGNORE; fail loudly instead of silently losing the value
+            throw new IllegalStateException("Unable to truncate field to " + IndexWriter.MAX_TERM_LENGTH + " bytes", e);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index a210aa95c3..9dc340ada1 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -349,6 +349,58 @@ public class TestPersistentProvenanceRepository {
         }
     }
 
+    /**
+     * Verifies that events carrying a searchable attribute whose value is larger than the 32766-byte term limit
+     * are still indexed on rollover and are returned by a query against that attribute.
+     */
+    @Test
+    public void testIndexOnRolloverWithImmenseAttribute() throws IOException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+        // make the oversized attribute searchable so that indexing has to deal with it
+        config.setSearchableAttributes(SearchableFieldParser.extractSearchableFields("immense", false));
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter(), null, null);
+
+        final int immenseAttrSize = 33000; // must be greater than 32766 for a meaningful test
+        final StringBuilder immenseBldr = new StringBuilder(immenseAttrSize);
+        for (int i = 0; i < immenseAttrSize; i++) {
+            immenseBldr.append('0');
+        }
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+        attributes.put("immense", immenseBldr.toString());
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        // register ten events that differ only in flow file id and "uuid" attribute
+        for (int i = 0; i < 10; i++) {
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            repo.registerEvent(builder.build());
+        }
+
+        repo.waitForRollover();
+
+        // the immense value consists solely of '0' characters, so every registered event is expected to match
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("immense"), "000*"));
+        query.setMaxResults(100);
+
+        final QueryResult result = repo.queryEvents(query, createUser());
+        assertEquals(10, result.getMatchingEvents().size());
+    }
+
     @Test
     public void testIndexOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();