NIFI-13563: Updated Provenance Repository so that instead of returning the single latest event for a component, we return the events from the latest invocation / session. Added system tests to verify the behavior. Also, when replaying latest event, attempt all of those events until one succeeds or all fail

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #9095
This commit is contained in:
Mark Payne 2024-07-19 14:47:18 -04:00 committed by Matt Burgess
parent f50fce5344
commit 6dd83b7034
24 changed files with 509 additions and 83 deletions

View File

@ -26,7 +26,6 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import static java.util.Collections.EMPTY_SET;
@ -104,8 +103,8 @@ public class NoOpProvenanceRepository implements ProvenanceRepository {
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
return List.of();
}
@Override

View File

@ -57,7 +57,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -258,8 +257,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return eventIndex.getLatestCachedEvent(componentId);
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) throws IOException {
return eventIndex.getLatestCachedEvents(componentId);
}
@Override

View File

@ -28,8 +28,8 @@ import org.apache.nifi.provenance.store.EventStore;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly
@ -83,13 +83,13 @@ public interface EventIndex extends Closeable {
QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, String userId);
/**
* Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user
* Retrieves the list of Provenance Events that are cached for the most recent invocation of the given component
* @param componentId the ID of the component
*
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
* @throws IOException if unable to read from the repository
*/
Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException;
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId) throws IOException;
/**
* Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID.

View File

@ -17,16 +17,17 @@
package org.apache.nifi.provenance.index.lucene;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.serialization.StorageSummary;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface CachedQuery {
void update(ProvenanceEventRecord event, StorageSummary storageSummary);
void update(Map<ProvenanceEventRecord, StorageSummary> events);
Optional<List<Long>> evaluate(Query query);

View File

@ -24,30 +24,46 @@ import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.util.RingBuffer;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class LatestEventsPerProcessorQuery implements CachedQuery {
private static final String COMPONENT_ID_FIELD_NAME = SearchableFields.ComponentID.getSearchableFieldName();
// Map of component ID to a RingBuffer holding up to the last 1000 events
private final ConcurrentMap<String, RingBuffer<Long>> latestRecords = new ConcurrentHashMap<>();
// Map of component ID to a List of the Event IDs for all events in the latest batch of events that have been indexed for the given component ID
private final ConcurrentMap<String, List<Long>> latestEventSet = new ConcurrentHashMap<>();
@Override
public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) {
public void update(final Map<ProvenanceEventRecord, StorageSummary> events) {
final Map<String, List<Long>> eventsByComponent = new HashMap<>();
for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
update(entry.getKey(), entry.getValue());
final String componentId = entry.getKey().getComponentId();
final List<Long> eventSet = eventsByComponent.computeIfAbsent(componentId, id -> new ArrayList<>());
eventSet.add(entry.getValue().getEventId());
}
latestEventSet.putAll(eventsByComponent);
}
private void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) {
final String componentId = event.getComponentId();
final RingBuffer<Long> ringBuffer = latestRecords.computeIfAbsent(componentId, id -> new RingBuffer<>(1000));
ringBuffer.add(storageSummary.getEventId());
}
public List<Long> getLatestEventIds(final String componentId) {
final RingBuffer<Long> ringBuffer = latestRecords.get(componentId);
if (ringBuffer == null) {
return Collections.emptyList();
}
return ringBuffer.asList();
final List<Long> eventIds = latestEventSet.get(componentId);
return eventIds == null ? List.of() : eventIds;
}
@Override

View File

@ -17,21 +17,24 @@
package org.apache.nifi.provenance.index.lucene;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.util.RingBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class LatestEventsQuery implements CachedQuery {
final RingBuffer<Long> latestRecords = new RingBuffer<>(1000);
@Override
public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) {
latestRecords.add(storageSummary.getEventId());
public void update(final Map<ProvenanceEventRecord, StorageSummary> events) {
for (final StorageSummary storageSummary : events.values()) {
latestRecords.add(storageSummary.getEventId());
}
}
@Override

View File

@ -368,15 +368,15 @@ public class LuceneEventIndex implements EventIndex {
File lastIndexDir = null;
long lastEventTime = -2L;
for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(events);
}
final List<IndexableDocument> indexableDocs = new ArrayList<>(events.size());
for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
final ProvenanceEventRecord event = entry.getKey();
final StorageSummary summary = entry.getValue();
for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(event, summary);
}
final Document document = eventConverter.convert(event, summary);
if (document == null) {
logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
@ -424,10 +424,6 @@ public class LuceneEventIndex implements EventIndex {
}
protected void addEvent(final ProvenanceEventRecord event, final StorageSummary location) {
for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(event, location);
}
final Document document = eventConverter.convert(event, location);
if (document == null) {
logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
@ -486,6 +482,10 @@ public class LuceneEventIndex implements EventIndex {
for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
addEvent(entry.getKey(), entry.getValue());
}
for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(events);
}
}
@ -643,22 +643,28 @@ public class LuceneEventIndex implements EventIndex {
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) throws IOException {
final List<Long> eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId);
if (eventIds.isEmpty()) {
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
return Optional.empty();
return List.of();
}
final Long latestEventId = eventIds.get(eventIds.size() - 1);
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(latestEventId);
if (latestEvent.isPresent()) {
logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvent.get(), componentId);
} else {
final List<ProvenanceEventRecord> latestEvents = new ArrayList<>(eventIds.size());
for (final Long eventId : eventIds) {
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(eventId);
if (latestEvent.isPresent()) {
latestEvents.add(latestEvent.get());
}
}
if (latestEvents.isEmpty()) {
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
} else {
logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvents, componentId);
}
return latestEvent;
return latestEvents;
}
@Override

View File

@ -50,7 +50,6 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -492,14 +491,15 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(event -> componentId.equals(event.getComponentId()));
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(
event -> componentId.equals(event.getComponentId()), 1);
if (matches.isEmpty()) {
return Optional.empty();
return List.of();
}
return Optional.of(matches.get(matches.size() - 1));
return List.of(matches.getLast());
}
@Override

View File

@ -26,7 +26,6 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public interface ProvenanceRepository extends ProvenanceEventRepository {
@ -95,12 +94,12 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
QuerySubmission submitQuery(Query query, NiFiUser user);
/**
* Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user
* Retrieves the Provenance Events that are cached for the most recent invocation of the given component.
* @param componentId the ID of the component
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
* @return the list of events that are cached for the given component
* @throws IOException if unable to read from the repository
*/
Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException;
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId) throws IOException;
/**
* @param queryIdentifier of the query

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.provenance;
import jakarta.xml.bind.annotation.XmlType;
import java.util.List;
@XmlType(name = "latestProvenanceEvents")
public class LatestProvenanceEventsDTO {
private String componentId;
private List<ProvenanceEventDTO> provenanceEvents;
/**
* @return the ID of the component whose latest events were fetched
*/
public String getComponentId() {
return componentId;
}
public void setComponentId(final String componentId) {
this.componentId = componentId;
}
/**
* @return the latest provenance events that were recorded for the associated component
*/
public List<ProvenanceEventDTO> getProvenanceEvents() {
return provenanceEvents;
}
public void setProvenanceEvents(final List<ProvenanceEventDTO> provenanceEvents) {
this.provenanceEvents = provenanceEvents;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import jakarta.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO;
@XmlRootElement(name = "latestProvenanceEventsEntity")
public class LatestProvenanceEventsEntity extends Entity {
private LatestProvenanceEventsDTO latestProvenanceEvents;
/**
* @return latest provenance events
*/
public LatestProvenanceEventsDTO getLatestProvenanceEvents() {
return latestProvenanceEvents;
}
public void setLatestProvenanceEvents(LatestProvenanceEventsDTO latestProvenanceEvents) {
this.latestProvenanceEvents = latestProvenanceEvents;
}
}

View File

@ -51,6 +51,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMe
import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.LatestProvenanceEventsMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.NarDetailsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.NarSummariesEndpointMerger;
@ -111,7 +112,7 @@ import java.util.stream.Collectors;
public class StandardHttpResponseMapper implements HttpResponseMapper {
private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class);
private final Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class);
private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
@ -145,6 +146,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
endpointMergers.add(new FlowSnippetEndpointMerger());
endpointMergers.add(new ProvenanceQueryEndpointMerger());
endpointMergers.add(new ProvenanceEventEndpointMerger());
endpointMergers.add(new LatestProvenanceEventsMerger());
endpointMergers.add(new ControllerServiceEndpointMerger());
endpointMergers.add(new ControllerServicesEndpointMerger());
endpointMergers.add(new ControllerServiceReferenceEndpointMerger());

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.endpoints;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
public class LatestProvenanceEventsMerger implements EndpointResponseMerger {
public static final Pattern LATEST_EVENTS_URI = Pattern.compile("/nifi-api/provenance-events/latest/[a-f0-9\\-]{36}");
@Override
public boolean canHandle(final URI uri, final String method) {
if ("GET".equalsIgnoreCase(method) && LATEST_EVENTS_URI.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
@Override
public NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
if (!canHandle(uri, method)) {
throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method);
}
final LatestProvenanceEventsEntity responseEntity = clientResponse.getClientResponse().readEntity(LatestProvenanceEventsEntity.class);
final LatestProvenanceEventsDTO dto = responseEntity.getLatestProvenanceEvents();
final List<ProvenanceEventDTO> mergedEvents = new ArrayList<>();
for (final NodeResponse nodeResponse : successfulResponses) {
final NodeIdentifier nodeId = nodeResponse.getNodeId();
final LatestProvenanceEventsEntity nodeResponseEntity = nodeResponse.getClientResponse().readEntity(LatestProvenanceEventsEntity.class);
final List<ProvenanceEventDTO> nodeEvents = nodeResponseEntity.getLatestProvenanceEvents().getProvenanceEvents();
// if the cluster node id or node address is not set, then we need to populate them. If they
// are already set, we don't want to populate them because it will be the case that they were populated
// by the Cluster Coordinator when it federated the request, and we are now just receiving the response
// from the Cluster Coordinator.
for (final ProvenanceEventDTO eventDto : nodeEvents) {
if (eventDto.getClusterNodeId() == null || eventDto.getClusterNodeAddress() == null) {
eventDto.setClusterNodeId(nodeId.getId());
eventDto.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
}
}
mergedEvents.addAll(nodeEvents);
}
dto.setProvenanceEvents(mergedEvents);
return new NodeResponse(clientResponse, responseEntity);
}
}

