NIFI-4045:

- Addressing issues causing the eventId to not be relayed when submitting a lineage request under certain conditions.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1903.
This commit is contained in:
Matt Gilman 2017-06-08 13:38:19 -04:00 committed by Pierre Villard
parent 1e7eceee84
commit c99c036c20
6 changed files with 41 additions and 29 deletions

View File

@ -60,10 +60,19 @@ public interface ComputeLineageSubmission {
LineageComputationType getLineageComputationType(); LineageComputationType getLineageComputationType();
/** /**
* @return If the Lineage Computation Type of this submission is * If the Lineage Computation Type of this submission is
* {@link LineageComputationType.EXPAND_CHILDREN} or * {@link LineageComputationType.EXPAND_CHILDREN} or
* {@link LineageComputationType.EXPAND_PARENTS}, indicates the ID event * {@link LineageComputationType.EXPAND_PARENTS}, indicates
* that is to be expanded; otherwise, returns <code>null</code> * the event ID that is to be expanded.
*
* If the Lineage Computation Type of this submission is
* {@link LineageComputationType.FLOWFILE_LINEAGE} and the
* original submission was based off an event id, indicates
* that event ID.
*
* Otherwise returns <code>null</code>.
*
* @return the event id if applicable
*/ */
Long getExpandedEventId(); Long getExpandedEventId();

View File

@ -47,7 +47,8 @@ public class LineageRequestDTO {
* @return event id that was used to generate this lineage * @return event id that was used to generate this lineage
*/ */
@ApiModelProperty( @ApiModelProperty(
value = "" value = "The event id that was used to generate this lineage, if applicable. The event id is allowed for any type of lineageRequestType. If the lineageRequestType is FLOWFILE and the "
+ "flowfile uuid is also included in the request, the event id will be ignored."
) )
public Long getEventId() { public Long getEventId() {
return eventId; return eventId;
@ -90,7 +91,7 @@ public class LineageRequestDTO {
* @return uuid that was used to generate this lineage * @return uuid that was used to generate this lineage
*/ */
@ApiModelProperty( @ApiModelProperty(
value = "The uuid that was used to generate the lineage." value = "The flowfile uuid that was used to generate the lineage. The flowfile uuid is only allowed when the lineageRequestType is FLOWFILE and will take precedence over event id."
) )
public String getUuid() { public String getUuid() {
return uuid; return uuid;

View File

@ -485,7 +485,7 @@ public class ProvenanceResource extends ApplicationResource {
} }
break; break;
case FLOWFILE: case FLOWFILE:
// ensure the uuid has been specified // ensure the uuid or event id has been specified
if (requestDto.getUuid() == null && requestDto.getEventId() == null) { if (requestDto.getUuid() == null && requestDto.getEventId() == null) {
throw new IllegalArgumentException("The flowfile uuid or event id must be specified when the event type is FLOWFILE."); throw new IllegalArgumentException("The flowfile uuid or event id must be specified when the event type is FLOWFILE.");
} }

View File

@ -2432,6 +2432,7 @@ public final class DtoFactory {
if (uuids.size() == 1) { if (uuids.size() == 1) {
requestDto.setUuid(uuids.iterator().next()); requestDto.setUuid(uuids.iterator().next());
} }
requestDto.setEventId(computeLineageSubmission.getExpandedEventId());
requestDto.setLineageRequestType(LineageRequestType.FLOWFILE); requestDto.setLineageRequestType(LineageRequestType.FLOWFILE);
break; break;
} }

View File

@ -1105,12 +1105,12 @@ public class ControllerFacade implements Authorizable {
final ProvenanceRepository provenanceRepository = flowController.getProvenanceRepository(); final ProvenanceRepository provenanceRepository = flowController.getProvenanceRepository();
final ComputeLineageSubmission result; final ComputeLineageSubmission result;
// submit the event
if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) { if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) {
// submit uuid if (requestDto.getUuid() != null) {
if (requestDto.getEventId() == null) { // submit uuid if it is specified
result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser()); result = provenanceRepository.submitLineageComputation(requestDto.getUuid(), NiFiUserUtils.getNiFiUser());
} else { } else {
// submit the event if the flowfile uuid needs to be looked up
result = provenanceRepository.submitLineageComputation(requestDto.getEventId(), NiFiUserUtils.getNiFiUser()); result = provenanceRepository.submitLineageComputation(requestDto.getEventId(), NiFiUserUtils.getNiFiUser());
} }
} else { } else {

View File

@ -16,25 +16,6 @@
*/ */
package org.apache.nifi.provenance; package org.apache.nifi.provenance;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationResult; import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result; import org.apache.nifi.authorization.AuthorizationResult.Result;
@ -61,6 +42,26 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.RingBuffer.IterationDirection; import org.apache.nifi.util.RingBuffer.IterationDirection;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
public class VolatileProvenanceRepository implements ProvenanceRepository { public class VolatileProvenanceRepository implements ProvenanceRepository {
// properties // properties
@ -502,7 +503,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return result; return result;
} }
return submitLineageComputation(event.getFlowFileUuid(), user); return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, LineageComputationType.FLOWFILE_LINEAGE, eventId);
} }
@Override @Override