NIFI-10429: Added the ability to Replay latest provenance event for a given Processor.

This closes #6359

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-08-31 12:05:45 -04:00 committed by exceptionfactory
parent 96c5bc80c5
commit a0c705715b
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
31 changed files with 901 additions and 48 deletions

View File

@ -26,6 +26,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import static java.util.Collections.EMPTY_SET;
@ -102,6 +103,11 @@ public class NoOpProvenanceRepository implements ProvenanceRepository {
return null;
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
}
@Override
public QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser niFiUser) {
return null;

View File

@ -304,6 +304,7 @@ NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it i
- *Run Once*: This option allows the user to run a selected Processor exactly once. If the Processor is prevented from executing (e.g., there are no incoming FlowFiles or the outgoing connection has back pressure applied) the Processor won't get triggered. *Execution* settings apply (i.e., *Primary Node* and *All Nodes* settings will result in running the Processor only once on the Primary Node or one time on each of the nodes, respectively). Works only with *Timer driven* and *CRON driven* scheduling strategies.
- *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor.
- *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <<data_provenance>>).
- *Replay last event*: This option will replay the last Provenance event, effectively requeuing the last FlowFile that was processed by the Processor (see <<replay_flowfile>>).
- *View status history*: This option opens a graphical representation of the Processor's statistical information over time.
- *View usage*: This option takes the user to the Processor's usage documentation.
- *View connections->Upstream*: This option allows the user to see and "jump to" upstream connections that are coming into the Processor. This is particularly useful when processors connect into and out of other Process Groups.
@ -2834,14 +2835,22 @@ a result of the processing event, the user may select the checkbox next to "Only
image:event-attributes.png["Event Attributes", width=700]
[[replay_flowfile]]
=== Replaying a FlowFile
A DFM may need to inspect a FlowFile's content at some point in the dataflow to ensure that it is being processed as expected. And if it
is not being processed properly, the DFM may need to make adjustments to the dataflow and replay the FlowFile again. The Content tab of the View Details dialog window is where the DFM can do these things. The Content tab shows information about the FlowFile's content, such as its location in the Content Repository
is not being processed properly, the DFM may need to make adjustments to the dataflow and replay the FlowFile again.
This can be achieved from the Content tab of the View Details dialog window. The Content tab shows information
about the FlowFile's content, such as its location in the Content Repository
and its size. In addition, it is here that the user may click the "Download" button to download a copy of the FlowFile's content as it existed
at this point in the flow. The user may also click the "Submit" button to replay the FlowFile at this point in the flow. Upon clicking "Submit",
the FlowFile is sent to the connection feeding the component that produced this processing event.
When a user is developing a dataflow, it can be very beneficial to have easy access to replaying a FlowFile, as well. For example, a user may configure
a Processor, run a FlowFile through it, and find that the configuration needs to be modified. The user can then update the configuration, and run the
same FlowFile through again to verify the results. In order to ease this process, the user can right-click on a Processor and choose the "Replay last event"
item. From here, the user can choose to either replay the last event from just the Primary Node or from all nodes.
image:event-content.png["Event Content", width=700]
=== Viewing FlowFile Lineage

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.provenance;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
@ -27,6 +26,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public interface ProvenanceRepository extends ProvenanceEventRepository {
@ -94,6 +94,14 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
*/
QuerySubmission submitQuery(Query query, NiFiUser user);
/**
* Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user
* @param componentId the ID of the component
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
* @throws IOException if unable to read from the repository
*/
Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException;
/**
* @param queryIdentifier of the query
* @param user The user who is retrieving the query.

View File

@ -16,13 +16,6 @@
*/
package org.apache.nifi.provenance;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
@ -31,6 +24,15 @@ import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class MockProvenanceRepository implements ProvenanceRepository {
private final List<ProvenanceEventRecord> records = new ArrayList<>();
@ -90,6 +92,11 @@ public class MockProvenanceRepository implements ProvenanceRepository {
throw new UnsupportedOperationException("MockProvenanceRepository does not support querying");
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
}
@Override
public QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser user) {
throw new UnsupportedOperationException("MockProvenanceRepository does not support querying");

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "nodeReplayLastEventSnapshot")
public class NodeReplayLastEventSnapshotDTO {
private String nodeId;
private String address;
private Integer apiPort;
private ReplayLastEventSnapshotDTO snapshot;
@ApiModelProperty("The unique ID that identifies the node")
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
@ApiModelProperty("The API address of the node")
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@ApiModelProperty("The API port used to communicate with the node")
public Integer getApiPort() {
return apiPort;
}
public void setApiPort(Integer apiPort) {
this.apiPort = apiPort;
}
@ApiModelProperty("The snapshot from the node")
public ReplayLastEventSnapshotDTO getSnapshot() {
return snapshot;
}
public void setSnapshot(final ReplayLastEventSnapshotDTO snapshot) {
this.snapshot = snapshot;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "replayLastEventRequestEntity")
public class ReplayLastEventRequestEntity extends Entity {
private String componentId;
private String nodes;
@ApiModelProperty("The UUID of the component whose last event should be replayed.")
public String getComponentId() {
return componentId;
}
public void setComponentId(final String componentId) {
this.componentId = componentId;
}
@ApiModelProperty(
value = "Which nodes are to replay their last provenance event.",
allowableValues = "ALL, PRIMARY"
)
public String getNodes() {
return nodes;
}
public void setNodes(String nodes) {
this.nodes = nodes;
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.List;
@XmlRootElement(name = "replayLastEventResponseEntity")
public class ReplayLastEventResponseEntity extends Entity {
private String componentId;
private String nodes;
private ReplayLastEventSnapshotDTO aggregateSnapshot;
private List<NodeReplayLastEventSnapshotDTO> nodeSnapshots;
@ApiModelProperty("The UUID of the component whose last event should be replayed.")
public String getComponentId() {
return componentId;
}
public void setComponentId(final String componentId) {
this.componentId = componentId;
}
@ApiModelProperty(
value = "Which nodes were requested to replay their last provenance event.",
allowableValues = "ALL, PRIMARY"
)
public String getNodes() {
return nodes;
}
public void setNodes(String nodes) {
this.nodes = nodes;
}
@ApiModelProperty("The aggregate result of all nodes' responses")
public ReplayLastEventSnapshotDTO getAggregateSnapshot() {
return aggregateSnapshot;
}
public void setAggregateSnapshot(final ReplayLastEventSnapshotDTO aggregateSnapshot) {
this.aggregateSnapshot = aggregateSnapshot;
}
@ApiModelProperty("The node-wise results")
public List<NodeReplayLastEventSnapshotDTO> getNodeSnapshots() {
return nodeSnapshots;
}
public void setNodeSnapshots(final List<NodeReplayLastEventSnapshotDTO> nodeSnapshots) {
this.nodeSnapshots = nodeSnapshots;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Collection;
@XmlRootElement(name = "replayLastEventSnapshot")
public class ReplayLastEventSnapshotDTO {
private Collection<Long> eventsReplayed;
private String failureExplanation;
private Boolean eventAvailable;
@ApiModelProperty("The IDs of the events that were successfully replayed")
public Collection<Long> getEventsReplayed() {
return eventsReplayed;
}
public void setEventsReplayed(final Collection<Long> eventsReplayed) {
this.eventsReplayed = eventsReplayed;
}
@ApiModelProperty("If unable to replay an event, specifies why the event could not be replayed")
public String getFailureExplanation() {
return failureExplanation;
}
public void setFailureExplanation(final String failureExplanation) {
this.failureExplanation = failureExplanation;
}
@ApiModelProperty("Whether or not an event was available. This may not be populated if there was a failure.")
public Boolean getEventAvailable() {
return eventAvailable;
}
public void setEventAvailable(final Boolean eventAvailable) {
this.eventAvailable = eventAvailable;
}
}

View File

@ -65,6 +65,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpoi
import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ReplayLastEventEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskTypesEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
@ -173,6 +174,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
endpointMergers.add(new ParameterContextUpdateEndpointMerger());
endpointMergers.add(new VerifyConfigEndpointMerger());
endpointMergers.add(new RuntimeManifestEndpointMerger());
endpointMergers.add(new ReplayLastEventEndpointMerger());
}
@Override

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.endpoints;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.entity.NodeReplayLastEventSnapshotDTO;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class ReplayLastEventEndpointMerger extends AbstractSingleEntityEndpoint<ReplayLastEventResponseEntity> implements EndpointResponseMerger {
public static final String REPLAY_URI = "/nifi-api/provenance-events/latest/replays";
@Override
public boolean canHandle(final URI uri, final String method) {
return "POST".equals(method) && REPLAY_URI.equals(uri.getPath());
}
@Override
protected Class<ReplayLastEventResponseEntity> getEntityClass() {
return ReplayLastEventResponseEntity.class;
}
@Override
protected void mergeResponses(final ReplayLastEventResponseEntity clientEntity, final Map<NodeIdentifier, ReplayLastEventResponseEntity> entityMap, final Set<NodeResponse> successfulResponses,
final Set<NodeResponse> problematicResponses) {
// Move all aggregate snapshots into the node snapshots.
final Set<Long> replayedEventIds = new HashSet<>();
final Set<String> failureExplanations = new HashSet<>();
boolean eventAvailable = false;
for (final Map.Entry<NodeIdentifier, ReplayLastEventResponseEntity> entry : entityMap.entrySet()) {
final NodeIdentifier nodeId = entry.getKey();
final ReplayLastEventResponseEntity nodeEntity = entry.getValue();
final ReplayLastEventSnapshotDTO nodeSnapshot = nodeEntity.getAggregateSnapshot();
final NodeReplayLastEventSnapshotDTO nodeResponseDto = new NodeReplayLastEventSnapshotDTO();
nodeResponseDto.setAddress(nodeId.getApiAddress());
nodeResponseDto.setApiPort(nodeId.getApiPort());
nodeResponseDto.setNodeId(nodeId.getId());
nodeResponseDto.setSnapshot(nodeSnapshot);
if (clientEntity.getNodeSnapshots() == null) {
clientEntity.setNodeSnapshots(new ArrayList<>());
}
clientEntity.getNodeSnapshots().add(nodeResponseDto);
final Collection<Long> eventsReplayed = nodeSnapshot.getEventsReplayed();
if (eventsReplayed != null) {
replayedEventIds.addAll(eventsReplayed);
}
final String failureExplanation = nodeSnapshot.getFailureExplanation();
if (failureExplanation != null) {
failureExplanations.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " - " + failureExplanation);
}
eventAvailable = eventAvailable || nodeSnapshot.getEventAvailable() == Boolean.TRUE;
}
// Update the aggregate snapshot
clientEntity.getAggregateSnapshot().setEventsReplayed(replayedEventIds);
clientEntity.getAggregateSnapshot().setEventAvailable(eventAvailable);
if (failureExplanations.isEmpty()) {
return;
}
if (failureExplanations.size() == 1) {
clientEntity.getAggregateSnapshot().setFailureExplanation("One node failed to replay the latest event: " + failureExplanations.iterator().next());
} else {
clientEntity.getAggregateSnapshot().setFailureExplanation(failureExplanations.size() + " nodes failed to replay the latest events. See logs for more details.");
}
}
}

View File

@ -2781,16 +2781,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
}
final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null;
for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
queue = connection.getFlowFileQueue();
break;
}
}
if (queue == null) {
final Connection connection = flowManager.getConnection(event.getSourceQueueIdentifier());
if (connection == null) {
return "Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists";
}
@ -2818,10 +2810,28 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
// Make sure event has the Content Claim info
final Long contentSize = event.getPreviousFileSize();
final String contentClaimId = event.getPreviousContentClaimIdentifier();
final String contentClaimSection = event.getPreviousContentClaimSection();
final String contentClaimContainer = event.getPreviousContentClaimContainer();
boolean usePrevious = true;
Long contentSize = event.getPreviousFileSize();
String contentClaimId = event.getPreviousContentClaimIdentifier();
String contentClaimSection = event.getPreviousContentClaimSection();
String contentClaimContainer = event.getPreviousContentClaimContainer();
Long contentClaimOffset = event.getPreviousContentClaimOffset();
final int previousClaimNulls = countNulls(contentSize, contentClaimId, contentClaimSection, contentClaimContainer);
if (previousClaimNulls == 4) {
contentClaimId = event.getContentClaimIdentifier();
contentClaimSection = event.getContentClaimSection();
contentClaimContainer = event.getContentClaimContainer();
final int currentClaimNullCounts = countNulls(contentClaimId, contentClaimSection, contentClaimContainer);
// If the current claim is also all null, we will stick with using the previous. Otherwise, we'll denote that we're using the current claim
usePrevious = currentClaimNullCounts == 3;
if (!usePrevious) {
contentSize = event.getFileSize();
contentClaimOffset = event.getContentClaimOffset();
}
}
// All content fields must be null or no content fields can be null.
final int nullCount = countNulls(contentSize, contentClaimId, contentClaimSection, contentClaimContainer);
@ -2834,16 +2844,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
}
final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null;
for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
queue = connection.getFlowFileQueue();
break;
}
}
if (queue == null) {
final Connection connection = flowManager.getConnection(event.getSourceQueueIdentifier());
if (connection == null) {
throw new IllegalStateException("Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists");
}
@ -2859,18 +2861,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// being written to by the Content Repository. This is important only because we are creating a FlowFile with this Resource
// Claim. If, for instance, we are simply creating the claim to request its content, as in #getContentAvailability, etc.
// then this is not necessary.
ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier());
ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId);
if (resourceClaim == null) {
resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer,
contentClaimSection, contentClaimId, false, false);
}
// Increment Claimant Count, since we will now be referencing the Content Claim
resourceClaimManager.incrementClaimantCount(resourceClaim);
final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset();
final long claimOffset = contentClaimOffset == null ? 0L : contentClaimOffset;
contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
contentClaim.setLength(contentSize == null ? -1L : contentSize);
if (!contentRepository.isAccessible(contentClaim)) {
resourceClaimManager.decrementClaimantCount(resourceClaim);
@ -2893,7 +2894,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// FlowFileRecord's contentClaimOffset to 0.
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
// Copy relevant info from source FlowFile
.addAttributes(event.getPreviousAttributes())
.addAttributes(usePrevious ? event.getPreviousAttributes() : event.getAttributes())
.contentClaim(contentClaim)
.contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
.entryDate(System.currentTimeMillis())
@ -2923,10 +2924,12 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
.setLineageStartDate(event.getLineageStartDate())
.setComponentType(event.getComponentType())
.setComponentId(event.getComponentId())
.setSourceQueueIdentifier(event.getSourceQueueIdentifier())
.build();
provenanceRepository.registerEvent(replayEvent);
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
final FlowFileQueue queue = connection.getFlowFileQueue();
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue);
record.setWorking(flowFileRecord, false);
record.setDestination(queue);

