diff --git a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java index 8aed44745c..6444c42fbc 100644 --- a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java +++ b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java @@ -26,6 +26,7 @@ import org.apache.nifi.provenance.search.SearchableField; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.Set; import static java.util.Collections.EMPTY_SET; @@ -102,6 +103,11 @@ public class NoOpProvenanceRepository implements ProvenanceRepository { return null; } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + return Optional.empty(); + } + @Override public QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser niFiUser) { return null; diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 8dceaa60d0..55b50caa2f 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -304,6 +304,7 @@ NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it i - *Run Once*: This option allows the user to run a selected Processor exactly once. If the Processor is prevented from executing (e.g., there are no incoming FlowFiles or the outgoing connection has back pressure applied) the Processor won't get triggered. *Execution* settings apply (i.e., *Primary Node* and *All Nodes* settings will result in running the Processor only once on the Primary Node or one time on each of the nodes, respectively). Works only with *Timer driven* and *CRON driven* scheduling strategies. - *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor. - *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <>). +- *Replay last event*: This option will replay the last Provenance event, effectively requeuing the last FlowFile that was processed by the Processor (see <>). - *View status history*: This option opens a graphical representation of the Processor's statistical information over time. - *View usage*: This option takes the user to the Processor's usage documentation. - *View connections->Upstream*: This option allows the user to see and "jump to" upstream connections that are coming into the Processor. This is particularly useful when processors connect into and out of other Process Groups. @@ -2834,14 +2835,22 @@ a result of the processing event, the user may select the checkbox next to "Only image:event-attributes.png["Event Attributes", width=700] +[[replay_flowfile]] === Replaying a FlowFile A DFM may need to inspect a FlowFile's content at some point in the dataflow to ensure that it is being processed as expected. And if it -is not being processed properly, the DFM may need to make adjustments to the dataflow and replay the FlowFile again. The Content tab of the View Details dialog window is where the DFM can do these things. The Content tab shows information about the FlowFile's content, such as its location in the Content Repository +is not being processed properly, the DFM may need to make adjustments to the dataflow and replay the FlowFile again. +This can be achieved from the Content tab of the View Details dialog window. The Content tab shows information +about the FlowFile's content, such as its location in the Content Repository and its size. In addition, it is here that the user may click the "Download" button to download a copy of the FlowFile's content as it existed at this point in the flow. The user may also click the "Submit" button to replay the FlowFile at this point in the flow. Upon clicking "Submit", the FlowFile is sent to the connection feeding the component that produced this processing event. +When a user is developing a dataflow, it can be very beneficial to have easy access to replaying a FlowFile, as well. For example, a user may configure +a Processor, run a FlowFile through it, and find that the configuration needs to be modified. The user can then update the configuration, and run the +same FlowFile through again to verify the results. In order to ease this process, the user can right-click on a Processor and choose the "Replay last event" +item. From here, the user can choose to either replay the last event from just the Primary Node or from all nodes. + image:event-content.png["Event Content", width=700] === Viewing FlowFile Lineage diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java index bbde0c70c3..8e159d864b 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.provenance; - import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; @@ -27,6 +26,7 @@ import org.apache.nifi.provenance.search.SearchableField; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.Set; public interface ProvenanceRepository extends ProvenanceEventRepository { @@ -94,6 +94,14 @@ public interface ProvenanceRepository extends ProvenanceEventRepository { */ QuerySubmission submitQuery(Query query, NiFiUser user); + /** + * Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user + * @param componentId the ID of the component + * @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user + * @throws IOException if unable to read from the repository + */ + Optional getLatestCachedEvent(String componentId) throws IOException; + /** * @param queryIdentifier of the query * @param user The user who is retrieving the query. diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java index e1d8321f6b..c694d64972 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java +++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.provenance; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; @@ -31,6 +24,15 @@ import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; import org.apache.nifi.provenance.search.SearchableField; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + public class MockProvenanceRepository implements ProvenanceRepository { private final List records = new ArrayList<>(); @@ -90,6 +92,11 @@ public class MockProvenanceRepository implements ProvenanceRepository { throw new UnsupportedOperationException("MockProvenanceRepository does not support querying"); } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + return Optional.empty(); + } + @Override public QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser user) { throw new UnsupportedOperationException("MockProvenanceRepository does not support querying"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeReplayLastEventSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeReplayLastEventSnapshotDTO.java new file mode 100644 index 0000000000..0d36d6e1d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeReplayLastEventSnapshotDTO.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "nodeReplayLastEventSnapshot") +public class NodeReplayLastEventSnapshotDTO { + private String nodeId; + private String address; + private Integer apiPort; + private ReplayLastEventSnapshotDTO snapshot; + + @ApiModelProperty("The unique ID that identifies the node") + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + @ApiModelProperty("The API address of the node") + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + @ApiModelProperty("The API port used to communicate with the node") + public Integer getApiPort() { + return apiPort; + } + + public void setApiPort(Integer apiPort) { + this.apiPort = apiPort; + } + + @ApiModelProperty("The snapshot from the node") + public ReplayLastEventSnapshotDTO getSnapshot() { + return snapshot; + } + + public void setSnapshot(final ReplayLastEventSnapshotDTO snapshot) { + this.snapshot = snapshot; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventRequestEntity.java new file mode 100644 index 0000000000..82effe0065 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventRequestEntity.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "replayLastEventRequestEntity") +public class ReplayLastEventRequestEntity extends Entity { + + private String componentId; + private String nodes; + + @ApiModelProperty("The UUID of the component whose last event should be replayed.") + public String getComponentId() { + return componentId; + } + + public void setComponentId(final String componentId) { + this.componentId = componentId; + } + + @ApiModelProperty( + value = "Which nodes are to replay their last provenance event.", + allowableValues = "ALL, PRIMARY" + ) + public String getNodes() { + return nodes; + } + + public void setNodes(String nodes) { + this.nodes = nodes; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventResponseEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventResponseEntity.java new file mode 100644 index 0000000000..e8f3ab39be --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventResponseEntity.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.List; + +@XmlRootElement(name = "replayLastEventResponseEntity") +public class ReplayLastEventResponseEntity extends Entity { + + private String componentId; + private String nodes; + private ReplayLastEventSnapshotDTO aggregateSnapshot; + private List nodeSnapshots; + + @ApiModelProperty("The UUID of the component whose last event should be replayed.") + public String getComponentId() { + return componentId; + } + + public void setComponentId(final String componentId) { + this.componentId = componentId; + } + + @ApiModelProperty( + value = "Which nodes were requested to replay their last provenance event.", + allowableValues = "ALL, PRIMARY" + ) + public String getNodes() { + return nodes; + } + + public void setNodes(String nodes) { + this.nodes = nodes; + } + + @ApiModelProperty("The aggregate result of all nodes' responses") + public ReplayLastEventSnapshotDTO getAggregateSnapshot() { + return aggregateSnapshot; + } + + public void setAggregateSnapshot(final ReplayLastEventSnapshotDTO aggregateSnapshot) { + this.aggregateSnapshot = aggregateSnapshot; + } + + @ApiModelProperty("The node-wise results") + public List getNodeSnapshots() { + return nodeSnapshots; + } + + public void setNodeSnapshots(final List nodeSnapshots) { + this.nodeSnapshots = nodeSnapshots; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventSnapshotDTO.java new file mode 100644 index 0000000000..47185593ea --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventSnapshotDTO.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Collection; + +@XmlRootElement(name = "replayLastEventSnapshot") +public class ReplayLastEventSnapshotDTO { + private Collection eventsReplayed; + private String failureExplanation; + private Boolean eventAvailable; + + @ApiModelProperty("The IDs of the events that were successfully replayed") + public Collection getEventsReplayed() { + return eventsReplayed; + } + + public void setEventsReplayed(final Collection eventsReplayed) { + this.eventsReplayed = eventsReplayed; + } + + @ApiModelProperty("If unable to replay an event, specifies why the event could not be replayed") + public String getFailureExplanation() { + return failureExplanation; + } + + public void setFailureExplanation(final String failureExplanation) { + this.failureExplanation = failureExplanation; + } + + @ApiModelProperty("Whether or not an event was available. This may not be populated if there was a failure.") + public Boolean getEventAvailable() { + return eventAvailable; + } + + public void setEventAvailable(final Boolean eventAvailable) { + this.eventAvailable = eventAvailable; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 61e5cab30b..440dc60d5f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -65,6 +65,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpoi import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReplayLastEventEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskTypesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; @@ -173,6 +174,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { endpointMergers.add(new ParameterContextUpdateEndpointMerger()); endpointMergers.add(new VerifyConfigEndpointMerger()); endpointMergers.add(new RuntimeManifestEndpointMerger()); + endpointMergers.add(new ReplayLastEventEndpointMerger()); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReplayLastEventEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReplayLastEventEndpointMerger.java new file mode 100644 index 0000000000..7911713e2b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReplayLastEventEndpointMerger.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.NodeReplayLastEventSnapshotDTO; +import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class ReplayLastEventEndpointMerger extends AbstractSingleEntityEndpoint implements EndpointResponseMerger { + public static final String REPLAY_URI = "/nifi-api/provenance-events/latest/replays"; + + @Override + public boolean canHandle(final URI uri, final String method) { + return "POST".equals(method) && REPLAY_URI.equals(uri.getPath()); + } + + @Override + protected Class getEntityClass() { + return ReplayLastEventResponseEntity.class; + } + + @Override + protected void mergeResponses(final ReplayLastEventResponseEntity clientEntity, final Map entityMap, final Set successfulResponses, + final Set problematicResponses) { + + // Move all aggregate snapshots into the node snapshots. + final Set replayedEventIds = new HashSet<>(); + final Set failureExplanations = new HashSet<>(); + boolean eventAvailable = false; + for (final Map.Entry entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ReplayLastEventResponseEntity nodeEntity = entry.getValue(); + + final ReplayLastEventSnapshotDTO nodeSnapshot = nodeEntity.getAggregateSnapshot(); + final NodeReplayLastEventSnapshotDTO nodeResponseDto = new NodeReplayLastEventSnapshotDTO(); + nodeResponseDto.setAddress(nodeId.getApiAddress()); + nodeResponseDto.setApiPort(nodeId.getApiPort()); + nodeResponseDto.setNodeId(nodeId.getId()); + nodeResponseDto.setSnapshot(nodeSnapshot); + + if (clientEntity.getNodeSnapshots() == null) { + clientEntity.setNodeSnapshots(new ArrayList<>()); + } + clientEntity.getNodeSnapshots().add(nodeResponseDto); + + final Collection eventsReplayed = nodeSnapshot.getEventsReplayed(); + if (eventsReplayed != null) { + replayedEventIds.addAll(eventsReplayed); + } + + final String failureExplanation = nodeSnapshot.getFailureExplanation(); + if (failureExplanation != null) { + failureExplanations.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " - " + failureExplanation); + } + + eventAvailable = eventAvailable || nodeSnapshot.getEventAvailable() == Boolean.TRUE; + } + + // Update the aggregate snapshot + clientEntity.getAggregateSnapshot().setEventsReplayed(replayedEventIds); + clientEntity.getAggregateSnapshot().setEventAvailable(eventAvailable); + + if (failureExplanations.isEmpty()) { + return; + } + if (failureExplanations.size() == 1) { + clientEntity.getAggregateSnapshot().setFailureExplanation("One node failed to replay the latest event: " + failureExplanations.iterator().next()); + } else { + clientEntity.getAggregateSnapshot().setFailureExplanation(failureExplanations.size() + " nodes failed to replay the latest events. See logs for more details."); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ce5fc64f83..6ac54d3c3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2781,16 +2781,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"; } - final Set connections = flowManager.findAllConnections(); - FlowFileQueue queue = null; - for (final Connection connection : connections) { - if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) { - queue = connection.getFlowFileQueue(); - break; - } - } - - if (queue == null) { + final Connection connection = flowManager.getConnection(event.getSourceQueueIdentifier()); + if (connection == null) { return "Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists"; } @@ -2818,10 +2810,28 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } // Make sure event has the Content Claim info - final Long contentSize = event.getPreviousFileSize(); - final String contentClaimId = event.getPreviousContentClaimIdentifier(); - final String contentClaimSection = event.getPreviousContentClaimSection(); - final String contentClaimContainer = event.getPreviousContentClaimContainer(); + boolean usePrevious = true; + Long contentSize = event.getPreviousFileSize(); + String contentClaimId = event.getPreviousContentClaimIdentifier(); + String contentClaimSection = event.getPreviousContentClaimSection(); + String contentClaimContainer = event.getPreviousContentClaimContainer(); + Long contentClaimOffset = event.getPreviousContentClaimOffset(); + + final int previousClaimNulls = countNulls(contentSize, contentClaimId, contentClaimSection, contentClaimContainer); + if (previousClaimNulls == 4) { + contentClaimId = event.getContentClaimIdentifier(); + contentClaimSection = event.getContentClaimSection(); + contentClaimContainer = event.getContentClaimContainer(); + + final int currentClaimNullCounts = countNulls(contentClaimId, contentClaimSection, contentClaimContainer); + + // If the current claim is also all null, we will stick with using the previous. Otherwise, we'll denote that we're using the current claim + usePrevious = currentClaimNullCounts == 3; + if (!usePrevious) { + contentSize = event.getFileSize(); + contentClaimOffset = event.getContentClaimOffset(); + } + } // All content fields must be null or no content fields can be null. final int nullCount = countNulls(contentSize, contentClaimId, contentClaimSection, contentClaimContainer); @@ -2834,16 +2844,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"); } - final Set connections = flowManager.findAllConnections(); - FlowFileQueue queue = null; - for (final Connection connection : connections) { - if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) { - queue = connection.getFlowFileQueue(); - break; - } - } - - if (queue == null) { + final Connection connection = flowManager.getConnection(event.getSourceQueueIdentifier()); + if (connection == null) { throw new IllegalStateException("Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists"); } @@ -2859,18 +2861,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // being written to by the Content Repository. This is important only because we are creating a FlowFile with this Resource // Claim. If, for instance, we are simply creating the claim to request its content, as in #getContentAvailability, etc. // then this is not necessary. - ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(), - event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()); + ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId); if (resourceClaim == null) { - resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), - event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false); + resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, + contentClaimSection, contentClaimId, false, false); } // Increment Claimant Count, since we will now be referencing the Content Claim resourceClaimManager.incrementClaimantCount(resourceClaim); - final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset(); + final long claimOffset = contentClaimOffset == null ? 0L : contentClaimOffset; contentClaim = new StandardContentClaim(resourceClaim, claimOffset); - contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize()); + contentClaim.setLength(contentSize == null ? -1L : contentSize); if (!contentRepository.isAccessible(contentClaim)) { resourceClaimManager.decrementClaimantCount(resourceClaim); @@ -2893,7 +2894,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // FlowFileRecord's contentClaimOffset to 0. final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() // Copy relevant info from source FlowFile - .addAttributes(event.getPreviousAttributes()) + .addAttributes(usePrevious ? event.getPreviousAttributes() : event.getAttributes()) .contentClaim(contentClaim) .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself .entryDate(System.currentTimeMillis()) @@ -2923,10 +2924,12 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node .setLineageStartDate(event.getLineageStartDate()) .setComponentType(event.getComponentType()) .setComponentId(event.getComponentId()) + .setSourceQueueIdentifier(event.getSourceQueueIdentifier()) .build(); provenanceRepository.registerEvent(replayEvent); // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow + final FlowFileQueue queue = connection.getFlowFileQueue(); final StandardRepositoryRecord record = new StandardRepositoryRecord(queue); record.setWorking(flowFileRecord, false); record.setDestination(queue); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index aa75f20234..7cf07768c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -224,6 +224,13 @@ public interface NiFiServiceFacade { */ ProvenanceEventDTO submitReplay(Long eventId); + /** + * Submits a replay request for the latest event generated by the component with the given ID + * @param componentId the ID the component + * @return the event, or null if no event was available + */ + ProvenanceEventDTO submitReplayLastEvent(String componentId); + /** * Gets the content for the specified claim. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index aa39cb6d7a..b8135eee9b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -3336,6 +3336,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return controllerFacade.submitReplay(eventId); } + @Override + public ProvenanceEventDTO submitReplayLastEvent(final String componentId) { + return controllerFacade.submitReplayLastEvent(componentId); + } + // ----------------------------------------- // Read Operations // ----------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 8006caa6ac..a339800d36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -916,6 +916,15 @@ public abstract class ApplicationResource { throw new NoClusterCoordinatorException(); } + protected Optional getPrimaryNodeId() { + final ClusterCoordinator coordinator = getClusterCoordinator(); + if (coordinator == null) { + throw new NoClusterCoordinatorException(); + } + + return Optional.ofNullable(coordinator.getPrimaryNode()); + } + protected ReplicationTarget getReplicationTarget() { return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java index 6cc234362f..5294ad0a4f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java @@ -22,6 +22,11 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -31,8 +36,13 @@ import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO; import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity; import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -52,7 +62,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; - +import java.util.Collections; /** * RESTful endpoint for querying data provenance. @@ -63,8 +73,10 @@ import java.net.URI; description = "Endpoint for accessing data flow provenance." ) public class ProvenanceEventResource extends ApplicationResource { + private static final Logger logger = LoggerFactory.getLogger(ProvenanceEventResource.class); private NiFiServiceFacade serviceFacade; + private Authorizer authorizer; /** * Gets the content for the input of the specified event. @@ -310,6 +322,105 @@ public class ProvenanceEventResource extends ApplicationResource { return generateOkResponse(entity).build(); } + + /** + * Triggers the latest Provenance Event for the specified component to be replayed. + * + * @param httpServletRequest request + * @param requestEntity The replay request + * @return A replayLastEventResponseEntity + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("latest/replays") + @ApiOperation( + value = "Replays content from a provenance event", + response = ReplayLastEventResponseEntity.class, + authorizations = { + @Authorization(value = "Read Component Provenance Data - /provenance-data/{component-type}/{uuid}"), + @Authorization(value = "Read Component Data - /data/{component-type}/{uuid}"), + @Authorization(value = "Write Component Data - /data/{component-type}/{uuid}") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response submitReplayLatestEvent( + @Context final HttpServletRequest httpServletRequest, + @ApiParam( + value = "The replay request.", + required = true + ) final ReplayLastEventRequestEntity requestEntity) { + + // ensure the event id is specified + if (requestEntity == null || requestEntity.getComponentId() == null) { + throw new IllegalArgumentException("The id of the component must be specified."); + } + final String requestedNodes = requestEntity.getNodes(); + if (requestedNodes == null) { + throw new IllegalArgumentException("The nodes must be specified."); + } + if (!"ALL".equalsIgnoreCase(requestedNodes) && !"PRIMARY".equalsIgnoreCase(requestedNodes)) { + throw new IllegalArgumentException("The nodes must be either ALL or PRIMARY"); + } + + // replicate if cluster manager + if (isReplicateRequest()) { + // Replicate to either Primary Node or all nodes + if (requestedNodes.equalsIgnoreCase("PRIMARY")) { + final NodeIdentifier primaryNodeId = getPrimaryNodeId().orElseThrow(() -> new IllegalStateException("There is currently no Primary Node elected")); + return replicate(HttpMethod.POST, requestEntity, primaryNodeId.getId()); + } else { + return replicate(HttpMethod.POST, requestEntity); + } + } + + return withWriteLock( + serviceFacade, + requestEntity, + lookup -> { + final Authorizable provenance = lookup.getProvenance(); + provenance.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }, + () -> {}, // No verification step necessary - this can be done any time + entity -> { + final ReplayLastEventSnapshotDTO aggregateSnapshot = new ReplayLastEventSnapshotDTO(); + + // Submit provenance query + try { + final ProvenanceEventDTO provenanceEventDto = serviceFacade.submitReplayLastEvent(entity.getComponentId()); + + if (provenanceEventDto == null) { + aggregateSnapshot.setEventAvailable(false); + } else { + aggregateSnapshot.setEventAvailable(true); + aggregateSnapshot.setEventsReplayed(Collections.singleton(provenanceEventDto.getEventId())); + } + } catch (final AccessDeniedException ade) { + logger.error("Failed to replay latest Provenance Event", ade); + aggregateSnapshot.setFailureExplanation("Access Denied"); + } catch (final Exception e) { + logger.error("Failed to replay latest Provenance Event", e); + aggregateSnapshot.setFailureExplanation(e.getMessage()); + } + + final ReplayLastEventResponseEntity responseEntity = new ReplayLastEventResponseEntity(); + responseEntity.setComponentId(entity.getComponentId()); + responseEntity.setNodes(entity.getNodes()); + responseEntity.setAggregateSnapshot(aggregateSnapshot); + + return generateOkResponse(responseEntity).build(); + }); + } + + /** * Creates a new replay request for the content associated with the specified provenance event id. * @@ -393,4 +504,8 @@ public class ProvenanceEventResource extends ApplicationResource { this.serviceFacade = serviceFacade; } + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index a762d8a8a1..f04a9cf45e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -132,6 +132,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TimeZone; @@ -1359,6 +1360,39 @@ public class ControllerFacade implements Authorizable { } } + /** + * Submits for replay the latest provenance event that is cached for the component with the given ID + * @param componentId the ID of the component + * @return the ProvenanceEventDTO representing the event that was replayed, or null if the no event was available + * @throws AccessDeniedException if an event is available but the current user is not permitted to replay the event + */ + public ProvenanceEventDTO submitReplayLastEvent(String componentId) { + try { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user == null) { + throw new WebApplicationException(new Throwable("Unable to access details for current user.")); + } + + // lookup the original event + final Optional optionalEvent = flowController.getProvenanceRepository().getLatestCachedEvent(componentId); + if (!optionalEvent.isPresent()) { + return null; + } + + // Authorize the replay + final ProvenanceEventRecord event = optionalEvent.get(); + authorizeReplay(event); + + // Replay the FlowFile + flowController.replayFlowFile(event, user); + + // convert the event record + return createProvenanceEventDto(event, false); + } catch (final IOException ioe) { + throw new NiFiCoreException("An error occurred while getting the specified event.", ioe); + } + } + /** * Authorizes access to replay a specified provenance event. Whether to check read data permission can be specified. The context this * method is invoked may have already verified these permissions. Using a flag here as it forces the caller to acknowledge this fact diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 7c4b65359f..839e6af554 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -560,6 +560,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index d256a9c615..7cfbd2a1fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -676,6 +676,96 @@ } }, + replayLastProvenanceEvent: function(selection, nodes) { + if (selection.size() === 1) { + var selectionData = selection.datum(); + + // submit replay event + var entity = { + 'componentId': selectionData.id, + 'nodes': nodes + }; + + $.ajax({ + type: 'POST', + url: config.urls.api + '/provenance-events/latest/replays', + data: JSON.stringify(entity), + dataType: 'json', + contentType: 'application/json' + }).fail(function (xhr, status, error) { + nfDialog.showOkDialog({ + headerText: 'Failed to Replay Event', + dialogContent: nfCommon.escapeHtml(xhr.responseText) + }); + }).done(function (response) { + if (nfCommon.isDefinedAndNotNull(response.aggregateSnapshot.failureExplanation)) { + nfDialog.showOkDialog({ + headerText: 'Replay Event Failure', + dialogContent: response.aggregateSnapshot.failureExplanation + }); + } else if (response.aggregateSnapshot.eventAvailable !== true) { + nfDialog.showOkDialog({ + headerText: 'No Event Available', + dialogContent: 'There was no recent event available to be replayed.' + }); + } else if (nfCommon.isDefinedAndNotNull(response.nodeSnapshots)) { + var replayedCount = 0; + var unavailableCount = 0; + + for (var i = 0; i < response.nodeSnapshots.length; i++) { + var nodeResponse = response.nodeSnapshots[i]; + if (nodeResponse.snapshot.eventAvailable) { + replayedCount++; + } else { + unavailableCount++; + } + } + + var messageText; + if (unavailableCount === 0) { + messageText = 'All nodes successfully replayed the latest event.'; + } else { + messageText = replayedCount + ' nodes successfully replayed the latest event but ' + unavailableCount + ' had no recent event avaialble to be replayed.'; + } + + nfDialog.showOkDialog({ + headerText: 'Events Replayed', + dialogContent: messageText + }); + } else { + nfDialog.showOkDialog({ + headerText: 'Event Replayed', + dialogContent: 'Successfully replayed the latest event.' + }); + } + + var componentConnections = nfConnection.getComponentConnections(selectionData.id); + for (var i=0; i < componentConnections.length; i++) { + var connection = componentConnections[i]; + nfConnection.reload(connection.id); + } + }); + } + }, + + /** + * Submits a request to replay the last provenance event on all nodes in the cluster. + * + * @argument {selection} selection The selection + */ + replayLastAllNodes: function (selection) { + this.replayLastProvenanceEvent(selection, 'ALL'); + }, + + /** + * Submits a request to replay the last provenance event on the primary node in the cluster. + * + * @argument {selection} selection The selection + */ + replayLastPrimaryNode: function (selection) { + this.replayLastProvenanceEvent(selection, 'PRIMARY'); + }, + /** * Starts the components in the specified selection. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js index 6673fbc062..cb09577fc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js @@ -637,6 +637,20 @@ && !nfCanvasUtils.isRemoteProcessGroup(selection) && nfCommon.canAccessProvenance(); }; + /** + * Determines whether the current selection should provide ability to replay latest provenance event. + * + * @param {selection} selection + */ + var canReplayProvenance = function (selection) { + // ensure the correct number of components are selected + if (selection.size() !== 1) { + return false; + } + + return nfCanvasUtils.isProcessor(selection) && nfCommon.canAccessProvenance(); + } + /** * Determines whether the current selection is a remote process group. * @@ -831,6 +845,11 @@ {id: 'disable-all-controller-services-menu-item-noselection', condition: emptySelection, menuItem: {clazz: 'icon icon-enable-false', text: 'Disable all controller services', action: 'disableAllControllerServices'}}, {separator: true}, {id: 'data-provenance-menu-item', condition: canAccessProvenance, menuItem: {clazz: 'icon icon-provenance', imgStyle: 'context-menu-provenance', text: 'View data provenance', action: 'openProvenance'}}, + {id: 'data-provenance-replay-last-menu-item', condition: canReplayProvenance, groupMenuItem: {clazz: 'fa fa-repeat', text: 'Replay last event'}, menuItems: [ + {id: 'replay-last-all-nodes-menu-item', condition: canReplayProvenance, menuItem: {text: 'All nodes', action: 'replayLastAllNodes'}}, + {id: 'replay-last-primary-node-menu-item', condition: canReplayProvenance, menuItem: {text: 'Primary node', action: 'replayLastPrimaryNode'}} + ]}, + {separator: true}, {id: 'show-stats-menu-item', condition: supportsStats, menuItem: {clazz: 'fa fa-area-chart', text: 'View status history', action: 'showStats'}}, {id: 'view-state-menu-item', condition: isStatefulProcessor, menuItem: {clazz: 'fa fa-tasks', text: 'View state', action: 'viewState'}}, {id: 'list-queue-menu-item', condition: canListQueue, menuItem: {clazz: 'fa fa-list', text: 'List queue', action: 'listQueue'}}, diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 72d593dbb2..24f95f67f1 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -99,6 +99,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -2061,6 +2062,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return result; } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + return Optional.empty(); + } + /** * This is for testing only and not actually used other than in debugging * diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 07eab61c9b..08cfc3f017 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; @@ -257,6 +258,11 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { return eventIndex.submitQuery(query, createEventAuthorizer(user), user == null ? null : user.getIdentity()); } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + return eventIndex.getLatestCachedEvent(componentId); + } + @Override public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) { return eventIndex.retrieveQuerySubmission(queryIdentifier, user); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java index 2f13742439..a8addb6a03 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java @@ -29,6 +29,7 @@ import org.apache.nifi.provenance.store.EventStore; import java.io.Closeable; import java.io.IOException; import java.util.Map; +import java.util.Optional; /** * An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly @@ -81,6 +82,15 @@ public interface EventIndex extends Closeable { */ QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, String userId); + /** + * Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user + * @param componentId the ID of the component + * + * @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user + * @throws IOException if unable to read from the repository + */ + Optional getLatestCachedEvent(String componentId) throws IOException; + /** * Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID. * diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java index 1c09d5775e..36bfec85d1 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java @@ -17,11 +17,6 @@ package org.apache.nifi.provenance.index.lucene; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.search.Query; @@ -29,6 +24,12 @@ import org.apache.nifi.provenance.search.SearchTerm; import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.util.RingBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class LatestEventsPerProcessorQuery implements CachedQuery { private static final String COMPONENT_ID_FIELD_NAME = SearchableFields.ComponentID.getSearchableFieldName(); private final ConcurrentMap> latestRecords = new ConcurrentHashMap<>(); @@ -40,6 +41,15 @@ public class LatestEventsPerProcessorQuery implements CachedQuery { ringBuffer.add(storageSummary.getEventId()); } + public List getLatestEventIds(final String componentId) { + final RingBuffer ringBuffer = latestRecords.get(componentId); + if (ringBuffer == null) { + return Collections.emptyList(); + } + + return ringBuffer.asList(); + } + @Override public Optional> evaluate(final Query query) { if (query.getMaxResults() > 1000) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java index f8cf7092d3..d47ecde77c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -107,6 +107,7 @@ public class LuceneEventIndex implements EventIndex { private final EventReporter eventReporter; private final List cachedQueries = new ArrayList<>(); + private LatestEventsPerProcessorQuery latestEventsPerProcessorQuery; // effectively final private ScheduledExecutorService maintenanceExecutor; // effectively final private ScheduledExecutorService cacheWarmerExecutor; @@ -163,7 +164,8 @@ public class LuceneEventIndex implements EventIndex { maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries, 30, 30, TimeUnit.SECONDS); cachedQueries.add(new LatestEventsQuery()); - cachedQueries.add(new LatestEventsPerProcessorQuery()); + latestEventsPerProcessorQuery = new LatestEventsPerProcessorQuery(); + cachedQueries.add(latestEventsPerProcessorQuery); triggerReindexOfDefunctIndices(); triggerCacheWarming(); @@ -667,6 +669,24 @@ public class LuceneEventIndex implements EventIndex { return submission; } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + final List eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId); + if (eventIds.isEmpty()) { + logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId); + return Optional.empty(); + } + + final Long latestEventId = eventIds.get(eventIds.size() - 1); + final Optional latestEvent = eventStore.getEvent(latestEventId); + if (latestEvent.isPresent()) { + logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvent.get(), componentId); + } else { + logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId); + } + + return latestEvent; + } @Override public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user, final EventAuthorizer eventAuthorizer) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index 3ac9232e4a..123efb2289 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -50,6 +50,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -490,6 +491,17 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { return result; } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + final List matches = ringBuffer.getSelectedElements(event -> componentId.equals(event.getComponentId())); + + if (matches.isEmpty()) { + return Optional.empty(); + } + + return Optional.of(matches.get(matches.size() - 1)); + } + @Override public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) { final QuerySubmission submission = querySubmissionMap.get(queryIdentifier); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java index 4f8914c3aa..fcc2e797d5 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -152,6 +153,11 @@ public class StatelessProvenanceRepository implements ProvenanceRepository { throw new UnsupportedOperationException(); } + @Override + public Optional getLatestCachedEvent(final String componentId) throws IOException { + return Optional.empty(); + } + @Override public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) { throw new UnsupportedOperationException(); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 7276a53ace..c33e403875 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -1399,5 +1399,4 @@ public class NiFiClientUtil { return nifiClient.getReportingTasksClient().updateReportingTask(entity); } - } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredReplayProvenanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredReplayProvenanceIT.java new file mode 100644 index 0000000000..5407830f3a --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredReplayProvenanceIT.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.provenance; + +import org.apache.nifi.tests.system.NiFiInstanceFactory; + +public class ClusteredReplayProvenanceIT extends ReplayProvenanceIT { + + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createTwoNodeInstanceFactory(); + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ReplayProvenanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ReplayProvenanceIT.java new file mode 100644 index 0000000000..a163493d10 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ReplayProvenanceIT.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.provenance; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient.ReplayEventNodes; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ReplayProvenanceIT extends NiFiSystemIT { + + @ParameterizedTest + @EnumSource(ReplayEventNodes.class) + public void testReplayLastEvent(final ReplayEventNodes nodes) throws NiFiClientException, IOException, InterruptedException { + ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success"); + + // Run Generate once + getClientUtil().startProcessor(generate); + waitForQueueCount(connection.getId(), getNumberOfNodes()); + getClientUtil().stopProcessor(generate); + + // Run terminate once + getClientUtil().startProcessor(terminate); + waitForQueueCount(connection.getId(), 0); + getClientUtil().stopProcessor(terminate); + + // Replay last event for terminate and ensure that data is queued up. + final ReplayLastEventResponseEntity replayResponse = getNifiClient().getProvenanceClient().replayLastEvent(terminate.getId(), nodes); + assertNull(replayResponse.getAggregateSnapshot().getFailureExplanation()); + assertEquals(Boolean.TRUE, replayResponse.getAggregateSnapshot().getEventAvailable()); + final int expectedEventsPlayed = (nodes == ReplayEventNodes.PRIMARY) ? 1 : getNumberOfNodes(); + assertEquals(expectedEventsPlayed, replayResponse.getAggregateSnapshot().getEventsReplayed().size()); + + waitForQueueCount(connection.getId(), expectedEventsPlayed); + + // Attempt to replay event for generate - it should provide an error because this is a source processor whose event cannot be replayed + final ReplayLastEventResponseEntity generateReplayResponse = getNifiClient().getProvenanceClient().replayLastEvent(generate.getId(), nodes); + final String failureExplanation = generateReplayResponse.getAggregateSnapshot().getFailureExplanation(); + assertNotNull(failureExplanation); + + // The failure text is not provided if multiple nodes failed, as it can get too unwieldy to understand. + final String expectedFailureText = (nodes == ReplayEventNodes.ALL && getNumberOfNodes() > 1) ? "See logs for more details" : "Source FlowFile Queue"; + assertTrue(failureExplanation.contains(expectedFailureText), failureExplanation); + } + +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java index 60774c9361..8f9c6808ef 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java @@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi; import org.apache.nifi.web.api.entity.LineageEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; import java.io.IOException; @@ -33,4 +34,11 @@ public interface ProvenanceClient { LineageEntity getLineageRequest(String lineageRequestId) throws NiFiClientException, IOException; LineageEntity deleteLineageRequest(String lineageRequestId) throws NiFiClientException, IOException; + + ReplayLastEventResponseEntity replayLastEvent(String processorId, ReplayEventNodes replayEventNodes) throws NiFiClientException, IOException; + + enum ReplayEventNodes { + PRIMARY, + ALL; + } } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java index ea83b34fac..830fa8213a 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java @@ -22,14 +22,18 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig; import org.apache.nifi.web.api.entity.LineageEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity; +import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import java.io.IOException; +import java.util.Objects; public class JerseyProvenanceClient extends AbstractJerseyClient implements ProvenanceClient { private final WebTarget provenanceTarget; + private final WebTarget provenanceEventsTarget; public JerseyProvenanceClient(final WebTarget baseTarget) { this(baseTarget, null); @@ -38,6 +42,7 @@ public class JerseyProvenanceClient extends AbstractJerseyClient implements Prov public JerseyProvenanceClient(final WebTarget baseTarget, final RequestConfig requestConfig) { super(requestConfig); this.provenanceTarget = baseTarget.path("/provenance"); + this.provenanceEventsTarget = baseTarget.path("/provenance-events"); } @Override @@ -116,4 +121,21 @@ public class JerseyProvenanceClient extends AbstractJerseyClient implements Prov }); } + + @Override + public ReplayLastEventResponseEntity replayLastEvent(final String processorId, final ReplayEventNodes nodes) throws NiFiClientException, IOException { + Objects.requireNonNull(processorId, "Processor ID required"); + Objects.requireNonNull(nodes, "Nodes must be specified"); + + final ReplayLastEventRequestEntity requestEntity = new ReplayLastEventRequestEntity(); + requestEntity.setComponentId(processorId); + requestEntity.setNodes(nodes.name()); + + return executeAction("Error replaying last event for Processor " + processorId, () -> { + final WebTarget target = provenanceEventsTarget.path("/latest/replays"); + return getRequestBuilder(target).post( + Entity.entity(requestEntity, MediaType.APPLICATION_JSON_TYPE), + ReplayLastEventResponseEntity.class); + }); + } }