diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 0d7886db26..bdeb2187bf 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -45,6 +45,7 @@ import org.apache.nifi.provenance.lucene.IndexSearch; import org.apache.nifi.provenance.lucene.IndexingAction; import org.apache.nifi.provenance.lucene.LineageQuery; import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.lucene.UpdateMinimumEventId; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QueryResult; import org.apache.nifi.provenance.search.QuerySubmission; @@ -262,7 +263,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS); - expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager)); + expirationActions.add(new UpdateMinimumEventId(indexConfig)); expirationActions.add(new FileRemovalAction()); scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS); @@ -1041,13 +1042,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try (final RecordReader reader = RecordReaders.newRecordReader(firstLogFile, null, Integer.MAX_VALUE)) { final StandardProvenanceEventRecord event = reader.nextRecord(); earliestEventTime = event.getEventTime(); - - try { - maxEventId = reader.getMaxEventId(); - } catch (final IOException ioe) { - logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of " - + "events in the Provenance Repository may be inaccurate.", firstLogFile); - } + maxEventId = reader.getMaxEventId(); + } catch (final IOException ioe) { + logger.warn("Unable to determine the maximum ID for Provenance Event Log File {}; values reported for the number of " + + "events in the Provenance Repository may be inaccurate.", firstLogFile); } // check if we can delete the index safely. diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/UpdateMinimumEventId.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/UpdateMinimumEventId.java new file mode 100644 index 0000000000..042a1fef40 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/UpdateMinimumEventId.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.lucene; + +import java.io.File; +import java.io.IOException; + +import org.apache.nifi.provenance.IndexConfiguration; +import org.apache.nifi.provenance.expiration.ExpirationAction; +import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.serialization.RecordReaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateMinimumEventId implements ExpirationAction { + private static final Logger logger = LoggerFactory.getLogger(UpdateMinimumEventId.class); + + private final IndexConfiguration indexConfig; + + public UpdateMinimumEventId(final IndexConfiguration indexConfig) { + this.indexConfig = indexConfig; + } + + @Override + public File execute(final File expiredFile) throws IOException { + try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, null, Integer.MAX_VALUE)) { + final long maxEventId = reader.getMaxEventId(); + indexConfig.setMinIdIndexed(maxEventId); + + logger.info("Updated Minimum Event ID for Provenance Event Repository - Minimum Event ID now {}", maxEventId); + } catch (final IOException ioe) { + logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath()); + } + + return expiredFile; + } + + @Override + public boolean hasBeenPerformed(final File expiredFile) throws IOException { + return !expiredFile.exists(); + } +}