View File

@ -224,6 +224,13 @@ public interface NiFiServiceFacade {
*/
ProvenanceEventDTO submitReplay(Long eventId);
/**
* Submits a replay request for the latest event generated by the component with the given ID
* @param componentId the ID the component
* @return the event, or <code>null</code> if no event was available
*/
ProvenanceEventDTO submitReplayLastEvent(String componentId);
/**
* Gets the content for the specified claim.
*

View File

@ -3336,6 +3336,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return controllerFacade.submitReplay(eventId);
}
@Override
public ProvenanceEventDTO submitReplayLastEvent(final String componentId) {
return controllerFacade.submitReplayLastEvent(componentId);
}
// -----------------------------------------
// Read Operations
// -----------------------------------------

View File

@ -916,6 +916,15 @@ public abstract class ApplicationResource {
throw new NoClusterCoordinatorException();
}
protected Optional<NodeIdentifier> getPrimaryNodeId() {
final ClusterCoordinator coordinator = getClusterCoordinator();
if (coordinator == null) {
throw new NoClusterCoordinatorException();
}
return Optional.ofNullable(coordinator.getPrimaryNode());
}
protected ReplicationTarget getReplicationTarget() {
return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
}

View File

@ -22,6 +22,11 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -31,8 +36,13 @@ import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO;
import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity;
import org.apache.nifi.web.api.request.LongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@ -52,7 +62,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;
/**
* RESTful endpoint for querying data provenance.
@ -63,8 +73,10 @@ import java.net.URI;
description = "Endpoint for accessing data flow provenance."
)
public class ProvenanceEventResource extends ApplicationResource {
private static final Logger logger = LoggerFactory.getLogger(ProvenanceEventResource.class);
private NiFiServiceFacade serviceFacade;
private Authorizer authorizer;
/**
* Gets the content for the input of the specified event.
@ -310,6 +322,105 @@ public class ProvenanceEventResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
/**
* Triggers the latest Provenance Event for the specified component to be replayed.
*
* @param httpServletRequest request
* @param requestEntity The replay request
* @return A replayLastEventResponseEntity
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("latest/replays")
@ApiOperation(
value = "Replays content from a provenance event",
response = ReplayLastEventResponseEntity.class,
authorizations = {
@Authorization(value = "Read Component Provenance Data - /provenance-data/{component-type}/{uuid}"),
@Authorization(value = "Read Component Data - /data/{component-type}/{uuid}"),
@Authorization(value = "Write Component Data - /data/{component-type}/{uuid}")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response submitReplayLatestEvent(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The replay request.",
required = true
) final ReplayLastEventRequestEntity requestEntity) {
// ensure the event id is specified
if (requestEntity == null || requestEntity.getComponentId() == null) {
throw new IllegalArgumentException("The id of the component must be specified.");
}
final String requestedNodes = requestEntity.getNodes();
if (requestedNodes == null) {
throw new IllegalArgumentException("The nodes must be specified.");
}
if (!"ALL".equalsIgnoreCase(requestedNodes) && !"PRIMARY".equalsIgnoreCase(requestedNodes)) {
throw new IllegalArgumentException("The nodes must be either ALL or PRIMARY");
}
// replicate if cluster manager
if (isReplicateRequest()) {
// Replicate to either Primary Node or all nodes
if (requestedNodes.equalsIgnoreCase("PRIMARY")) {
final NodeIdentifier primaryNodeId = getPrimaryNodeId().orElseThrow(() -> new IllegalStateException("There is currently no Primary Node elected"));
return replicate(HttpMethod.POST, requestEntity, primaryNodeId.getId());
} else {
return replicate(HttpMethod.POST, requestEntity);
}
}
return withWriteLock(
serviceFacade,
requestEntity,
lookup -> {
final Authorizable provenance = lookup.getProvenance();
provenance.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
},
() -> {}, // No verification step necessary - this can be done any time
entity -> {
final ReplayLastEventSnapshotDTO aggregateSnapshot = new ReplayLastEventSnapshotDTO();
// Submit provenance query
try {
final ProvenanceEventDTO provenanceEventDto = serviceFacade.submitReplayLastEvent(entity.getComponentId());
if (provenanceEventDto == null) {
aggregateSnapshot.setEventAvailable(false);
} else {
aggregateSnapshot.setEventAvailable(true);
aggregateSnapshot.setEventsReplayed(Collections.singleton(provenanceEventDto.getEventId()));
}
} catch (final AccessDeniedException ade) {
logger.error("Failed to replay latest Provenance Event", ade);
aggregateSnapshot.setFailureExplanation("Access Denied");
} catch (final Exception e) {
logger.error("Failed to replay latest Provenance Event", e);
aggregateSnapshot.setFailureExplanation(e.getMessage());
}
final ReplayLastEventResponseEntity responseEntity = new ReplayLastEventResponseEntity();
responseEntity.setComponentId(entity.getComponentId());
responseEntity.setNodes(entity.getNodes());
responseEntity.setAggregateSnapshot(aggregateSnapshot);
return generateOkResponse(responseEntity).build();
});
}
/**
* Creates a new replay request for the content associated with the specified provenance event id.
*
@ -393,4 +504,8 @@ public class ProvenanceEventResource extends ApplicationResource {
this.serviceFacade = serviceFacade;
}
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
}

View File

@ -132,6 +132,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimeZone;
@ -1359,6 +1360,39 @@ public class ControllerFacade implements Authorizable {
}
}
/**
* Submits for replay the latest provenance event that is cached for the component with the given ID
* @param componentId the ID of the component
* @return the ProvenanceEventDTO representing the event that was replayed, or <code>null</code> if the no event was available
* @throws AccessDeniedException if an event is available but the current user is not permitted to replay the event
*/
public ProvenanceEventDTO submitReplayLastEvent(String componentId) {
try {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
// lookup the original event
final Optional<ProvenanceEventRecord> optionalEvent = flowController.getProvenanceRepository().getLatestCachedEvent(componentId);
if (!optionalEvent.isPresent()) {
return null;
}
// Authorize the replay
final ProvenanceEventRecord event = optionalEvent.get();
authorizeReplay(event);
// Replay the FlowFile
flowController.replayFlowFile(event, user);
// convert the event record
return createProvenanceEventDto(event, false);
} catch (final IOException ioe) {
throw new NiFiCoreException("An error occurred while getting the specified event.", ioe);
}
}
/**
* Authorizes access to replay a specified provenance event. Whether to check read data permission can be specified. The context this
* method is invoked may have already verified these permissions. Using a flag here as it forces the caller to acknowledge this fact

View File

@ -560,6 +560,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="flowController" ref="flowController" />
<property name="authorizer" ref="authorizer" />
</bean>
<bean id="countersResource" class="org.apache.nifi.web.api.CountersResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>

View File

@ -676,6 +676,96 @@
}
},
replayLastProvenanceEvent: function(selection, nodes) {
if (selection.size() === 1) {
var selectionData = selection.datum();
// submit replay event
var entity = {
'componentId': selectionData.id,
'nodes': nodes
};
$.ajax({
type: 'POST',
url: config.urls.api + '/provenance-events/latest/replays',
data: JSON.stringify(entity),
dataType: 'json',
contentType: 'application/json'
}).fail(function (xhr, status, error) {
nfDialog.showOkDialog({
headerText: 'Failed to Replay Event',
dialogContent: nfCommon.escapeHtml(xhr.responseText)
});
}).done(function (response) {
if (nfCommon.isDefinedAndNotNull(response.aggregateSnapshot.failureExplanation)) {
nfDialog.showOkDialog({
headerText: 'Replay Event Failure',
dialogContent: response.aggregateSnapshot.failureExplanation
});
} else if (response.aggregateSnapshot.eventAvailable !== true) {
nfDialog.showOkDialog({
headerText: 'No Event Available',
dialogContent: 'There was no recent event available to be replayed.'
});
} else if (nfCommon.isDefinedAndNotNull(response.nodeSnapshots)) {
var replayedCount = 0;
var unavailableCount = 0;
for (var i = 0; i < response.nodeSnapshots.length; i++) {
var nodeResponse = response.nodeSnapshots[i];
if (nodeResponse.snapshot.eventAvailable) {
replayedCount++;
} else {
unavailableCount++;
}
}
var messageText;
if (unavailableCount === 0) {
messageText = 'All nodes successfully replayed the latest event.';
} else {
messageText = replayedCount + ' nodes successfully replayed the latest event but ' + unavailableCount + ' had no recent event avaialble to be replayed.';
}
nfDialog.showOkDialog({
headerText: 'Events Replayed',
dialogContent: messageText
});
} else {
nfDialog.showOkDialog({
headerText: 'Event Replayed',
dialogContent: 'Successfully replayed the latest event.'
});
}
var componentConnections = nfConnection.getComponentConnections(selectionData.id);
for (var i=0; i < componentConnections.length; i++) {
var connection = componentConnections[i];
nfConnection.reload(connection.id);
}
});
}
},
/**
* Submits a request to replay the last provenance event on all nodes in the cluster.
*
* @argument {selection} selection The selection
*/
replayLastAllNodes: function (selection) {
this.replayLastProvenanceEvent(selection, 'ALL');
},
/**
* Submits a request to replay the last provenance event on the primary node in the cluster.
*
* @argument {selection} selection The selection
*/
replayLastPrimaryNode: function (selection) {
this.replayLastProvenanceEvent(selection, 'PRIMARY');
},
/**
* Starts the components in the specified selection.
*

View File

@ -637,6 +637,20 @@
&& !nfCanvasUtils.isRemoteProcessGroup(selection) && nfCommon.canAccessProvenance();
};
/**
* Determines whether the current selection should provide ability to replay latest provenance event.
*
* @param {selection} selection
*/
var canReplayProvenance = function (selection) {
// ensure the correct number of components are selected
if (selection.size() !== 1) {
return false;
}
return nfCanvasUtils.isProcessor(selection) && nfCommon.canAccessProvenance();
}
/**
* Determines whether the current selection is a remote process group.
*
@ -831,6 +845,11 @@
{id: 'disable-all-controller-services-menu-item-noselection', condition: emptySelection, menuItem: {clazz: 'icon icon-enable-false', text: 'Disable all controller services', action: 'disableAllControllerServices'}},
{separator: true},
{id: 'data-provenance-menu-item', condition: canAccessProvenance, menuItem: {clazz: 'icon icon-provenance', imgStyle: 'context-menu-provenance', text: 'View data provenance', action: 'openProvenance'}},
{id: 'data-provenance-replay-last-menu-item', condition: canReplayProvenance, groupMenuItem: {clazz: 'fa fa-repeat', text: 'Replay last event'}, menuItems: [
{id: 'replay-last-all-nodes-menu-item', condition: canReplayProvenance, menuItem: {text: 'All nodes', action: 'replayLastAllNodes'}},
{id: 'replay-last-primary-node-menu-item', condition: canReplayProvenance, menuItem: {text: 'Primary node', action: 'replayLastPrimaryNode'}}
]},
{separator: true},
{id: 'show-stats-menu-item', condition: supportsStats, menuItem: {clazz: 'fa fa-area-chart', text: 'View status history', action: 'showStats'}},
{id: 'view-state-menu-item', condition: isStatefulProcessor, menuItem: {clazz: 'fa fa-tasks', text: 'View state', action: 'viewState'}},
{id: 'list-queue-menu-item', condition: canListQueue, menuItem: {clazz: 'fa fa-list', text: 'List queue', action: 'listQueue'}},

View File

@ -99,6 +99,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@ -2061,6 +2062,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
return result;
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
}
/**
* This is for testing only and not actually used other than in debugging
*

View File

@ -57,6 +57,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -257,6 +258,11 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
return eventIndex.submitQuery(query, createEventAuthorizer(user), user == null ? null : user.getIdentity());
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return eventIndex.getLatestCachedEvent(componentId);
}
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
return eventIndex.retrieveQuerySubmission(queryIdentifier, user);

View File

@ -29,6 +29,7 @@ import org.apache.nifi.provenance.store.EventStore;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
/**
* An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly
@ -81,6 +82,15 @@ public interface EventIndex extends Closeable {
*/
QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, String userId);
/**
* Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user
* @param componentId the ID of the component
*
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
* @throws IOException if unable to read from the repository
*/
Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException;
/**
* Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID.
*

View File

@ -17,11 +17,6 @@
package org.apache.nifi.provenance.index.lucene;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.search.Query;
@ -29,6 +24,12 @@ import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.util.RingBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class LatestEventsPerProcessorQuery implements CachedQuery {
private static final String COMPONENT_ID_FIELD_NAME = SearchableFields.ComponentID.getSearchableFieldName();
private final ConcurrentMap<String, RingBuffer<Long>> latestRecords = new ConcurrentHashMap<>();
@ -40,6 +41,15 @@ public class LatestEventsPerProcessorQuery implements CachedQuery {
ringBuffer.add(storageSummary.getEventId());
}
public List<Long> getLatestEventIds(final String componentId) {
final RingBuffer<Long> ringBuffer = latestRecords.get(componentId);
if (ringBuffer == null) {
return Collections.emptyList();
}
return ringBuffer.asList();
}
@Override
public Optional<List<Long>> evaluate(final Query query) {
if (query.getMaxResults() > 1000) {

View File

@ -107,6 +107,7 @@ public class LuceneEventIndex implements EventIndex {
private final EventReporter eventReporter;
private final List<CachedQuery> cachedQueries = new ArrayList<>();
private LatestEventsPerProcessorQuery latestEventsPerProcessorQuery; // effectively final
private ScheduledExecutorService maintenanceExecutor; // effectively final
private ScheduledExecutorService cacheWarmerExecutor;
@ -163,7 +164,8 @@ public class LuceneEventIndex implements EventIndex {
maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries, 30, 30, TimeUnit.SECONDS);
cachedQueries.add(new LatestEventsQuery());
cachedQueries.add(new LatestEventsPerProcessorQuery());
latestEventsPerProcessorQuery = new LatestEventsPerProcessorQuery();
cachedQueries.add(latestEventsPerProcessorQuery);
triggerReindexOfDefunctIndices();
triggerCacheWarming();
@ -667,6 +669,24 @@ public class LuceneEventIndex implements EventIndex {
return submission;
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
final List<Long> eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId);
if (eventIds.isEmpty()) {
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
return Optional.empty();
}
final Long latestEventId = eventIds.get(eventIds.size() - 1);
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(latestEventId);
if (latestEvent.isPresent()) {
logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvent.get(), componentId);
} else {
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
}
return latestEvent;
}
@Override
public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user, final EventAuthorizer eventAuthorizer) {

View File

@ -50,6 +50,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -490,6 +491,17 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return result;
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(event -> componentId.equals(event.getComponentId()));
if (matches.isEmpty()) {
return Optional.empty();
}
return Optional.of(matches.get(matches.size() - 1));
}
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
final QuerySubmission submission = querySubmissionMap.get(queryIdentifier);

View File

@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -152,6 +153,11 @@ public class StatelessProvenanceRepository implements ProvenanceRepository {
throw new UnsupportedOperationException();
}
@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
}
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
throw new UnsupportedOperationException();

View File

@ -1399,5 +1399,4 @@ public class NiFiClientUtil {
return nifiClient.getReportingTasksClient().updateReportingTask(entity);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.provenance;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
public class ClusteredReplayProvenanceIT extends ReplayProvenanceIT {
@Override
public NiFiInstanceFactory getInstanceFactory() {
return createTwoNodeInstanceFactory();
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.provenance;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient.ReplayEventNodes;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ReplayProvenanceIT extends NiFiSystemIT {
@ParameterizedTest
@EnumSource(ReplayEventNodes.class)
public void testReplayLastEvent(final ReplayEventNodes nodes) throws NiFiClientException, IOException, InterruptedException {
ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success");
// Run Generate once
getClientUtil().startProcessor(generate);
waitForQueueCount(connection.getId(), getNumberOfNodes());
getClientUtil().stopProcessor(generate);
// Run terminate once
getClientUtil().startProcessor(terminate);
waitForQueueCount(connection.getId(), 0);
getClientUtil().stopProcessor(terminate);
// Replay last event for terminate and ensure that data is queued up.
final ReplayLastEventResponseEntity replayResponse = getNifiClient().getProvenanceClient().replayLastEvent(terminate.getId(), nodes);
assertNull(replayResponse.getAggregateSnapshot().getFailureExplanation());
assertEquals(Boolean.TRUE, replayResponse.getAggregateSnapshot().getEventAvailable());
final int expectedEventsPlayed = (nodes == ReplayEventNodes.PRIMARY) ? 1 : getNumberOfNodes();
assertEquals(expectedEventsPlayed, replayResponse.getAggregateSnapshot().getEventsReplayed().size());
waitForQueueCount(connection.getId(), expectedEventsPlayed);
// Attempt to replay event for generate - it should provide an error because this is a source processor whose event cannot be replayed
final ReplayLastEventResponseEntity generateReplayResponse = getNifiClient().getProvenanceClient().replayLastEvent(generate.getId(), nodes);
final String failureExplanation = generateReplayResponse.getAggregateSnapshot().getFailureExplanation();
assertNotNull(failureExplanation);
// The failure text is not provided if multiple nodes failed, as it can get too unwieldy to understand.
final String expectedFailureText = (nodes == ReplayEventNodes.ALL && getNumberOfNodes() > 1) ? "See logs for more details" : "Source FlowFile Queue";
assertTrue(failureExplanation.contains(expectedFailureText), failureExplanation);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import java.io.IOException;
@ -33,4 +34,11 @@ public interface ProvenanceClient {
LineageEntity getLineageRequest(String lineageRequestId) throws NiFiClientException, IOException;
LineageEntity deleteLineageRequest(String lineageRequestId) throws NiFiClientException, IOException;
ReplayLastEventResponseEntity replayLastEvent(String processorId, ReplayEventNodes replayEventNodes) throws NiFiClientException, IOException;
enum ReplayEventNodes {
PRIMARY,
ALL;
}
}

View File

@ -22,14 +22,18 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity;
import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Objects;
public class JerseyProvenanceClient extends AbstractJerseyClient implements ProvenanceClient {
private final WebTarget provenanceTarget;
private final WebTarget provenanceEventsTarget;
public JerseyProvenanceClient(final WebTarget baseTarget) {
this(baseTarget, null);
@ -38,6 +42,7 @@ public class JerseyProvenanceClient extends AbstractJerseyClient implements Prov
public JerseyProvenanceClient(final WebTarget baseTarget, final RequestConfig requestConfig) {
super(requestConfig);
this.provenanceTarget = baseTarget.path("/provenance");
this.provenanceEventsTarget = baseTarget.path("/provenance-events");
}
@Override
@ -116,4 +121,21 @@ public class JerseyProvenanceClient extends AbstractJerseyClient implements Prov
});
}
@Override
public ReplayLastEventResponseEntity replayLastEvent(final String processorId, final ReplayEventNodes nodes) throws NiFiClientException, IOException {
Objects.requireNonNull(processorId, "Processor ID required");
Objects.requireNonNull(nodes, "Nodes must be specified");
final ReplayLastEventRequestEntity requestEntity = new ReplayLastEventRequestEntity();
requestEntity.setComponentId(processorId);
requestEntity.setNodes(nodes.name());
return executeAction("Error replaying last event for Processor " + processorId, () -> {
final WebTarget target = provenanceEventsTarget.path("/latest/replays");
return getRequestBuilder(target).post(
Entity.entity(requestEntity, MediaType.APPLICATION_JSON_TYPE),
ReplayLastEventResponseEntity.class);
});
}
}