diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java index f4b47d3dce..4a38071cb8 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -154,7 +154,7 @@ public class LuceneEventIndex implements EventIndex { maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance")); maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES); - maintenanceExecutor.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30, 30, TimeUnit.SECONDS); + maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries, 30, 30, TimeUnit.SECONDS); cachedQueries.add(new LatestEventsQuery()); cachedQueries.add(new LatestEventsPerProcessorQuery()); @@ -633,8 +633,11 @@ public class LuceneEventIndex implements EventIndex { private void validate(final Query query) { final int numQueries = querySubmissionMap.size(); if (numQueries > MAX_UNDELETED_QUERY_RESULTS) { - throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not " - + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); + purgeObsoleteQueries(); + if (querySubmissionMap.size() > MAX_UNDELETED_QUERY_RESULTS) { + throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not " + + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); + } } if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { @@ -702,35 +705,32 @@ public class LuceneEventIndex implements EventIndex { return removed; } - private class RemoveExpiredQueryResults implements Runnable { - @Override - public void run() { - try { - final Date now = new Date(); + private void purgeObsoleteQueries() { + try { + final Date now = new Date(); - final Iterator> queryIterator = querySubmissionMap.entrySet().iterator(); - while (queryIterator.hasNext()) { - final Map.Entry entry = queryIterator.next(); + final Iterator> queryIterator = querySubmissionMap.entrySet().iterator(); + while (queryIterator.hasNext()) { + final Map.Entry entry = queryIterator.next(); - final StandardQueryResult result = entry.getValue().getResult(); - if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { - queryIterator.remove(); - } + final StandardQueryResult result = entry.getValue().getResult(); + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { + queryIterator.remove(); } - - final Iterator> lineageIterator = lineageSubmissionMap.entrySet().iterator(); - while (lineageIterator.hasNext()) { - final Map.Entry entry = lineageIterator.next(); - - final StandardLineageResult result = entry.getValue().getResult(); - if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { - lineageIterator.remove(); - } - } - } catch (final Exception e) { - logger.error("Failed to expire Provenance Query Results due to {}", e.toString()); - logger.error("", e); } + + final Iterator> lineageIterator = lineageSubmissionMap.entrySet().iterator(); + while (lineageIterator.hasNext()) { + final Map.Entry entry = lineageIterator.next(); + + final StandardLineageResult result = entry.getValue().getResult(); + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { + lineageIterator.remove(); + } + } + } catch (final Exception e) { + logger.error("Failed to expire Provenance Query Results due to {}", e.toString()); + logger.error("", e); } } }