mirror of https://github.com/apache/nifi.git
NIFI-2787 truncate flowfile attributes that get indexed to fit within Lucene limits
Signed-off-by: Joe Skora <jskora@apache.org> This closes #1043
This commit is contained in:
parent
25150d4016
commit
85a1f753a7
|
@ -74,7 +74,7 @@ public class IndexingAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final SearchableField searchableField : attributeSearchableFields) {
|
for (final SearchableField searchableField : attributeSearchableFields) {
|
||||||
addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
|
addField(doc, searchableField, LuceneUtil.truncateIndexField(attributes.get(searchableField.getSearchableFieldName())), Store.NO);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
|
final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
|
||||||
|
|
|
@ -17,6 +17,12 @@
|
||||||
package org.apache.nifi.provenance.lucene;
|
package org.apache.nifi.provenance.lucene;
|
||||||
|
|
||||||
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
|
||||||
|
@ -30,6 +36,7 @@ import org.apache.nifi.processor.DataUnit;
|
||||||
import org.apache.nifi.provenance.SearchableFields;
|
import org.apache.nifi.provenance.SearchableFields;
|
||||||
import org.apache.nifi.provenance.search.SearchTerm;
|
import org.apache.nifi.provenance.search.SearchTerm;
|
||||||
import org.apache.lucene.document.Document;
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.IndexableField;
|
import org.apache.lucene.index.IndexableField;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.search.BooleanClause;
|
import org.apache.lucene.search.BooleanClause;
|
||||||
|
@ -194,4 +201,37 @@ public class LuceneUtil {
|
||||||
}
|
}
|
||||||
return documentGroups;
|
return documentGroups;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncate a single field so that it does not exceed Lucene's byte size limit on indexed terms.
|
||||||
|
*
|
||||||
|
* @param field the string to be indexed
|
||||||
|
* @return a string that can be indexed which is within Lucene's byte size limit, or null if anything goes wrong
|
||||||
|
*/
|
||||||
|
public static String truncateIndexField(String field) {
|
||||||
|
if (field == null) {
|
||||||
|
return field;
|
||||||
|
}
|
||||||
|
|
||||||
|
Charset charset = Charset.defaultCharset();
|
||||||
|
byte[] bytes = field.getBytes(charset);
|
||||||
|
if (bytes.length <= IndexWriter.MAX_TERM_LENGTH) {
|
||||||
|
return field;
|
||||||
|
}
|
||||||
|
|
||||||
|
// chop the field to maximum allowed byte length
|
||||||
|
ByteBuffer bbuf = ByteBuffer.wrap(bytes, 0, IndexWriter.MAX_TERM_LENGTH);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// decode the chopped byte buffer back into original charset
|
||||||
|
CharsetDecoder decoder = charset.newDecoder();
|
||||||
|
decoder.onMalformedInput(CodingErrorAction.IGNORE);
|
||||||
|
decoder.reset();
|
||||||
|
CharBuffer cbuf = decoder.decode(bbuf);
|
||||||
|
return cbuf.toString();
|
||||||
|
} catch (CharacterCodingException shouldNotHappen) {}
|
||||||
|
|
||||||
|
// if we get here, something bad has happened
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -349,6 +349,51 @@ public class TestPersistentProvenanceRepository {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verifies that an attribute value larger than Lucene's indexed-term byte limit does not
// break indexing: events carrying the oversized attribute must still roll over and remain
// searchable (by the truncated prefix) instead of failing the index write.
@Test
public void testIndexOnRolloverWithImmenseAttribute() throws IOException {
    final RepositoryConfiguration config = createConfiguration();
    config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
    config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
    // make the "immense" flowfile attribute a searchable (and therefore indexed) field
    config.setSearchableAttributes(SearchableFieldParser.extractSearchableFields("immense", false));
    repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
    repo.initialize(getEventReporter(), null, null);

    // build a value of 33000 '0' characters — larger than Lucene's 32766-byte term limit
    int immenseAttrSize = 33000; // must be greater than 32766 for a meaningful test
    StringBuilder immenseBldr = new StringBuilder(immenseAttrSize);
    for (int i=0; i < immenseAttrSize; i++) {
        immenseBldr.append('0');
    }
    final String uuid = "00000000-0000-0000-0000-000000000000";
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("abc", "xyz");
    attributes.put("xyz", "abc");
    attributes.put("filename", "file-" + uuid);
    attributes.put("immense", immenseBldr.toString());

    final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
    builder.setEventTime(System.currentTimeMillis());
    builder.setEventType(ProvenanceEventType.RECEIVE);
    builder.setTransitUri("nifi://unit-test");
    builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
    builder.setComponentId("1234");
    builder.setComponentType("dummy processor");

    // register 10 events, each with a distinct uuid but all carrying the immense attribute
    for (int i = 0; i < 10; i++) {
        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
        builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
        repo.registerEvent(builder.build());
    }

    // force the events out of the active file so they are indexed
    repo.waitForRollover();

    // prefix query on the immense attribute: matches the (truncated) indexed value
    final Query query = new Query(UUID.randomUUID().toString());
    query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("immense"), "000*"));
    query.setMaxResults(100);

    // all 10 events must be found — indexing the oversized attribute must not have failed
    final QueryResult result = repo.queryEvents(query, createUser());
    assertEquals(10, result.getMatchingEvents().size());
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIndexOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
|
public void testIndexOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
|
||||||
final RepositoryConfiguration config = createConfiguration();
|
final RepositoryConfiguration config = createConfiguration();
|
||||||
|
|
Loading…
Reference in New Issue