mirror of https://github.com/apache/nifi.git
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
This commit is contained in:
commit
1f139e2198
|
@ -261,8 +261,7 @@ language governing permissions and limitations under the License. -->
|
|||
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
|
||||
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
|
||||
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
|
||||
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID,
|
||||
Filename, ProcessorID</nifi.provenance.repository.indexed.fields>
|
||||
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID</nifi.provenance.repository.indexed.fields>
|
||||
<nifi.provenance.repository.indexed.attributes />
|
||||
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
|
||||
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
|
||||
|
@ -302,8 +301,7 @@ language governing permissions and limitations under the License. -->
|
|||
<nifi.security.ocsp.responder.url />
|
||||
<nifi.security.ocsp.responder.certificate />
|
||||
|
||||
<!-- nifi.properties: cluster common properties (cluster manager and nodes
|
||||
must have same values) -->
|
||||
<!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) -->
|
||||
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
|
||||
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
|
||||
<nifi.cluster.protocol.socket.timeout>30 sec</nifi.cluster.protocol.socket.timeout>
|
||||
|
@ -315,8 +313,7 @@ language governing permissions and limitations under the License. -->
|
|||
<nifi.cluster.protocol.multicast.service.locator.attempts>3</nifi.cluster.protocol.multicast.service.locator.attempts>
|
||||
<nifi.cluster.protocol.multicast.service.locator.attempts.delay>1 sec</nifi.cluster.protocol.multicast.service.locator.attempts.delay>
|
||||
|
||||
<!-- nifi.properties: cluster node properties (only configure for cluster
|
||||
nodes) -->
|
||||
<!-- nifi.properties: cluster node properties (only configure for cluster nodes) -->
|
||||
<nifi.cluster.is.node>false</nifi.cluster.is.node>
|
||||
<nifi.cluster.node.address />
|
||||
<nifi.cluster.node.protocol.port />
|
||||
|
@ -324,8 +321,7 @@ language governing permissions and limitations under the License. -->
|
|||
<nifi.cluster.node.unicast.manager.address />
|
||||
<nifi.cluster.node.unicast.manager.protocol.port />
|
||||
|
||||
<!-- nifi.properties: cluster manager properties (only configure for cluster
|
||||
manager) -->
|
||||
<!-- nifi.properties: cluster manager properties (only configure for cluster manager) -->
|
||||
<nifi.cluster.is.manager>false</nifi.cluster.is.manager>
|
||||
<nifi.cluster.manager.address />
|
||||
<nifi.cluster.manager.protocol.port />
|
||||
|
@ -382,10 +378,8 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>rpm-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<summary>Apache NiFi (incubating)</summary>
|
||||
<description>Apache Nifi (incubating) is dataflow system based on
|
||||
the Flow-Based Programming concepts.</description>
|
||||
<license>Apache License, Version 2.0 and others (see included
|
||||
LICENSE file)</license>
|
||||
<description>Apache NiFi (incubating) is a dataflow system based on the Flow-Based Programming concepts.</description>
|
||||
<license>Apache License, Version 2.0 and others (see included LICENSE file)</license>
|
||||
<url>http://nifi.incubator.apache.org</url>
|
||||
<group>Utilities</group>
|
||||
<prefix>/opt/nifi</prefix>
|
||||
|
|
|
@ -44,6 +44,8 @@ public class FileBasedClusterNodeFirewallTest {
|
|||
@Rule
|
||||
public final TemporaryFolder temp = new TemporaryFolder();
|
||||
|
||||
private static final String NONEXISTENT_HOSTNAME = "abc";
|
||||
|
||||
private static boolean badHostsDoNotResolve = false;
|
||||
|
||||
/**
|
||||
|
@ -55,7 +57,7 @@ public class FileBasedClusterNodeFirewallTest {
|
|||
public static void ensureBadHostsDoNotWork() {
|
||||
final InetAddress ip;
|
||||
try {
|
||||
ip = InetAddress.getByName("I typed a search term and my browser expected a host.");
|
||||
ip = InetAddress.getByName(NONEXISTENT_HOSTNAME);
|
||||
} catch (final UnknownHostException uhe) {
|
||||
badHostsDoNotResolve = true;
|
||||
}
|
||||
|
@ -80,13 +82,13 @@ public class FileBasedClusterNodeFirewallTest {
|
|||
public void ensureBadDataWasIgnored() {
|
||||
assumeTrue(badHostsDoNotResolve);
|
||||
assertFalse("firewall treated our malformed data as a host. If " +
|
||||
"`host \"bad data should be skipped\"` works locally, this test should have been " +
|
||||
"skipped.",
|
||||
ipsFirewall.isPermissible("bad data should be skipped"));
|
||||
"`host \"bad data should be skipped\"` works locally, this test should have been " +
|
||||
"skipped.",
|
||||
ipsFirewall.isPermissible("bad data should be skipped"));
|
||||
assertFalse("firewall treated our malformed data as a host. If " +
|
||||
"`host \"more bad data\"` works locally, this test should have been " +
|
||||
"skipped.",
|
||||
ipsFirewall.isPermissible("more bad data"));
|
||||
"`host \"more bad data\"` works locally, this test should have been " +
|
||||
"skipped.",
|
||||
ipsFirewall.isPermissible("more bad data"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -112,9 +114,9 @@ public class FileBasedClusterNodeFirewallTest {
|
|||
@Test
|
||||
public void testIsPermissibleWithMalformedData() {
|
||||
assumeTrue(badHostsDoNotResolve);
|
||||
assertFalse("firewall allowed host 'abc' rather than rejecting as malformed. If `host abc` "
|
||||
+ "works locally, this test should have been skipped.",
|
||||
ipsFirewall.isPermissible("abc"));
|
||||
assertFalse("firewall allowed host '" + NONEXISTENT_HOSTNAME + "' rather than rejecting as malformed. If `host " + NONEXISTENT_HOSTNAME + "` "
|
||||
+ "works locally, this test should have been skipped.",
|
||||
ipsFirewall.isPermissible(NONEXISTENT_HOSTNAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -125,9 +127,9 @@ public class FileBasedClusterNodeFirewallTest {
|
|||
@Test
|
||||
public void testIsPermissibleWithEmptyConfigWithMalformedData() {
|
||||
assumeTrue(badHostsDoNotResolve);
|
||||
assertTrue("firewall did not allow malformed host 'abc' under permissive configs. If " +
|
||||
"`host abc` works locally, this test should have been skipped.",
|
||||
acceptAllFirewall.isPermissible("abc"));
|
||||
assertTrue("firewall did not allow malformed host '" + NONEXISTENT_HOSTNAME + "' under permissive configs. If " +
|
||||
"`host " + NONEXISTENT_HOSTNAME + "` works locally, this test should have been skipped.",
|
||||
acceptAllFirewall.isPermissible(NONEXISTENT_HOSTNAME));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.nifi.provenance.lucene.DeleteIndexAction;
|
||||
|
||||
import org.apache.nifi.provenance.toc.TocUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -30,16 +30,32 @@ public class FileRemovalAction implements ExpirationAction {
|
|||
|
||||
@Override
|
||||
public File execute(final File expiredFile) throws IOException {
|
||||
final boolean removed = remove(expiredFile);
|
||||
if (removed) {
|
||||
logger.info("Removed expired Provenance Event file {}", expiredFile);
|
||||
} else {
|
||||
logger.warn("Failed to remove old Provenance Event file {}; this file should be cleaned up manually", expiredFile);
|
||||
}
|
||||
|
||||
final File tocFile = TocUtil.getTocFile(expiredFile);
|
||||
if (remove(tocFile)) {
|
||||
logger.info("Removed expired Provenance Table-of-Contents file {}", tocFile);
|
||||
} else {
|
||||
logger.warn("Failed to remove old Provenance Table-of-Contents file {}; this file should be cleaned up manually", expiredFile);
|
||||
}
|
||||
|
||||
return removed ? null : expiredFile;
|
||||
}
|
||||
|
||||
private boolean remove(final File file) {
|
||||
boolean removed = false;
|
||||
for (int i = 0; i < 10 && !removed; i++) {
|
||||
if ((removed = expiredFile.delete())) {
|
||||
logger.info("Removed expired Provenance Event file {}", expiredFile);
|
||||
return null;
|
||||
if (removed = file.delete()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
logger.warn("Failed to remove old Provenance Event file {}", expiredFile);
|
||||
return expiredFile;
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -64,14 +64,11 @@ public class DocsReader {
|
|||
final int docId = scoreDoc.doc;
|
||||
final Document d = indexReader.document(docId);
|
||||
docs.add(d);
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
final long readDocuments = System.nanoTime() - start;
|
||||
logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
|
||||
return read(docs, allProvenanceLogFiles);
|
||||
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
|
||||
}
|
||||
|
||||
|
||||
|
@ -88,7 +85,7 @@ public class DocsReader {
|
|||
|
||||
|
||||
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
|
||||
IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
|
||||
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
|
||||
if ( blockField == null ) {
|
||||
reader.skipTo(getByteOffset(d, reader));
|
||||
} else {
|
||||
|
@ -97,7 +94,7 @@ public class DocsReader {
|
|||
|
||||
StandardProvenanceEventRecord record;
|
||||
while ( (record = reader.nextRecord()) != null) {
|
||||
IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
|
||||
final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
|
||||
if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
|
||||
break;
|
||||
}
|
||||
|
@ -111,7 +108,11 @@ public class DocsReader {
|
|||
}
|
||||
|
||||
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
|
||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
|
||||
if (retrievalCount.get() >= maxResults) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
LuceneUtil.sortDocsForRetrieval(docs);
|
||||
|
||||
RecordReader reader = null;
|
||||
|
@ -133,6 +134,10 @@ public class DocsReader {
|
|||
try {
|
||||
if (reader != null && storageFilename.equals(lastStorageFilename)) {
|
||||
matchingRecords.add(getRecord(d, reader));
|
||||
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
logger.debug("Opening log file {}", storageFilename);
|
||||
|
||||
|
@ -141,7 +146,7 @@ public class DocsReader {
|
|||
reader.close();
|
||||
}
|
||||
|
||||
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
|
||||
final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
|
||||
if (potentialFiles.isEmpty()) {
|
||||
logger.warn("Could not find Provenance Log File with basename {} in the "
|
||||
+ "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
|
||||
|
@ -158,6 +163,10 @@ public class DocsReader {
|
|||
try {
|
||||
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
|
||||
matchingRecords.add(getRecord(d, reader));
|
||||
|
||||
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||
break;
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
|
||||
}
|
||||
|
|
|
@ -119,6 +119,14 @@ public class IndexManager implements Closeable {
|
|||
}
|
||||
|
||||
writerCounts.put(absoluteFile, writerCount);
|
||||
|
||||
// Mark any active searchers as poisoned because we are updating the index
|
||||
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
|
||||
if ( searchers != null ) {
|
||||
for (final ActiveIndexSearcher activeSearcher : searchers) {
|
||||
activeSearcher.poison();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
|
||||
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
|
||||
|
@ -137,7 +145,7 @@ public class IndexManager implements Closeable {
|
|||
|
||||
lock.lock();
|
||||
try {
|
||||
IndexWriterCount count = writerCounts.remove(absoluteFile);
|
||||
final IndexWriterCount count = writerCounts.remove(absoluteFile);
|
||||
|
||||
try {
|
||||
if ( count == null ) {
|
||||
|
@ -184,6 +192,15 @@ public class IndexManager implements Closeable {
|
|||
try {
|
||||
for ( final ActiveIndexSearcher searcher : currentlyCached ) {
|
||||
if ( searcher.isCache() ) {
|
||||
// if the searcher is poisoned, we want to close and expire it.
|
||||
if ( searcher.isPoisoned() ) {
|
||||
logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
|
||||
expired.add(searcher);
|
||||
continue;
|
||||
}
|
||||
|
||||
// if there are no references to the reader, it will have been closed. Since there is no
|
||||
// isClosed() method, this is how we determine whether it's been closed or not.
|
||||
final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
|
||||
if ( refCount <= 0 ) {
|
||||
// if refCount == 0, then the reader has been closed, so we need to discard the searcher
|
||||
|
@ -212,7 +229,7 @@ public class IndexManager implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
|
||||
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
|
||||
if ( writerCount == null ) {
|
||||
final Directory directory = FSDirectory.open(absoluteFile);
|
||||
logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
|
||||
|
@ -270,21 +287,40 @@ public class IndexManager implements Closeable {
|
|||
lock.lock();
|
||||
try {
|
||||
// check if we already have a reader cached.
|
||||
List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
|
||||
final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
|
||||
if ( currentlyCached == null ) {
|
||||
logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
|
||||
+ "result in a resource leak", indexDirectory);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if the given searcher is in our list. We use an Iterator to do this so that if we
|
||||
// find it we can call remove() on the iterator if need be.
|
||||
final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
|
||||
while (itr.hasNext()) {
|
||||
final ActiveIndexSearcher activeSearcher = itr.next();
|
||||
if ( activeSearcher.getSearcher().equals(searcher) ) {
|
||||
if ( activeSearcher.isCache() ) {
|
||||
// the searcher is cached. Just leave it open.
|
||||
logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
|
||||
return;
|
||||
// if the searcher is poisoned, close it and remove from "pool".
|
||||
if ( activeSearcher.isPoisoned() ) {
|
||||
itr.remove();
|
||||
|
||||
try {
|
||||
logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
|
||||
activeSearcher.close();
|
||||
} catch (final IOException ioe) {
|
||||
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
|
||||
if ( logger.isDebugEnabled() ) {
|
||||
logger.warn("", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
} else {
|
||||
// the searcher is cached. Just leave it open.
|
||||
logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// searcher is not cached. It was created from a writer, and we want
|
||||
// the newest updates the next time that we get a searcher, so we will
|
||||
|
@ -405,9 +441,10 @@ public class IndexManager implements Closeable {
|
|||
private final DirectoryReader directoryReader;
|
||||
private final Directory directory;
|
||||
private final boolean cache;
|
||||
private boolean poisoned = false;
|
||||
|
||||
public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
|
||||
Directory directory, final boolean cache) {
|
||||
public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
|
||||
final Directory directory, final boolean cache) {
|
||||
this.searcher = searcher;
|
||||
this.directoryReader = directoryReader;
|
||||
this.directory = directory;
|
||||
|
@ -422,6 +459,14 @@ public class IndexManager implements Closeable {
|
|||
return searcher;
|
||||
}
|
||||
|
||||
public boolean isPoisoned() {
|
||||
return poisoned;
|
||||
}
|
||||
|
||||
public void poison() {
|
||||
this.poisoned = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IndexManager.close(directoryReader, directory);
|
||||
|
|
Loading…
Reference in New Issue