NIFI-7339: Fixed bug that caused Write Ahead Provenance Repository not to rollover event files after specified time. Code cleanup. Updated some default properties.

This commit is contained in:
Mark Payne 2020-04-08 12:37:37 -04:00 committed by Bryan Bende
parent 1ec7e31f11
commit 728bdec0f4
4 changed files with 111 additions and 25 deletions

View File

@ -109,9 +109,9 @@
<nifi.provenance.repository.encryption.key.id />
<nifi.provenance.repository.encryption.key />
<nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default>
<nifi.provenance.repository.max.storage.time>24 hours</nifi.provenance.repository.max.storage.time>
<nifi.provenance.repository.max.storage.size>1 GB</nifi.provenance.repository.max.storage.size>
<nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.max.storage.time>30 days</nifi.provenance.repository.max.storage.time>
<nifi.provenance.repository.max.storage.size>10 GB</nifi.provenance.repository.max.storage.size>
<nifi.provenance.repository.rollover.time>10 mins</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
<nifi.provenance.repository.index.threads>2</nifi.provenance.repository.index.threads>

View File

@ -21,20 +21,30 @@ import org.apache.nifi.provenance.serialization.RecordWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class RecordWriterLease {
private final Logger logger = LoggerFactory.getLogger(RecordWriterLease.class);
private final RecordWriter writer;
private final long maxBytes;
private final int maxEvents;
private final long maxSystemTime;
private long usageCounter;
private boolean markedRollable = false;
private RolloverState rolloverState = RolloverState.SHOULD_NOT_ROLLOVER;
private boolean closed = false;
public RecordWriterLease(final RecordWriter writer, final long maxBytes, final int maxEvents) {
public RecordWriterLease(final RecordWriter writer, final long maxBytes, final int maxEvents, final long maxMillis) {
this.writer = writer;
this.maxBytes = maxBytes;
this.maxEvents = maxEvents;
// The max timestamp that we want to write to this lease is X number of milliseconds into the future.
// We don't want X to be more than the given max millis. However, we also don't want to allow it to get too large. If it
// becomes >= Integer.MAX_VALUE, we could have some timestamp offsets that rollover into the negative range.
// To avoid that, we could use a value that is no more than Integer.MAX_VALUE. But since the event may be persisted
// a bit after the lease has been obtained, we subtract 1 hour from that time to give ourselves a little buffer room.
this.maxSystemTime = System.currentTimeMillis() + Math.min(maxMillis, Integer.MAX_VALUE - TimeUnit.HOURS.toMillis(1));
}
public RecordWriter getWriter() {
@ -42,7 +52,14 @@ public class RecordWriterLease {
}
public synchronized boolean tryClaim() {
if (markedRollable || writer.isClosed() || writer.isDirty() || writer.getBytesWritten() >= maxBytes || writer.getRecordsWritten() >= maxEvents) {
if (rolloverState.isRollover()) {
return false;
}
// The previous state did not indicate that we should rollover. We need to check the current state also.
// It is important that we do not update the rolloverState here because we can do that only if the usageCounter indicates
// that the writer is no longer in use. This is handled in the getRolloverState() method.
if (determineRolloverReason().isRollover()) {
return false;
}
@ -62,17 +79,37 @@ public class RecordWriterLease {
}
}
public synchronized boolean shouldRoll() {
if (markedRollable) {
return true;
private synchronized RolloverState determineRolloverReason() {
if (writer.isClosed()) {
return RolloverState.WRITER_ALREADY_CLOSED;
}
if (writer.isDirty()) {
return RolloverState.WRITER_IS_DIRTY;
}
if (writer.getBytesWritten() >= maxBytes) {
return RolloverState.MAX_BYTES_REACHED;
}
if (writer.getRecordsWritten() >= maxEvents) {
return RolloverState.MAX_EVENTS_REACHED;
}
if (System.currentTimeMillis() >= maxSystemTime) {
return RolloverState.MAX_TIME_REACHED;
}
if (usageCounter < 1 && (writer.isClosed() || writer.isDirty() || writer.getBytesWritten() >= maxBytes || writer.getRecordsWritten() >= maxEvents)) {
markedRollable = true;
return true;
return RolloverState.SHOULD_NOT_ROLLOVER;
}
public synchronized RolloverState getRolloverState() {
if (rolloverState.isRollover()) {
return rolloverState;
}
return false;
if (usageCounter < 1) {
rolloverState = determineRolloverReason();
return rolloverState;
}
return RolloverState.SHOULD_NOT_ROLLOVER;
}
public synchronized void close() {

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.store;
public enum RolloverState {
WRITER_ALREADY_CLOSED,
WRITER_IS_DIRTY,
MAX_BYTES_REACHED,
MAX_EVENTS_REACHED,
MAX_TIME_REACHED,
SHOULD_NOT_ROLLOVER(false);
private final boolean rollover;
public boolean isRollover() {
return rollover;
}
RolloverState() {
this(true);
}
RolloverState(final boolean rollover) {
this.rollover = rollover;
}
}

View File

@ -175,18 +175,20 @@ public class WriteAheadStorePartition implements EventStorePartition {
}
// Claim a Record Writer Lease so that we have a writer to persist the events to
boolean claimed = false;
RecordWriterLease lease = null;
while (!claimed) {
while (true) {
lease = getLease();
claimed = lease.tryClaim();
if (claimed) {
if (lease.tryClaim()) {
break;
}
if (lease.shouldRoll()) {
tryRollover(lease);
final RolloverState rolloverState = lease.getRolloverState();
if (rolloverState.isRollover()) {
final boolean success = tryRollover(lease);
if (success) {
logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState);
}
}
}
@ -202,10 +204,11 @@ public class WriteAheadStorePartition implements EventStorePartition {
// Roll over the writer if necessary
Integer eventsRolledOver = null;
final boolean shouldRoll = lease.shouldRoll();
final RolloverState rolloverState = lease.getRolloverState();
try {
if (shouldRoll && tryRollover(lease)) {
if (rolloverState.isRollover() && tryRollover(lease)) {
eventsRolledOver = writer.getRecordsWritten();
logger.info("Successfully rolled over Event Writer for {} after writing {} events due to {}", this, eventsRolledOver, rolloverState);
}
} catch (final IOException ioe) {
logger.error("Updated {} but failed to rollover to a new Event File", this, ioe);
@ -258,7 +261,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
updatedWriter.writeHeader(nextEventId);
final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount(), config.getMaxEventFileLife(TimeUnit.MILLISECONDS));
final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
if (!updated) {
@ -319,7 +322,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
// Update max event id to be equal to be the greater of the current value or the
// max value just written.
final long maxIdWritten = maxId;
this.maxEventId.getAndUpdate(cur -> maxIdWritten > cur ? maxIdWritten : cur);
this.maxEventId.getAndUpdate(cur -> Math.max(maxIdWritten, cur));
if (config.isAlwaysSync()) {
writer.sync();
@ -542,7 +545,7 @@ public class WriteAheadStorePartition implements EventStorePartition {
final long eventsToReindex = maxEventId - minEventIdToReindex;
logger.info("The last Provenance Event indexed for partition {} is {}, but the last event written to partition has ID {}. "
+ "Re-indexing up to the last {} events to ensure that the Event Index is accurate and up-to-date",
+ "Re-indexing up to the last {} events for {} to ensure that the Event Index is accurate and up-to-date",
partitionName, minEventIdToReindex, maxEventId, eventsToReindex, partitionDirectory);
// Find the first event file that we care about.