NIFI-2778 added abilty to interrupt Lucene search

polishing

This closes #1138
This commit is contained in:
Oleg Zhurakousky 2016-10-14 10:50:29 -04:00 committed by Matt Burgess
parent 6aefc0b910
commit 5fd4a55791
3 changed files with 30 additions and 2 deletions

View File

@ -16,7 +16,10 @@
*/
package org.apache.nifi.provenance;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.search.Query;
@ -33,6 +36,8 @@ public class AsyncQuerySubmission implements QuerySubmission {
private final StandardQueryResult queryResult;
private final String submitterId;
private final List<Future<?>> queryExecutions = new ArrayList<>();
/**
* Constructs an AsyncQuerySubmission with the given query and the given
* number of steps, indicating how many results must be added to this
@ -65,6 +70,9 @@ public class AsyncQuerySubmission implements QuerySubmission {
@Override
public void cancel() {
this.canceled = true;
for (Future<?> queryExecution : this.queryExecutions) {
queryExecution.cancel(true);
}
queryResult.cancel();
}
@ -82,4 +90,8 @@ public class AsyncQuerySubmission implements QuerySubmission {
public StandardQueryResult getResult() {
return queryResult;
}
public void addQueryExecution(Future<?> execution) {
this.queryExecutions.add(execution);
}
}

View File

@ -2021,7 +2021,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
} else {
for (final File indexDir : indexDirectories) {
queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount));
result.addQueryExecution(queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount)));
}
}

View File

@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
@ -44,8 +47,21 @@ public class SimpleIndexManager implements IndexManager {
private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
private final ExecutorService searchExecutor = Executors.newCachedThreadPool();
@Override
public void close() throws IOException {
logger.debug("Shutting down SimpleIndexManager search executor");
this.searchExecutor.shutdown();
try {
if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
this.searchExecutor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
this.searchExecutor.shutdownNow();
}
}
@Override
@ -53,7 +69,7 @@ public class SimpleIndexManager implements IndexManager {
logger.debug("Creating index searcher for {}", indexDir);
final Directory directory = FSDirectory.open(indexDir);
final DirectoryReader directoryReader = DirectoryReader.open(directory);
final IndexSearcher searcher = new IndexSearcher(directoryReader);
final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor);
final List<Closeable> closeableList = new ArrayList<>(2);
closeableList.add(directoryReader);