NIFI-3905: This closes #1805. When a Provenance Query is submitted to WriteAheadProvenanceRepository, purge any obsolete queries from the internal state before rejecting the query due to 'too many outstanding queries'

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-05-16 10:06:51 -04:00 committed by joewitt
parent 58cf15a912
commit 4fdea680ec
1 changed files with 28 additions and 28 deletions

View File

@ -154,7 +154,7 @@ public class LuceneEventIndex implements EventIndex {
maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance")); maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES); maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES);
maintenanceExecutor.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30, 30, TimeUnit.SECONDS); maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries, 30, 30, TimeUnit.SECONDS);
cachedQueries.add(new LatestEventsQuery()); cachedQueries.add(new LatestEventsQuery());
cachedQueries.add(new LatestEventsPerProcessorQuery()); cachedQueries.add(new LatestEventsPerProcessorQuery());
@ -633,8 +633,11 @@ public class LuceneEventIndex implements EventIndex {
private void validate(final Query query) { private void validate(final Query query) {
final int numQueries = querySubmissionMap.size(); final int numQueries = querySubmissionMap.size();
if (numQueries > MAX_UNDELETED_QUERY_RESULTS) { if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not " purgeObsoleteQueries();
+ "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); if (querySubmissionMap.size() > MAX_UNDELETED_QUERY_RESULTS) {
throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
+ "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
}
} }
if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@ -702,35 +705,32 @@ public class LuceneEventIndex implements EventIndex {
return removed; return removed;
} }
private class RemoveExpiredQueryResults implements Runnable { private void purgeObsoleteQueries() {
@Override try {
public void run() { final Date now = new Date();
try {
final Date now = new Date();
final Iterator<Map.Entry<String, AsyncQuerySubmission>> queryIterator = querySubmissionMap.entrySet().iterator(); final Iterator<Map.Entry<String, AsyncQuerySubmission>> queryIterator = querySubmissionMap.entrySet().iterator();
while (queryIterator.hasNext()) { while (queryIterator.hasNext()) {
final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next(); final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next();
final StandardQueryResult result = entry.getValue().getResult(); final StandardQueryResult result = entry.getValue().getResult();
if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
queryIterator.remove(); queryIterator.remove();
}
} }
final Iterator<Map.Entry<String, AsyncLineageSubmission>> lineageIterator = lineageSubmissionMap.entrySet().iterator();
while (lineageIterator.hasNext()) {
final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next();
final StandardLineageResult result = entry.getValue().getResult();
if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
lineageIterator.remove();
}
}
} catch (final Exception e) {
logger.error("Failed to expire Provenance Query Results due to {}", e.toString());
logger.error("", e);
} }
final Iterator<Map.Entry<String, AsyncLineageSubmission>> lineageIterator = lineageSubmissionMap.entrySet().iterator();
while (lineageIterator.hasNext()) {
final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next();
final StandardLineageResult result = entry.getValue().getResult();
if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
lineageIterator.remove();
}
}
} catch (final Exception e) {
logger.error("Failed to expire Provenance Query Results due to {}", e.toString());
logger.error("", e);
} }
} }
} }