View File

@ -29,7 +29,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -92,8 +91,8 @@ public class MockProvenanceRepository implements ProvenanceRepository {
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
return List.of();
}
@Override

View File

@ -114,6 +114,7 @@ import org.apache.nifi.web.api.entity.FlowRegistryBucketEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.LabelEntity;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import org.apache.nifi.web.api.entity.NarDetailsEntity;
import org.apache.nifi.web.api.entity.NarSummaryEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
@ -322,6 +323,13 @@ public interface NiFiServiceFacade {
*/
ProvenanceEventDTO getProvenanceEvent(Long id);
/**
* Gets the latest provenance events for the specified component.
* @param componentId the ID of the components to retrieve the latest events for
* @return the latest provenance events
*/
LatestProvenanceEventsEntity getLatestProvenanceEvents(String componentId);
/**
* Gets the configuration for this controller.
*

View File

@ -280,6 +280,7 @@ import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
@ -325,6 +326,7 @@ import org.apache.nifi.web.api.entity.FlowRegistryBucketEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.LabelEntity;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import org.apache.nifi.web.api.entity.NarDetailsEntity;
import org.apache.nifi.web.api.entity.NarSummaryEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
@ -3656,6 +3658,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return controllerFacade.getProvenanceEvent(id);
}
@Override
public LatestProvenanceEventsEntity getLatestProvenanceEvents(final String componentId) {
final LatestProvenanceEventsDTO dto = controllerFacade.getLatestProvenanceEvents(componentId);
final LatestProvenanceEventsEntity entity = new LatestProvenanceEventsEntity();
entity.setLatestProvenanceEvents(dto);
return entity;
}
@Override
public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);

View File

@ -16,12 +16,6 @@
*/
package org.apache.nifi.web.api;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
@ -57,6 +51,7 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
@ -67,6 +62,12 @@ import org.apache.nifi.web.util.ResponseBuilderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;
/**
* RESTful endpoint for querying data provenance.
*/
@ -496,6 +497,45 @@ public class ProvenanceEventResource extends ApplicationResource {
return generateCreatedResponse(uri, entity).build();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("latest/{componentId}")
@Operation(
summary = "Retrieves the latest cached Provenance Events for the specified component",
responses = @ApiResponse(content = @Content(schema = @Schema(implementation = LatestProvenanceEventsEntity.class))),
security = {
@SecurityRequirement(name = "Read Component Provenance Data - /provenance-data/{component-type}/{uuid}"),
@SecurityRequirement(name = "Read Component Data - /data/{component-type}/{uuid}")
}
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
}
)
public Response getLatestProvenanceEvents(
@Parameter(
description = "The ID of the component to retrieve the latest Provenance Events for.",
required = true
)
@PathParam("componentId") final String componentId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get the latest provenance events
final LatestProvenanceEventsEntity entity = serviceFacade.getLatestProvenanceEvents(componentId);
// generate the response
return generateOkResponse(entity).build();
}
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.web.api;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
@ -55,6 +51,10 @@ import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ProvenanceOptionsEntity;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* RESTful endpoint for querying data provenance.

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.web.controller;
import jakarta.ws.rs.WebApplicationException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@ -66,8 +67,8 @@ import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@ -107,6 +108,7 @@ import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.provenance.AttributeDTO;
import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
@ -126,7 +128,6 @@ import org.apache.nifi.web.search.query.SearchQueryParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jakarta.ws.rs.WebApplicationException;
import java.io.IOException;
import java.io.InputStream;
import java.text.Collator;
@ -137,10 +138,10 @@ import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimeZone;
@ -1414,20 +1415,37 @@ public class ControllerFacade implements Authorizable {
}
// lookup the original event
final Optional<ProvenanceEventRecord> optionalEvent = flowController.getProvenanceRepository().getLatestCachedEvent(componentId);
if (!optionalEvent.isPresent()) {
final List<ProvenanceEventRecord> latestEvents = flowController.getProvenanceRepository().getLatestCachedEvents(componentId);
if (latestEvents.isEmpty()) {
return null;
}
// Authorize the replay
final ProvenanceEventRecord event = optionalEvent.get();
authorizeReplay(event);
final Iterator<ProvenanceEventRecord> itr = latestEvents.iterator();
while (itr.hasNext()) {
final ProvenanceEventRecord event = itr.next();
// Replay the FlowFile
flowController.replayFlowFile(event, user);
try {
// Authorize the replay
authorizeReplay(event);
// convert the event record
return createProvenanceEventDto(event, false);
// Replay the FlowFile
flowController.replayFlowFile(event, user);
// convert the event record
return createProvenanceEventDto(event, false);
} catch (final IOException e) {
throw e;
} catch (final Exception e) {
if (itr.hasNext()) {
logger.debug("Failed to replay Provenance Event {} but will continue to try remaining events", event, e);
} else {
throw e;
}
}
}
// Won't happen, because we will have either thrown an Exception or returned the result of createProvenanceEventDto, but necessary for compiler
return null;
} catch (final IOException ioe) {
throw new NiFiCoreException("An error occurred while getting the specified event.", ioe);
}
@ -1519,6 +1537,30 @@ public class ControllerFacade implements Authorizable {
}
}
public LatestProvenanceEventsDTO getLatestProvenanceEvents(final String componentId) {
final Authorizable authorizable = flowController.getProvenanceAuthorizableFactory().createProvenanceDataAuthorizable(componentId);
final Authorizer authorizer = flowController.getAuthorizer();
if (!authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) {
throw new AccessDeniedException("User does not have permission to view the latest events for the specified component.");
}
try {
final List<ProvenanceEventRecord> events = flowController.getProvenanceRepository().getLatestCachedEvents(componentId);
final List<ProvenanceEventDTO> eventDtos = new ArrayList<>();
for (final ProvenanceEventRecord event : events) {
eventDtos.add(createProvenanceEventDto(event, false));
}
final LatestProvenanceEventsDTO dto = new LatestProvenanceEventsDTO();
dto.setComponentId(componentId);
dto.setProvenanceEvents(eventDtos);
return dto;
} catch (final IOException ioe) {
throw new NiFiCoreException("An error occurred while getting the latest events for the specified component.", ioe);
}
}
/**
* Creates a ProvenanceEventDTO for the specified ProvenanceEventRecord. This should only be invoked once the
* current user has been authorized for access to this provenance event.

View File

@ -38,7 +38,6 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -154,8 +153,8 @@ public class StatelessProvenanceRepository implements ProvenanceRepository {
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
return List.of();
}
@Override

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.provenance;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
public class ClusteredGetLatestProvenanceEventsIT extends GetLatestProvenanceEventsIT {
@Override
public NiFiInstanceFactory getInstanceFactory() {
return createTwoNodeInstanceFactory();
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.provenance;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class GetLatestProvenanceEventsIT extends NiFiSystemIT {
@Test
public void testSingleEvent() throws NiFiClientException, IOException, InterruptedException {
runTest(false);
}
@Test
public void testMultipleEvents() throws NiFiClientException, IOException, InterruptedException {
runTest(true);
}
private void runTest(final boolean autoTerminateReverse) throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity reverse = getClientUtil().createProcessor("ReverseContents");
if (autoTerminateReverse) {
getClientUtil().setAutoTerminatedRelationships(reverse, Set.of("success", "failure"));
} else {
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
getClientUtil().createConnection(reverse, terminate, "success");
getClientUtil().setAutoTerminatedRelationships(reverse, "failure");
}
getClientUtil().createConnection(generate, reverse, "success");
getClientUtil().updateProcessorProperties(generate, Map.of("Text", "Hello, World!"));
getClientUtil().startProcessor(generate);
getClientUtil().startProcessor(reverse);
final int expectedEventCount = getNumberOfNodes() * (autoTerminateReverse ? 2 : 1);
waitFor(() -> {
final LatestProvenanceEventsEntity entity = getNifiClient().getProvenanceClient().getLatestEvents(reverse.getId());
final List<ProvenanceEventDTO> events = entity.getLatestProvenanceEvents().getProvenanceEvents();
return events.size() == expectedEventCount;
});
final LatestProvenanceEventsEntity entity = getNifiClient().getProvenanceClient().getLatestEvents(reverse.getId());
final List<ProvenanceEventDTO> events = entity.getLatestProvenanceEvents().getProvenanceEvents();
final Map<String, Integer> countsByEventType = new HashMap<>();
for (final ProvenanceEventDTO event : events) {
assertEquals(reverse.getId(), event.getComponentId());
final String eventType = event.getEventType();
countsByEventType.put(eventType, countsByEventType.getOrDefault(eventType, 0) + 1);
if (getNumberOfNodes() > 1) {
assertNotNull(event.getClusterNodeId());
}
}
if (autoTerminateReverse) {
assertEquals(getNumberOfNodes(), countsByEventType.get("DROP").intValue());
}
assertEquals(getNumberOfNodes(), countsByEventType.get("CONTENT_MODIFIED").intValue());
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
@ -37,6 +38,8 @@ public interface ProvenanceClient {
ReplayLastEventResponseEntity replayLastEvent(String processorId, ReplayEventNodes replayEventNodes) throws NiFiClientException, IOException;
LatestProvenanceEventsEntity getLatestEvents(String processorId) throws NiFiClientException, IOException;
enum ReplayEventNodes {
PRIMARY,
ALL;

View File

@ -16,18 +16,19 @@
*/
package org.apache.nifi.toolkit.cli.impl.client.nifi.impl;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Objects;
@ -138,4 +139,16 @@ public class JerseyProvenanceClient extends AbstractJerseyClient implements Prov
ReplayLastEventResponseEntity.class);
});
}
@Override
public LatestProvenanceEventsEntity getLatestEvents(final String processorId) throws NiFiClientException, IOException {
if (StringUtils.isBlank(processorId)) {
throw new IllegalArgumentException("Processor ID must be specified");
}
return executeAction("Error getting latest events for Processor " + processorId, () -> {
final WebTarget target = provenanceEventsTarget.path("/latest/").path(processorId);
return getRequestBuilder(target).get(LatestProvenanceEventsEntity.class);
});
}
}