mirror of https://github.com/apache/nifi.git
NIFI-2260:
- Addressing issue searching provenance on a specific node. - Fixing issues viewing content and replaying events. This closes #658.
This commit is contained in:
parent
53326c7f9b
commit
5cd5a4ce78
|
@ -17,12 +17,11 @@
|
||||||
package org.apache.nifi.web.api.dto.provenance;
|
package org.apache.nifi.web.api.dto.provenance;
|
||||||
|
|
||||||
import com.wordnik.swagger.annotations.ApiModelProperty;
|
import com.wordnik.swagger.annotations.ApiModelProperty;
|
||||||
import java.util.Date;
|
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
|
||||||
|
|
||||||
import javax.xml.bind.annotation.XmlType;
|
import javax.xml.bind.annotation.XmlType;
|
||||||
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
||||||
|
import java.util.Date;
|
||||||
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A provenance submission. Incorporates the request, its current status, and the results.
|
* A provenance submission. Incorporates the request, its current status, and the results.
|
||||||
|
@ -32,7 +31,6 @@ public class ProvenanceDTO {
|
||||||
|
|
||||||
private String id;
|
private String id;
|
||||||
private String uri;
|
private String uri;
|
||||||
private String clusterNodeId;
|
|
||||||
|
|
||||||
private Date submissionTime;
|
private Date submissionTime;
|
||||||
private Date expiration;
|
private Date expiration;
|
||||||
|
@ -71,20 +69,6 @@ public class ProvenanceDTO {
|
||||||
this.uri = uri;
|
this.uri = uri;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return id of the node in the cluster where this provenance originated
|
|
||||||
*/
|
|
||||||
@ApiModelProperty(
|
|
||||||
value = "The id of the node in the cluster where this provenance originated."
|
|
||||||
)
|
|
||||||
public String getClusterNodeId() {
|
|
||||||
return clusterNodeId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setClusterNodeId(String clusterNodeId) {
|
|
||||||
this.clusterNodeId = clusterNodeId;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return time the query was submitted
|
* @return time the query was submitted
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -17,12 +17,12 @@
|
||||||
package org.apache.nifi.web.api.dto.provenance;
|
package org.apache.nifi.web.api.dto.provenance;
|
||||||
|
|
||||||
import com.wordnik.swagger.annotations.ApiModelProperty;
|
import com.wordnik.swagger.annotations.ApiModelProperty;
|
||||||
import java.util.Date;
|
import org.apache.nifi.web.api.dto.util.DateTimeAdapter;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import javax.xml.bind.annotation.XmlType;
|
import javax.xml.bind.annotation.XmlType;
|
||||||
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
||||||
import org.apache.nifi.web.api.dto.util.DateTimeAdapter;
|
import java.util.Date;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A request for provenance.
|
* A request for provenance.
|
||||||
|
@ -31,6 +31,7 @@ import org.apache.nifi.web.api.dto.util.DateTimeAdapter;
|
||||||
public class ProvenanceRequestDTO {
|
public class ProvenanceRequestDTO {
|
||||||
|
|
||||||
private Map<String, String> searchTerms;
|
private Map<String, String> searchTerms;
|
||||||
|
private String clusterNodeId;
|
||||||
private Date startDate;
|
private Date startDate;
|
||||||
private Date endDate;
|
private Date endDate;
|
||||||
private String minimumFileSize;
|
private String minimumFileSize;
|
||||||
|
@ -122,4 +123,18 @@ public class ProvenanceRequestDTO {
|
||||||
public void setMaxResults(Integer maxResults) {
|
public void setMaxResults(Integer maxResults) {
|
||||||
this.maxResults = maxResults;
|
this.maxResults = maxResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return id of the node in the cluster where this provenance originated
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(
|
||||||
|
value = "The id of the node in the cluster where this provenance originated."
|
||||||
|
)
|
||||||
|
public String getClusterNodeId() {
|
||||||
|
return clusterNodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClusterNodeId(String clusterNodeId) {
|
||||||
|
this.clusterNodeId = clusterNodeId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,37 @@
|
||||||
|
|
||||||
package org.apache.nifi.cluster.coordination.http.replication;
|
package org.apache.nifi.cluster.coordination.http.replication;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.client.Client;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import com.sun.jersey.api.client.config.ClientConfig;
|
||||||
|
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
|
||||||
|
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||||
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
|
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||||
|
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
|
||||||
|
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
|
||||||
|
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||||
|
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
||||||
|
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
|
||||||
|
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||||
|
import org.apache.nifi.events.EventReporter;
|
||||||
|
import org.apache.nifi.reporting.Severity;
|
||||||
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.ws.rs.HttpMethod;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -42,39 +73,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.ws.rs.HttpMethod;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
import javax.ws.rs.core.MultivaluedMap;
|
|
||||||
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
|
||||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
|
||||||
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
|
|
||||||
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
|
|
||||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
|
||||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
|
||||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
|
|
||||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
|
||||||
import org.apache.nifi.events.EventReporter;
|
|
||||||
import org.apache.nifi.reporting.Severity;
|
|
||||||
import org.apache.nifi.util.FormatUtils;
|
|
||||||
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.sun.jersey.api.client.Client;
|
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
|
||||||
import com.sun.jersey.api.client.WebResource;
|
|
||||||
import com.sun.jersey.api.client.config.ClientConfig;
|
|
||||||
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
|
|
||||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
|
||||||
|
|
||||||
public class ThreadPoolRequestReplicator implements RequestReplicator {
|
public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
|
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
|
||||||
|
@ -631,12 +629,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
|
|
||||||
|
|
||||||
private URI createURI(final URI exampleUri, final NodeIdentifier nodeId) {
|
private URI createURI(final URI exampleUri, final NodeIdentifier nodeId) {
|
||||||
return createURI(exampleUri.getScheme(), nodeId.getApiAddress(), nodeId.getApiPort(), exampleUri.getPath());
|
return createURI(exampleUri.getScheme(), nodeId.getApiAddress(), nodeId.getApiPort(), exampleUri.getPath(), exampleUri.getQuery());
|
||||||
}
|
}
|
||||||
|
|
||||||
private URI createURI(final String scheme, final String nodeApiAddress, final int nodeApiPort, final String path) {
|
private URI createURI(final String scheme, final String nodeApiAddress, final int nodeApiPort, final String path, final String query) {
|
||||||
try {
|
try {
|
||||||
return new URI(scheme, null, nodeApiAddress, nodeApiPort, path, null, null);
|
return new URI(scheme, null, nodeApiAddress, nodeApiPort, path, query, null);
|
||||||
} catch (final URISyntaxException e) {
|
} catch (final URISyntaxException e) {
|
||||||
throw new UriConstructionException(e);
|
throw new UriConstructionException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class StandardNiFiContentAccess implements ContentAccess {
|
||||||
.compile("/flowfile-queues/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content.*");
|
.compile("/flowfile-queues/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content.*");
|
||||||
|
|
||||||
private static final Pattern PROVENANCE_CONTENT_URI_PATTERN = Pattern
|
private static final Pattern PROVENANCE_CONTENT_URI_PATTERN = Pattern
|
||||||
.compile("/provenance/events/([0-9]+)/content/((?:input)|(?:output)).*");
|
.compile("/provenance-events/([0-9]+)/content/((?:input)|(?:output)).*");
|
||||||
|
|
||||||
private NiFiProperties properties;
|
private NiFiProperties properties;
|
||||||
private NiFiServiceFacade serviceFacade;
|
private NiFiServiceFacade serviceFacade;
|
||||||
|
|
|
@ -49,7 +49,6 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
|
||||||
import org.apache.nifi.web.api.dto.SnippetDTO;
|
import org.apache.nifi.web.api.dto.SnippetDTO;
|
||||||
import org.apache.nifi.web.api.entity.ComponentEntity;
|
import org.apache.nifi.web.api.entity.ComponentEntity;
|
||||||
import org.apache.nifi.web.api.entity.TransactionResultEntity;
|
import org.apache.nifi.web.api.entity.TransactionResultEntity;
|
||||||
import org.apache.nifi.web.api.request.ClientIdParameter;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -291,16 +290,6 @@ public abstract class ApplicationResource {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MultivaluedMap<String, String> getRequestParameters(final boolean forceClientId) {
|
|
||||||
final MultivaluedMap<String, String> params = getRequestParameters();
|
|
||||||
if (forceClientId) {
|
|
||||||
if (StringUtils.isBlank(params.getFirst(CLIENT_ID))) {
|
|
||||||
params.putSingle(CLIENT_ID, new ClientIdParameter().getClientId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return params;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<String, String> getHeaders() {
|
protected Map<String, String> getHeaders() {
|
||||||
return getHeaders(new HashMap<String, String>());
|
return getHeaders(new HashMap<String, String>());
|
||||||
}
|
}
|
||||||
|
@ -519,7 +508,7 @@ public abstract class ApplicationResource {
|
||||||
* @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
|
* @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
|
||||||
*/
|
*/
|
||||||
protected Response replicate(final String method, final String nodeUuid) {
|
protected Response replicate(final String method, final String nodeUuid) {
|
||||||
return replicate(method, getRequestParameters(true), nodeUuid);
|
return replicate(method, getRequestParameters(), nodeUuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -595,11 +584,11 @@ public abstract class ApplicationResource {
|
||||||
// to the cluster nodes themselves.
|
// to the cluster nodes themselves.
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
|
final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
|
||||||
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(true), getHeaders(), true).awaitMergedResponse().getResponse();
|
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(), getHeaders(), true).awaitMergedResponse().getResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
|
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
|
||||||
return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(true), headers, false).awaitMergedResponse().getResponse();
|
return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(), headers, false).awaitMergedResponse().getResponse();
|
||||||
}
|
}
|
||||||
} catch (final InterruptedException ie) {
|
} catch (final InterruptedException ie) {
|
||||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
|
||||||
|
@ -614,7 +603,7 @@ public abstract class ApplicationResource {
|
||||||
* @return the response from the request
|
* @return the response from the request
|
||||||
*/
|
*/
|
||||||
protected Response replicate(final String method) {
|
protected Response replicate(final String method) {
|
||||||
return replicate(method, getRequestParameters(true));
|
return replicate(method, getRequestParameters());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -626,7 +615,7 @@ public abstract class ApplicationResource {
|
||||||
* @throws InterruptedException if interrupted while replicating the request
|
* @throws InterruptedException if interrupted while replicating the request
|
||||||
*/
|
*/
|
||||||
protected NodeResponse replicateNodeResponse(final String method) throws InterruptedException {
|
protected NodeResponse replicateNodeResponse(final String method) throws InterruptedException {
|
||||||
return replicateNodeResponse(method, getRequestParameters(true), (Map<String, String>) null);
|
return replicateNodeResponse(method, getRequestParameters(), (Map<String, String>) null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,25 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web.api;
|
package org.apache.nifi.web.api;
|
||||||
|
|
||||||
import java.util.Collections;
|
import com.sun.jersey.api.core.ResourceContext;
|
||||||
import java.util.HashMap;
|
import com.wordnik.swagger.annotations.Api;
|
||||||
import java.util.Map;
|
import com.wordnik.swagger.annotations.ApiOperation;
|
||||||
import java.util.Set;
|
import com.wordnik.swagger.annotations.ApiParam;
|
||||||
|
import com.wordnik.swagger.annotations.ApiResponse;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import com.wordnik.swagger.annotations.ApiResponses;
|
||||||
import javax.ws.rs.Consumes;
|
import com.wordnik.swagger.annotations.Authorization;
|
||||||
import javax.ws.rs.DefaultValue;
|
|
||||||
import javax.ws.rs.GET;
|
|
||||||
import javax.ws.rs.HttpMethod;
|
|
||||||
import javax.ws.rs.PUT;
|
|
||||||
import javax.ws.rs.Path;
|
|
||||||
import javax.ws.rs.PathParam;
|
|
||||||
import javax.ws.rs.Produces;
|
|
||||||
import javax.ws.rs.QueryParam;
|
|
||||||
import javax.ws.rs.core.Context;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.authorization.AccessDeniedException;
|
import org.apache.nifi.authorization.AccessDeniedException;
|
||||||
import org.apache.nifi.authorization.AuthorizationRequest;
|
import org.apache.nifi.authorization.AuthorizationRequest;
|
||||||
|
@ -56,13 +44,23 @@ import org.apache.nifi.web.api.entity.CounterEntity;
|
||||||
import org.apache.nifi.web.api.entity.CountersEntity;
|
import org.apache.nifi.web.api.entity.CountersEntity;
|
||||||
import org.apache.nifi.web.api.entity.Entity;
|
import org.apache.nifi.web.api.entity.Entity;
|
||||||
|
|
||||||
import com.sun.jersey.api.core.ResourceContext;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import com.wordnik.swagger.annotations.Api;
|
import javax.ws.rs.Consumes;
|
||||||
import com.wordnik.swagger.annotations.ApiOperation;
|
import javax.ws.rs.DefaultValue;
|
||||||
import com.wordnik.swagger.annotations.ApiParam;
|
import javax.ws.rs.GET;
|
||||||
import com.wordnik.swagger.annotations.ApiResponse;
|
import javax.ws.rs.HttpMethod;
|
||||||
import com.wordnik.swagger.annotations.ApiResponses;
|
import javax.ws.rs.PUT;
|
||||||
import com.wordnik.swagger.annotations.Authorization;
|
import javax.ws.rs.Path;
|
||||||
|
import javax.ws.rs.PathParam;
|
||||||
|
import javax.ws.rs.Produces;
|
||||||
|
import javax.ws.rs.QueryParam;
|
||||||
|
import javax.ws.rs.core.Context;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -167,10 +165,10 @@ public class CountersResource extends ApplicationResource {
|
||||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
||||||
// to the cluster nodes themselves.
|
// to the cluster nodes themselves.
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), false).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
|
final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
|
||||||
|
|
|
@ -305,6 +305,7 @@ public class ProvenanceEventResource extends ApplicationResource {
|
||||||
|
|
||||||
// get the provenance event
|
// get the provenance event
|
||||||
final ProvenanceEventDTO event = serviceFacade.getProvenanceEvent(id.getLong());
|
final ProvenanceEventDTO event = serviceFacade.getProvenanceEvent(id.getLong());
|
||||||
|
event.setClusterNodeId(clusterNodeId);
|
||||||
|
|
||||||
// create a response entity
|
// create a response entity
|
||||||
final ProvenanceEventEntity entity = new ProvenanceEventEntity();
|
final ProvenanceEventEntity entity = new ProvenanceEventEntity();
|
||||||
|
|
|
@ -220,11 +220,11 @@ public class ProvenanceResource extends ApplicationResource {
|
||||||
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
|
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
|
||||||
|
|
||||||
// determine where this request should be sent
|
// determine where this request should be sent
|
||||||
if (provenanceDto.getClusterNodeId() == null) {
|
if (provenanceDto.getRequest() == null || provenanceDto.getRequest().getClusterNodeId() == null) {
|
||||||
// replicate to all nodes
|
// replicate to all nodes
|
||||||
return replicate(HttpMethod.POST, provenanceEntity, headersToOverride);
|
return replicate(HttpMethod.POST, provenanceEntity, headersToOverride);
|
||||||
} else {
|
} else {
|
||||||
return replicate(HttpMethod.POST, provenanceEntity, provenanceDto.getClusterNodeId(), headersToOverride);
|
return replicate(HttpMethod.POST, provenanceEntity, provenanceDto.getRequest().getClusterNodeId(), headersToOverride);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,9 +242,13 @@ public class ProvenanceResource extends ApplicationResource {
|
||||||
|
|
||||||
// submit the provenance request
|
// submit the provenance request
|
||||||
final ProvenanceDTO dto = serviceFacade.submitProvenance(provenanceDto);
|
final ProvenanceDTO dto = serviceFacade.submitProvenance(provenanceDto);
|
||||||
dto.setClusterNodeId(provenanceDto.getClusterNodeId());
|
|
||||||
populateRemainingProvenanceContent(dto);
|
populateRemainingProvenanceContent(dto);
|
||||||
|
|
||||||
|
// set the cluster id if necessary
|
||||||
|
if (provenanceDto.getRequest() != null && provenanceDto.getRequest().getClusterNodeId() != null) {
|
||||||
|
dto.getRequest().setClusterNodeId(provenanceDto.getRequest().getClusterNodeId());
|
||||||
|
}
|
||||||
|
|
||||||
// create the response entity
|
// create the response entity
|
||||||
final ProvenanceEntity entity = new ProvenanceEntity();
|
final ProvenanceEntity entity = new ProvenanceEntity();
|
||||||
entity.setProvenance(dto);
|
entity.setProvenance(dto);
|
||||||
|
@ -308,7 +312,7 @@ public class ProvenanceResource extends ApplicationResource {
|
||||||
|
|
||||||
// get the provenance
|
// get the provenance
|
||||||
final ProvenanceDTO dto = serviceFacade.getProvenance(id);
|
final ProvenanceDTO dto = serviceFacade.getProvenance(id);
|
||||||
dto.setClusterNodeId(clusterNodeId);
|
dto.getRequest().setClusterNodeId(clusterNodeId);
|
||||||
populateRemainingProvenanceContent(dto);
|
populateRemainingProvenanceContent(dto);
|
||||||
|
|
||||||
// create the response entity
|
// create the response entity
|
||||||
|
|
|
@ -16,21 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web.api;
|
package org.apache.nifi.web.api;
|
||||||
|
|
||||||
import java.util.Collections;
|
import com.wordnik.swagger.annotations.Api;
|
||||||
import java.util.HashMap;
|
import com.wordnik.swagger.annotations.ApiOperation;
|
||||||
import java.util.Map;
|
import com.wordnik.swagger.annotations.ApiParam;
|
||||||
import java.util.Set;
|
import com.wordnik.swagger.annotations.ApiResponse;
|
||||||
|
import com.wordnik.swagger.annotations.ApiResponses;
|
||||||
import javax.ws.rs.Consumes;
|
import com.wordnik.swagger.annotations.Authorization;
|
||||||
import javax.ws.rs.DefaultValue;
|
|
||||||
import javax.ws.rs.GET;
|
|
||||||
import javax.ws.rs.HttpMethod;
|
|
||||||
import javax.ws.rs.Path;
|
|
||||||
import javax.ws.rs.Produces;
|
|
||||||
import javax.ws.rs.QueryParam;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.authorization.AccessDeniedException;
|
import org.apache.nifi.authorization.AccessDeniedException;
|
||||||
import org.apache.nifi.authorization.AuthorizationRequest;
|
import org.apache.nifi.authorization.AuthorizationRequest;
|
||||||
|
@ -48,12 +39,19 @@ import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
|
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
|
||||||
import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
|
import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
|
||||||
|
|
||||||
import com.wordnik.swagger.annotations.Api;
|
import javax.ws.rs.Consumes;
|
||||||
import com.wordnik.swagger.annotations.ApiOperation;
|
import javax.ws.rs.DefaultValue;
|
||||||
import com.wordnik.swagger.annotations.ApiParam;
|
import javax.ws.rs.GET;
|
||||||
import com.wordnik.swagger.annotations.ApiResponse;
|
import javax.ws.rs.HttpMethod;
|
||||||
import com.wordnik.swagger.annotations.ApiResponses;
|
import javax.ws.rs.Path;
|
||||||
import com.wordnik.swagger.annotations.Authorization;
|
import javax.ws.rs.Produces;
|
||||||
|
import javax.ws.rs.QueryParam;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RESTful endpoint for retrieving system diagnostics.
|
* RESTful endpoint for retrieving system diagnostics.
|
||||||
|
@ -146,10 +144,10 @@ public class SystemDiagnosticsResource extends ApplicationResource {
|
||||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
||||||
// to the cluster nodes themselves.
|
// to the cluster nodes themselves.
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), false).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
|
final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
|
||||||
|
|
|
@ -1147,12 +1147,12 @@ public class ControllerFacade implements Authorizable {
|
||||||
throw new ResourceNotFoundException("Unable to find the specified event.");
|
throw new ResourceNotFoundException("Unable to find the specified event.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// authorize the replay
|
||||||
|
authorizeReplay(originalEvent.getComponentId(), originalEvent.getAttributes(), originalEvent.getSourceQueueIdentifier());
|
||||||
|
|
||||||
// replay the flow file
|
// replay the flow file
|
||||||
final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user);
|
final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user);
|
||||||
|
|
||||||
// authorize the replay
|
|
||||||
authorizeReplay(event.getComponentId(), event.getAttributes(), event.getSourceQueueIdentifier());
|
|
||||||
|
|
||||||
// convert the event record
|
// convert the event record
|
||||||
return createProvenanceEventDto(event);
|
return createProvenanceEventDto(event);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
|
|
|
@ -99,7 +99,7 @@ nf.ProvenanceTable = (function () {
|
||||||
var eventId = $('#provenance-event-id').text();
|
var eventId = $('#provenance-event-id').text();
|
||||||
|
|
||||||
// build the uri to the data
|
// build the uri to the data
|
||||||
var dataUri = config.urls.provenanceEvents + '/' + encodeURIComponent(eventId) + '/content/' + encodeURIComponent(direction);
|
var dataUri = controllerUri + 'provenance-events/' + encodeURIComponent(eventId) + '/content/' + encodeURIComponent(direction);
|
||||||
|
|
||||||
// generate tokens as necessary
|
// generate tokens as necessary
|
||||||
var getAccessTokens = $.Deferred(function (deferred) {
|
var getAccessTokens = $.Deferred(function (deferred) {
|
||||||
|
@ -900,9 +900,9 @@ nf.ProvenanceTable = (function () {
|
||||||
*/
|
*/
|
||||||
var getProvenance = function (provenance) {
|
var getProvenance = function (provenance) {
|
||||||
var url = provenance.uri;
|
var url = provenance.uri;
|
||||||
if (nf.Common.isDefinedAndNotNull(provenance.clusterNodeId)) {
|
if (nf.Common.isDefinedAndNotNull(provenance.request.clusterNodeId)) {
|
||||||
url += '?' + $.param({
|
url += '?' + $.param({
|
||||||
clusterNodeId: provenance.clusterNodeId
|
clusterNodeId: provenance.request.clusterNodeId
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -921,9 +921,9 @@ nf.ProvenanceTable = (function () {
|
||||||
*/
|
*/
|
||||||
var cancelProvenance = function (provenance) {
|
var cancelProvenance = function (provenance) {
|
||||||
var url = provenance.uri;
|
var url = provenance.uri;
|
||||||
if (nf.Common.isDefinedAndNotNull(provenance.clusterNodeId)) {
|
if (nf.Common.isDefinedAndNotNull(provenance.request.clusterNodeId)) {
|
||||||
url += '?' + $.param({
|
url += '?' + $.param({
|
||||||
clusterNodeId: provenance.clusterNodeId
|
clusterNodeId: provenance.request.clusterNodeId
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ nf.Provenance = (function () {
|
||||||
*/
|
*/
|
||||||
var config = {
|
var config = {
|
||||||
urls: {
|
urls: {
|
||||||
flowConfig: '../nifi-api/flow/config',
|
clusterSummary: '../nifi-api/flow/cluster/summary',
|
||||||
banners: '../nifi-api/flow/banners',
|
banners: '../nifi-api/flow/banners',
|
||||||
about: '../nifi-api/flow/about',
|
about: '../nifi-api/flow/about',
|
||||||
currentUser: '../nifi-api/flow/current-user'
|
currentUser: '../nifi-api/flow/current-user'
|
||||||
|
@ -67,9 +67,9 @@ nf.Provenance = (function () {
|
||||||
var detectedCluster = function () {
|
var detectedCluster = function () {
|
||||||
return $.ajax({
|
return $.ajax({
|
||||||
type: 'GET',
|
type: 'GET',
|
||||||
url: config.urls.flowConfig
|
url: config.urls.clusterSummary
|
||||||
}).done(function (response) {
|
}).done(function (response) {
|
||||||
isClustered = response.flowConfiguration.clustered;
|
isClustered = response.clusterSummary.connectedToCluster;
|
||||||
}).fail(nf.Common.handleAjaxError);
|
}).fail(nf.Common.handleAjaxError);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ nf.Summary = (function () {
|
||||||
urls: {
|
urls: {
|
||||||
banners: '../nifi-api/flow/banners',
|
banners: '../nifi-api/flow/banners',
|
||||||
about: '../nifi-api/flow/about',
|
about: '../nifi-api/flow/about',
|
||||||
flowConfig: '../nifi-api/flow/config'
|
clusterSummary: '../nifi-api/flow/cluster/summary'
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -62,9 +62,9 @@ nf.Summary = (function () {
|
||||||
return $.Deferred(function (deferred) {
|
return $.Deferred(function (deferred) {
|
||||||
$.ajax({
|
$.ajax({
|
||||||
type: 'GET',
|
type: 'GET',
|
||||||
url: config.urls.flowConfig
|
url: config.urls.clusterSummary
|
||||||
}).done(function (response) {
|
}).done(function (response) {
|
||||||
nf.SummaryTable.init(response.flowConfiguration.clustered).done(function () {
|
nf.SummaryTable.init(response.clusterSummary.connectedToCluster).done(function () {
|
||||||
deferred.resolve();
|
deferred.resolve();
|
||||||
}).fail(function () {
|
}).fail(function () {
|
||||||
deferred.reject();
|
deferred.reject();
|
||||||
|
|
Loading…
Reference in New Issue