mirror of https://github.com/apache/nifi.git
NIFI-13638: Limiting the maximum number of provenance events to return when fetching the latest events. (#9156)
This commit is contained in:
parent
8ba7d7805a
commit
38292762bd
|
@ -103,7 +103,7 @@ public class NoOpProvenanceRepository implements ProvenanceRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
|
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId, final int eventLimit) {
|
||||||
return List.of();
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -257,8 +257,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) throws IOException {
|
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId, final int eventLimit) throws IOException {
|
||||||
return eventIndex.getLatestCachedEvents(componentId);
|
return eventIndex.getLatestCachedEvents(componentId, eventLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -85,11 +85,12 @@ public interface EventIndex extends Closeable {
|
||||||
/**
|
/**
|
||||||
* Retrieves the list of Provenance Events that are cached for the most recent invocation of the given component
|
* Retrieves the list of Provenance Events that are cached for the most recent invocation of the given component
|
||||||
* @param componentId the ID of the component
|
* @param componentId the ID of the component
|
||||||
|
* @param eventLimit the maximum number of events to return
|
||||||
*
|
*
|
||||||
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
|
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
|
||||||
* @throws IOException if unable to read from the repository
|
* @throws IOException if unable to read from the repository
|
||||||
*/
|
*/
|
||||||
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId) throws IOException;
|
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId, int eventLimit) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID.
|
* Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID.
|
||||||
|
|
|
@ -643,15 +643,17 @@ public class LuceneEventIndex implements EventIndex {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) throws IOException {
|
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId, final int eventLimit) throws IOException {
|
||||||
final List<Long> eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId);
|
final List<Long> eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId);
|
||||||
if (eventIds.isEmpty()) {
|
if (eventIds.isEmpty()) {
|
||||||
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
|
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
|
||||||
return List.of();
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<ProvenanceEventRecord> latestEvents = new ArrayList<>(eventIds.size());
|
final List<Long> filtered = eventIds.subList(0, Math.min(eventIds.size(), eventLimit));
|
||||||
for (final Long eventId : eventIds) {
|
|
||||||
|
final List<ProvenanceEventRecord> latestEvents = new ArrayList<>(filtered.size());
|
||||||
|
for (final Long eventId : filtered) {
|
||||||
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(eventId);
|
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(eventId);
|
||||||
if (latestEvent.isPresent()) {
|
if (latestEvent.isPresent()) {
|
||||||
latestEvents.add(latestEvent.get());
|
latestEvents.add(latestEvent.get());
|
||||||
|
|
|
@ -491,7 +491,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
|
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId, final int eventLimit) {
|
||||||
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(
|
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(
|
||||||
event -> componentId.equals(event.getComponentId()), 1);
|
event -> componentId.equals(event.getComponentId()), 1);
|
||||||
|
|
||||||
|
|
|
@ -96,10 +96,11 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
|
||||||
/**
|
/**
|
||||||
* Retrieves the Provenance Events that are cached for the most recent invocation of the given component.
|
* Retrieves the Provenance Events that are cached for the most recent invocation of the given component.
|
||||||
* @param componentId the ID of the component
|
* @param componentId the ID of the component
|
||||||
|
* @param eventLimit the maximum number of events to return
|
||||||
* @return the list of events that are cached for the given component
|
* @return the list of events that are cached for the given component
|
||||||
* @throws IOException if unable to read from the repository
|
* @throws IOException if unable to read from the repository
|
||||||
*/
|
*/
|
||||||
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId) throws IOException;
|
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId, int eventLimit) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param queryIdentifier of the query
|
* @param queryIdentifier of the query
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class MockProvenanceRepository implements ProvenanceRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
|
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId, final int eventLimit) {
|
||||||
return List.of();
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,9 +326,10 @@ public interface NiFiServiceFacade {
|
||||||
/**
|
/**
|
||||||
* Gets the latest provenance events for the specified component.
|
* Gets the latest provenance events for the specified component.
|
||||||
* @param componentId the ID of the components to retrieve the latest events for
|
* @param componentId the ID of the components to retrieve the latest events for
|
||||||
|
* @param eventLimit the maximum number of events to return
|
||||||
* @return the latest provenance events
|
* @return the latest provenance events
|
||||||
*/
|
*/
|
||||||
LatestProvenanceEventsEntity getLatestProvenanceEvents(String componentId);
|
LatestProvenanceEventsEntity getLatestProvenanceEvents(String componentId, int eventLimit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the configuration for this controller.
|
* Gets the configuration for this controller.
|
||||||
|
|
|
@ -3663,8 +3663,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LatestProvenanceEventsEntity getLatestProvenanceEvents(final String componentId) {
|
public LatestProvenanceEventsEntity getLatestProvenanceEvents(final String componentId, final int eventLimit) {
|
||||||
final LatestProvenanceEventsDTO dto = controllerFacade.getLatestProvenanceEvents(componentId);
|
final LatestProvenanceEventsDTO dto = controllerFacade.getLatestProvenanceEvents(componentId, eventLimit);
|
||||||
|
|
||||||
final LatestProvenanceEventsEntity entity = new LatestProvenanceEventsEntity();
|
final LatestProvenanceEventsEntity entity = new LatestProvenanceEventsEntity();
|
||||||
entity.setLatestProvenanceEvents(dto);
|
entity.setLatestProvenanceEvents(dto);
|
||||||
|
|
|
@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import jakarta.servlet.http.HttpServletRequest;
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
import jakarta.ws.rs.Consumes;
|
import jakarta.ws.rs.Consumes;
|
||||||
|
import jakarta.ws.rs.DefaultValue;
|
||||||
import jakarta.ws.rs.GET;
|
import jakarta.ws.rs.GET;
|
||||||
import jakarta.ws.rs.HttpMethod;
|
import jakarta.ws.rs.HttpMethod;
|
||||||
import jakarta.ws.rs.POST;
|
import jakarta.ws.rs.POST;
|
||||||
|
@ -526,14 +527,19 @@ public class ProvenanceEventResource extends ApplicationResource {
|
||||||
description = "The ID of the component to retrieve the latest Provenance Events for.",
|
description = "The ID of the component to retrieve the latest Provenance Events for.",
|
||||||
required = true
|
required = true
|
||||||
)
|
)
|
||||||
@PathParam("componentId") final String componentId
|
@PathParam("componentId") final String componentId,
|
||||||
|
@Parameter(
|
||||||
|
description = "The number of events to limit the response to. Defaults to 10."
|
||||||
|
)
|
||||||
|
@DefaultValue("10")
|
||||||
|
@QueryParam("limit") int limit
|
||||||
) {
|
) {
|
||||||
if (isReplicateRequest()) {
|
if (isReplicateRequest()) {
|
||||||
return replicate(HttpMethod.GET);
|
return replicate(HttpMethod.GET);
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the latest provenance events
|
// get the latest provenance events
|
||||||
final LatestProvenanceEventsEntity entity = serviceFacade.getLatestProvenanceEvents(componentId);
|
final LatestProvenanceEventsEntity entity = serviceFacade.getLatestProvenanceEvents(componentId, limit);
|
||||||
|
|
||||||
// generate the response
|
// generate the response
|
||||||
return generateOkResponse(entity).build();
|
return generateOkResponse(entity).build();
|
||||||
|
|
|
@ -155,6 +155,8 @@ public class ControllerFacade implements Authorizable {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
|
private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
|
||||||
|
|
||||||
|
private static final int MAX_REPLAY_EVENT_COUNT = 10;
|
||||||
|
|
||||||
// nifi components
|
// nifi components
|
||||||
private FlowController flowController;
|
private FlowController flowController;
|
||||||
private FlowService flowService;
|
private FlowService flowService;
|
||||||
|
@ -1415,7 +1417,7 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookup the original event
|
// lookup the original event
|
||||||
final List<ProvenanceEventRecord> latestEvents = flowController.getProvenanceRepository().getLatestCachedEvents(componentId);
|
final List<ProvenanceEventRecord> latestEvents = flowController.getProvenanceRepository().getLatestCachedEvents(componentId, MAX_REPLAY_EVENT_COUNT);
|
||||||
if (latestEvents.isEmpty()) {
|
if (latestEvents.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -1537,7 +1539,7 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public LatestProvenanceEventsDTO getLatestProvenanceEvents(final String componentId) {
|
public LatestProvenanceEventsDTO getLatestProvenanceEvents(final String componentId, final int eventLimit) {
|
||||||
final Authorizable authorizable = flowController.getProvenanceAuthorizableFactory().createProvenanceDataAuthorizable(componentId);
|
final Authorizable authorizable = flowController.getProvenanceAuthorizableFactory().createProvenanceDataAuthorizable(componentId);
|
||||||
final Authorizer authorizer = flowController.getAuthorizer();
|
final Authorizer authorizer = flowController.getAuthorizer();
|
||||||
if (!authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) {
|
if (!authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) {
|
||||||
|
@ -1545,7 +1547,7 @@ public class ControllerFacade implements Authorizable {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final List<ProvenanceEventRecord> events = flowController.getProvenanceRepository().getLatestCachedEvents(componentId);
|
final List<ProvenanceEventRecord> events = flowController.getProvenanceRepository().getLatestCachedEvents(componentId, eventLimit);
|
||||||
final List<ProvenanceEventDTO> eventDtos = new ArrayList<>();
|
final List<ProvenanceEventDTO> eventDtos = new ArrayList<>();
|
||||||
for (final ProvenanceEventRecord event : events) {
|
for (final ProvenanceEventRecord event : events) {
|
||||||
eventDtos.add(createProvenanceEventDto(event, false));
|
eventDtos.add(createProvenanceEventDto(event, false));
|
||||||
|
|
|
@ -153,7 +153,7 @@ public class StatelessProvenanceRepository implements ProvenanceRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
|
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId, final int eventLimit) {
|
||||||
return List.of();
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue