From 5fd4a55791da27fdba577636ac985a294618328a Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 14 Oct 2016 10:50:29 -0400 Subject: [PATCH] NIFI-2778 added abilty to interrupt Lucene search polishing This closes #1138 --- .../nifi/provenance/AsyncQuerySubmission.java | 12 ++++++++++++ .../PersistentProvenanceRepository.java | 2 +- .../provenance/lucene/SimpleIndexManager.java | 18 +++++++++++++++++- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java index cd2ab39ac8..66858b48a1 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java @@ -16,7 +16,10 @@ */ package org.apache.nifi.provenance; +import java.util.ArrayList; import java.util.Date; +import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.nifi.provenance.search.Query; @@ -33,6 +36,8 @@ public class AsyncQuerySubmission implements QuerySubmission { private final StandardQueryResult queryResult; private final String submitterId; + private final List> queryExecutions = new ArrayList<>(); + /** * Constructs an AsyncQuerySubmission with the given query and the given * number of steps, indicating how many results must be added to this @@ -65,6 +70,9 @@ public class AsyncQuerySubmission implements QuerySubmission { @Override public void cancel() { this.canceled = true; + for (Future queryExecution : this.queryExecutions) { + queryExecution.cancel(true); + } queryResult.cancel(); } @@ -82,4 +90,8 @@ public class AsyncQuerySubmission implements QuerySubmission { public StandardQueryResult getResult() { return queryResult; } + + public void addQueryExecution(Future execution) { + this.queryExecutions.add(execution); + } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 07887160fc..f70bf7dfb4 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -2021,7 +2021,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { result.getResult().update(Collections.emptyList(), 0L); } else { for (final File indexDir : indexDirectories) { - queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount)); + result.addQueryExecution(queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount))); } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index daf6413f21..9e3bacd1ac 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -26,6 +26,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; @@ -44,8 +47,21 @@ public class SimpleIndexManager implements IndexManager { private final ConcurrentMap> closeables = new ConcurrentHashMap<>(); private final Map writerCounts = new HashMap<>(); + private final ExecutorService searchExecutor = Executors.newCachedThreadPool(); + + @Override public void close() throws IOException { + logger.debug("Shutting down SimpleIndexManager search executor"); + this.searchExecutor.shutdown(); + try { + if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + this.searchExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + this.searchExecutor.shutdownNow(); + } } @Override @@ -53,7 +69,7 @@ public class SimpleIndexManager implements IndexManager { logger.debug("Creating index searcher for {}", indexDir); final Directory directory = FSDirectory.open(indexDir); final DirectoryReader directoryReader = DirectoryReader.open(directory); - final IndexSearcher searcher = new IndexSearcher(directoryReader); + final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor); final List closeableList = new ArrayList<>(2); closeableList.add(directoryReader);