NIFI-1801: Scope Templates to Process Groups. This closes #446.

This commit is contained in:
Mark Payne 2016-05-12 13:09:36 -04:00 committed by Matt Gilman
parent 347b281b2d
commit 270944ec69
41 changed files with 1325 additions and 1000 deletions

View File

@ -81,11 +81,6 @@ public class Revision implements Serializable {
return false;
}
// TODO: THIS IS FOR TESTING PURPOSES! DO NOT LET THIS GET CHECKED IN THIS WAY!!!!!!!!!!!!
if (true) {
return true;
}
Revision thatRevision = (Revision) obj;
// ensure that component ID's are the same (including null)
if (thatRevision.getComponentId() == null && getComponentId() != null) {

View File

@ -31,6 +31,7 @@ public class TemplateDTO {
private String uri;
private String id;
private String groupId;
private String name;
private String description;
private Date timestamp;
@ -40,9 +41,7 @@ public class TemplateDTO {
/**
* @return id for this template
*/
@ApiModelProperty(
value = "The id of the template."
)
@ApiModelProperty("The id of the template.")
public String getId() {
return id;
}
@ -51,6 +50,16 @@ public class TemplateDTO {
this.id = id;
}
@ApiModelProperty("The id of the Process Group that the template belongs to.")
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
/**
* @return uri for this template
*/

View File

@ -29,9 +29,9 @@ import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;
*/
@XmlJavaTypeAdapter(DataFlowAdapter.class)
public class StandardDataFlow implements Serializable, DataFlow {
private static final long serialVersionUID = 1L;
private final byte[] flow;
private final byte[] templateBytes;
private final byte[] snippetBytes;
private boolean autoStartProcessors;
@ -40,23 +40,20 @@ public class StandardDataFlow implements Serializable, DataFlow {
* Constructs an instance.
*
* @param flow a valid flow as bytes, which cannot be null
* @param templateBytes an XML representation of templates. May be null.
* @param snippetBytes an XML representation of snippets. May be null.
*
* @throws NullPointerException if flow is null
*/
public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
public StandardDataFlow(final byte[] flow, final byte[] snippetBytes) {
if(flow == null){
throw new NullPointerException("Flow cannot be null");
}
this.flow = flow;
this.templateBytes = templateBytes;
this.snippetBytes = snippetBytes;
}
public StandardDataFlow(final DataFlow toCopy) {
this.flow = copy(toCopy.getFlow());
this.templateBytes = copy(toCopy.getTemplates());
this.snippetBytes = copy(toCopy.getSnippets());
this.autoStartProcessors = toCopy.isAutoStartProcessors();
}
@ -70,10 +67,6 @@ public class StandardDataFlow implements Serializable, DataFlow {
return flow;
}
@Override
public byte[] getTemplates() {
return templateBytes;
}
@Override
public byte[] getSnippets() {

View File

@ -21,7 +21,6 @@ package org.apache.nifi.cluster.protocol.jaxb.message;
public class AdaptedDataFlow {
private byte[] flow;
private byte[] templates;
private byte[] snippets;
private boolean autoStartProcessors;
@ -37,14 +36,6 @@ public class AdaptedDataFlow {
this.flow = flow;
}
public byte[] getTemplates() {
return templates;
}
public void setTemplates(byte[] templates) {
this.templates = templates;
}
public byte[] getSnippets() {
return snippets;
}

View File

@ -31,7 +31,6 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlo
if (df != null) {
aDf.setFlow(df.getFlow());
aDf.setTemplates(df.getTemplates());
aDf.setSnippets(df.getSnippets());
aDf.setAutoStartProcessors(df.isAutoStartProcessors());
}
@ -41,7 +40,7 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlo
@Override
public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) {
final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getTemplates(), aDf.getSnippets());
final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getSnippets());
dataFlow.setAutoStartProcessors(aDf.isAutoStartProcessors());
return dataFlow;
}

View File

@ -106,7 +106,7 @@ public class NodeProtocolSenderImplTest {
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier,
new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), null, null, UUID.randomUUID().toString()));
new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0]), null, null, UUID.randomUUID().toString()));
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
ConnectionRequestMessage request = new ConnectionRequestMessage();

View File

@ -47,7 +47,6 @@ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
@ -183,23 +182,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
@Override
public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
logger.debug("Deleting heartbeat for node {}", nodeId);
final String nodeInfoPath = clusterNodesPath + "/" + nodeId.getId();
heartbeatMessages.remove(nodeId);
try {
getClient().delete().forPath(nodeInfoPath);
logger.info("Removed heartbeat from ZooKeeper for Node {}", nodeId);
} catch (final NoNodeException e) {
// node did not exist. Just return.
logger.debug("Attempted to remove heartbeat for Node with ID {} but no ZNode existed at {}", nodeId, nodeInfoPath);
return;
} catch (final Exception e) {
logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} due to {}", nodeId, e);
logger.warn("", e);
clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed to remove node's heartbeat from ZooKeeper due to " + e);
}
}
protected Set<NodeIdentifier> getClusterNodeIds() {

View File

@ -26,10 +26,10 @@ import java.util.regex.Pattern;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.entity.FlowSnippetEntity;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
@ -43,21 +43,21 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
@Override
public NodeResponse merge(final URI uri, final String method, Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
final FlowSnippetEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
final FlowSnippetDTO contents = responseEntity.getContents();
final FlowEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowEntity.class);
final FlowDTO flowDto = responseEntity.getFlow();
if (contents == null) {
if (flowDto == null) {
return clientResponse;
} else {
final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>();
final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>();
final Map<String, Map<NodeIdentifier, ProcessorEntity>> processorMap = new HashMap<>();
final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> remoteProcessGroupMap = new HashMap<>();
for (final NodeResponse nodeResponse : successfulResponses) {
final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
final FlowEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowEntity.class);
final FlowDTO nodeContents = nodeResponseEntity.getFlow();
for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId());
for (final ProcessorEntity nodeProcessor : nodeContents.getProcessors()) {
Map<NodeIdentifier, ProcessorEntity> innerMap = processorMap.get(nodeProcessor.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
processorMap.put(nodeProcessor.getId(), innerMap);
@ -66,8 +66,8 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) {
Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
for (final RemoteProcessGroupEntity nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) {
Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
@ -78,21 +78,19 @@ public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
}
final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger();
for (final ProcessorDTO processor : contents.getProcessors()) {
for (final ProcessorEntity processor : flowDto.getProcessors()) {
final String procId = processor.getId();
final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId);
final Map<NodeIdentifier, ProcessorEntity> mergeMap = processorMap.get(procId);
procMerger.mergeResponses(processor, mergeMap, successfulResponses, problematicResponses);
}
final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger();
for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) {
if (remoteProcessGroup.getContents() != null) {
final String remoteProcessGroupId = remoteProcessGroup.getId();
final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId);
for (final RemoteProcessGroupEntity remoteProcessGroup : flowDto.getRemoteProcessGroups()) {
final String remoteProcessGroupId = remoteProcessGroup.getId();
final Map<NodeIdentifier, RemoteProcessGroupEntity> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId);
rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses);
}
rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses);
}
}

View File

@ -72,4 +72,18 @@ public class ProcessorEndpointMerger extends AbstractSingleEntityEndpoint<Proces
// set the merged the validation errors
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size()));
}
protected void mergeResponses(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap, final Set<NodeResponse> successfulResponses,
final Set<NodeResponse> problematicResponses) {
final ProcessorDTO clientDto = clientEntity.getComponent();
final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>();
for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) {
final ProcessorEntity nodeProcEntity = entry.getValue();
final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent();
dtoMap.put(entry.getKey(), nodeProcDto);
}
mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.http.endpoints;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -113,4 +114,16 @@ public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoi
clientDto.setAuthorizationIssues(mergedAuthorizationIssues);
}
}
protected void mergeResponses(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap,
Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
final RemoteProcessGroupDTO clientDto = clientEntity.getComponent();
final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new HashMap<>();
for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) {
dtoMap.put(entry.getKey(), entry.getValue().getComponent());
}
mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses);
}
}

View File

@ -39,7 +39,7 @@ public class ResponseUtils {
public static Set<NodeIdentifier> findLongResponseTimes(final AsyncClusterResponse response, final double stdDeviationMultiple) {
final Set<NodeIdentifier> slowResponses = new HashSet<>();
if (response.isOlderThan(2, TimeUnit.SECONDS)) {
if (response.isOlderThan(1, TimeUnit.SECONDS)) {
// If the response is older than 2 seconds, determines if any node took a long time to respond.
final Set<NodeIdentifier> completedIds = response.getCompletedNodeIdentifiers();

View File

@ -228,17 +228,24 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
}
logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> UUID.randomUUID().toString());
if (performVerification) {
verifyState(method, uri.getPath());
}
final int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
logger.debug("Cannot replicate request because there are {} outstanding HTTP Requests already", numRequests);
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
}
// create the request objects and replicate to all nodes
final String requestId = UUID.randomUUID().toString();
final CompletionCallback completionCallback = clusterResponse -> onCompletedResponse(requestId);
final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
@ -249,10 +256,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
responseMap.put(requestId, response);
}
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(REQUEST_TRANSACTION_ID, UUID.randomUUID().toString());
logger.debug("For Request ID {}, response object is {}", requestId, response);
// setRevision(updatedHeaders);
// if mutable request, we have to do a two-phase commit where we ask each node to verify
@ -262,6 +266,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// replicate the actual request.
final boolean mutableRequest = isMutableRequest(method, uri.getPath());
if (mutableRequest && performVerification) {
logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
performVerification(nodeIds, method, uri, entity, updatedHeaders, response);
return response;
}
@ -309,13 +314,22 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
@Override
public void onCompletion(final NodeResponse nodeResponse) {
// Add the node response to our collection.
nodeResponses.add(nodeResponse);
// Add the node response to our collection. We later need to know whether or
// not this is the last node response, so we add the response and then check
// the size within a synchronized block to ensure that those two things happen
// atomically. Otherwise, we could have multiple threads checking the sizes of
// the sets at the same time, which could result in multiple threads performing
// the 'all nodes are complete' logic.
final boolean allNodesResponded;
synchronized (nodeResponses) {
nodeResponses.add(nodeResponse);
allNodesResponded = nodeResponses.size() == numNodes;
}
try {
// If we have all of the node responses, then we can verify the responses
// and if good replicate the original request to all of the nodes.
if (nodeResponses.size() == numNodes) {
if (allNodesResponded) {
// Check if we have any requests that do not have a 150-Continue status code.
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();

View File

@ -512,7 +512,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes));
}
final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, snippetBytes);
dataFlow.setAutoStartProcessors(autoStart);
return new ClusterDataFlow(dataFlow, clusterMetadata == null ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
@ -543,11 +543,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
final DataFlow dataFlow = clusterDataFlow.getDataFlow();
if (dataFlow == null) {
writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);
} else {
writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
}
writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, clusterDataFlow.getControllerServices());

View File

@ -1957,10 +1957,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// disconnect problematic nodes
if (!problematicNodeResponses.isEmpty()) {
if (problematicNodeResponses.size() < nodeResponses.size()) {
logger.warn(String.format("The following nodes failed to process URI '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses));
disconnectNodes(problematicNodeResponses, "Failed to process URI " + uriPath);
logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses));
disconnectNodes(problematicNodeResponses, "Failed to process URI " + method + " " + uriPath);
} else {
logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uriPath);
logger.warn("All nodes failed to process URI {} {}. As a result, no node will be disconnected from cluster", method, uriPath);
}
}
}

View File

@ -145,7 +145,7 @@ public class DataFlowManagementServiceImplTest {
public void testLoadFlowSingleNode() throws Exception {
String flowStr = "<rootGroup />";
byte[] flowBytes = flowStr.getBytes();
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
@ -165,7 +165,7 @@ public class DataFlowManagementServiceImplTest {
public void testLoadFlowWithSameNodeIds() throws Exception {
String flowStr = "<rootGroup />";
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@ -193,7 +193,7 @@ public class DataFlowManagementServiceImplTest {
String flowStr = "<rootGroup />";
byte[] flowBytes = flowStr.getBytes();
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false);
NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@ -214,7 +214,7 @@ public class DataFlowManagementServiceImplTest {
public void testLoadFlowWithConstantNodeIdChanging() throws Exception {
String flowStr = "<rootGroup />";
byte[] flowBytes = flowStr.getBytes();
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false);
NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@ -236,7 +236,7 @@ public class DataFlowManagementServiceImplTest {
public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception {
String flowStr = "<rootGroup />";
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false);
NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@ -259,7 +259,7 @@ public class DataFlowManagementServiceImplTest {
public void testStopRequestedWhileRetrieving() throws Exception {
String flowStr = "<rootGroup />";
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
Set<NodeIdentifier> nodeIds = new HashSet<>();
for (int i = 0; i < 1000; i++) {
nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false));
@ -289,7 +289,7 @@ public class DataFlowManagementServiceImplTest {
String flowStr = "<rootGroup />";
byte[] flowBytes = flowStr.getBytes();
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));

View File

@ -23,11 +23,6 @@ public interface DataFlow {
*/
public byte[] getFlow();
/**
* @return the raw byte array of the templates
*/
public byte[] getTemplates();
/**
* @return the raw byte array of the snippets
*/

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
public class Template implements Authorizable {
private final TemplateDTO dto;
private volatile ProcessGroup processGroup;
public Template(final TemplateDTO dto) {
this.dto = dto;
}
public String getIdentifier() {
return dto.getId();
}
/**
* Returns a TemplateDTO object that describes the contents of this Template
*
* @return template dto
*/
public TemplateDTO getDetails() {
return dto;
}
public void setProcessGroup(final ProcessGroup group) {
this.processGroup = group;
}
public ProcessGroup getProcessGroup() {
return processGroup;
}
@Override
public Authorizable getParentAuthorizable() {
return null;
}
@Override
public Resource getResource() {
return ResourceFactory.getComponentResource(ResourceType.Template, dto.getId(), dto.getName());
}
private Set<Authorizable> getAuthorizableComponents() {
return getAuthorizableComponents(processGroup);
}
private Set<Authorizable> getAuthorizableComponents(final ProcessGroup processGroup) {
final Set<Authorizable> authComponents = new HashSet<>();
final FlowSnippetDTO snippet = dto.getSnippet();
authComponents.add(processGroup);
// If there is any component in the DTO that still exists in the flow, check its authorizations
for (final ConnectionDTO connectionDto : snippet.getConnections()) {
final Connection connection = processGroup.getConnection(connectionDto.getId());
if (connection != null) {
authComponents.add(connection);
}
}
// TODO: Authorize Controller Services
for (final ControllerServiceDTO service : snippet.getControllerServices()) {
}
for (final LabelDTO labelDto : snippet.getLabels()) {
final Label label = processGroup.getLabel(labelDto.getId());
if (label != null) {
authComponents.add(label);
}
}
for (final ProcessorDTO processorDto : snippet.getProcessors()) {
final ProcessorNode procNode = processGroup.getProcessor(processorDto.getId());
if (procNode != null) {
authComponents.add(procNode);
}
}
for (final RemoteProcessGroupDTO groupDto : snippet.getRemoteProcessGroups()) {
final RemoteProcessGroup rpg = processGroup.getRemoteProcessGroup(groupDto.getId());
if (rpg != null) {
authComponents.add(rpg);
}
}
for (final ProcessGroupDTO groupDto : snippet.getProcessGroups()) {
final ProcessGroup group = processGroup.getProcessGroup(groupDto.getId());
if (group != null) {
authComponents.addAll(getAuthorizableComponents(processGroup));
}
}
return authComponents;
}
@Override
public void authorize(final Authorizer authorizer, final RequestAction action) throws AccessDeniedException {
final AuthorizationResult result = checkAuthorization(authorizer, action, true);
if (Result.Denied.equals(result)) {
final String explanation = result.getExplanation() == null ? "Access is denied" : result.getExplanation();
throw new AccessDeniedException(explanation);
}
}
@Override
public AuthorizationResult checkAuthorization(final Authorizer authorizer, final RequestAction action) {
return checkAuthorization(authorizer, action, false);
}
private AuthorizationResult checkAuthorization(final Authorizer authorizer, final RequestAction action, final boolean accessAttempt) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// TODO - include user details context
// build the request
final AuthorizationRequest request = new AuthorizationRequest.Builder()
.identity(user.getIdentity())
.anonymous(user.isAnonymous())
.accessAttempt(accessAttempt)
.action(action)
.resource(getResource())
.build();
// perform the authorization
final AuthorizationResult result = authorizer.authorize(request);
// verify the results
if (Result.ResourceNotFound.equals(result.getResult())) {
for (final Authorizable child : getAuthorizableComponents()) {
final AuthorizationResult childResult = child.checkAuthorization(authorizer, action);
if (Result.Denied.equals(childResult)) {
return childResult;
}
}
return AuthorizationResult.denied();
} else {
return result;
}
}
@Override
public String toString() {
return "Template[id=" + getIdentifier() + ", Name=" + dto.getName() + "]";
}
}

View File

@ -28,6 +28,7 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFile;
@ -768,4 +769,45 @@ public interface ProcessGroup extends Authorizable {
* @throws IllegalStateException if the move is not valid at this time
*/
void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup);
/**
* Adds the given template to this Process Group
*
* @param template the template to add
*/
void addTemplate(Template template);
/**
* Removes the given template from the Process Group
*
* @param template the template to remove
*/
void removeTemplate(Template template);
/**
* Returns the template with the given ID
*
* @param id the ID of the template
* @return the template with the given ID or <code>null</code> if no template
* exists in this Process Group with the given ID
*/
Template getTemplate(String id);
/**
* @param id of the template
* @return the Template with the given ID, if it exists as a child or
* descendant of this ProcessGroup. This performs a recursive search of all
* descendant ProcessGroups
*/
Template findTemplate(String id);
/**
* @return a Set of all Templates that belong to this Process Group
*/
Set<Template> getTemplates();
/**
* @return a Set of all Templates that belong to this Process Group and any descendant Process Groups
*/
Set<Template> findAllTemplates();
}

View File

@ -82,8 +82,8 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@ -220,7 +220,6 @@ import org.apache.nifi.web.api.dto.RelationshipDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
@ -257,7 +256,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final ProvenanceEventRepository provenanceEventRepository;
private final VolatileBulletinRepository bulletinRepository;
private final StandardProcessScheduler processScheduler;
private final TemplateManager templateManager;
private final SnippetManager snippetManager;
private final long gracefulShutdownSeconds;
private final ExtensionManager extensionManager;
@ -478,11 +476,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.configuredForClustering = configuredForClustering;
this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
try {
this.templateManager = new TemplateManager(properties.getTemplateDirectory());
} catch (final IOException e) {
throw new RuntimeException(e);
}
this.snippetManager = new SnippetManager();
@ -1478,72 +1471,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
//
// Template access
//
/**
* Adds a template to this controller. The contents of this template must be part of the current flow. This is going create a template based on a snippet of this flow.
*
* @param dto template
* @return a copy of the given DTO
* @throws IOException if an I/O error occurs when persisting the Template
* @throws NullPointerException if the DTO is null
* @throws IllegalArgumentException if does not contain all required information, such as the template name or a processor's configuration element
*/
public Template addTemplate(final TemplateDTO dto) throws IOException {
return templateManager.addTemplate(dto);
}
/**
* Removes all templates from this controller
*
* @throws IOException ioe
*/
public void clearTemplates() throws IOException {
templateManager.clear();
}
/**
* Imports the specified template into this controller. The contents of this template may have come from another NiFi instance.
*
* @param dto dto
* @return template
* @throws IOException ioe
*/
public Template importTemplate(final TemplateDTO dto) throws IOException {
return templateManager.importTemplate(dto);
}
/**
* @param id identifier
* @return the template with the given ID, or <code>null</code> if no template exists with the given ID
*/
public Template getTemplate(final String id) {
return templateManager.getTemplate(id);
}
public TemplateManager getTemplateManager() {
return templateManager;
}
/**
* @return all templates that this controller knows about
*/
public Collection<Template> getTemplates() {
return templateManager.getTemplates();
}
/**
* Removes the template with the given ID.
*
* @param id the ID of the template to remove
* @throws NullPointerException if the argument is null
* @throws IllegalStateException if no template exists with the given ID
* @throws IOException if template could not be removed
*/
public void removeTemplate(final String id) throws IOException, IllegalStateException {
templateManager.removeTemplate(id);
}
private Position toPosition(final PositionDTO dto) {
return new Position(dto.getX(), dto.getY());

View File

@ -16,20 +16,25 @@
*/
package org.apache.nifi.controller;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
@ -76,13 +81,16 @@ import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.persistence.FlowConfigurationDAO;
import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -522,16 +530,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final byte[] flowBytes = baos.toByteArray();
baos.reset();
final byte[] templateBytes = controller.getTemplateManager().export();
final byte[] snippetBytes = controller.getSnippetManager().export();
// create the response
final FlowResponseMessage response = new FlowResponseMessage();
response.setDataFlow(new StandardDataFlow(flowBytes, templateBytes, snippetBytes));
response.setDataFlow(new StandardDataFlow(flowBytes, snippetBytes));
return response;
} catch (final Exception ex) {
throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, ex);
} finally {
@ -606,27 +612,21 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow)
throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
logger.trace("Loading flow from bytes");
final TemplateManager templateManager = controller.getTemplateManager();
templateManager.loadTemplates();
logger.trace("Finished loading templates");
// resolve the given flow (null means load flow from disk)
final DataFlow actualProposedFlow;
final byte[] flowBytes;
final byte[] templateBytes;
if (proposedFlow == null) {
final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
copyCurrentFlow(flowOnDisk);
flowBytes = flowOnDisk.toByteArray();
templateBytes = templateManager.export();
logger.debug("Loaded Flow from bytes");
} else {
flowBytes = proposedFlow.getFlow();
templateBytes = proposedFlow.getTemplates();
logger.debug("Loaded flow from proposed flow");
}
actualProposedFlow = new StandardDataFlow(flowBytes, templateBytes, null);
actualProposedFlow = new StandardDataFlow(flowBytes, null);
if (firstControllerInitialization) {
// load the controller services
@ -642,6 +642,17 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
}
final List<Template> templates = loadTemplates();
for (final Template template : templates) {
final Template existing = rootGroup.getTemplate(template.getIdentifier());
if (existing == null) {
logger.info("Imported Template '{}' to Root Group", template.getDetails().getName());
rootGroup.addTemplate(template);
} else {
logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName());
}
}
// lazy initialization of controller tasks and flow
if (firstControllerInitialization) {
logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
@ -653,6 +664,56 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
/**
* In NiFi 0.x, templates were stored in a templates directory as separate files. They are
* now stored in the flow itself. If there already are templates in that directory, though,
* we want to restore them.
*
* @return the templates found in the templates directory
* @throws IOException if unable to read from the file system
*/
public List<Template> loadTemplates() throws IOException {
final NiFiProperties properties = NiFiProperties.getInstance();
final Path templatePath = properties.getTemplateDirectory();
final File[] files = templatePath.toFile().listFiles(pathname -> {
final String lowerName = pathname.getName().toLowerCase();
return lowerName.endsWith(".template") || lowerName.endsWith(".xml");
});
if (files == null) {
return Collections.emptyList();
}
final List<Template> templates = new ArrayList<>();
for (final File file : files) {
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
final TemplateDTO templateDto;
try {
templateDto = TemplateDeserializer.deserialize(bis);
} catch (final Exception e) {
logger.error("Unable to interpret " + file + " as a Template. Skipping file.");
continue;
}
if (templateDto.getId() == null) {
// If there is no ID assigned, we need to assign one. We do this by generating
// an ID from the name. This is because we know that Template Names are unique
// and are consistent across all nodes in the cluster.
final String uuid = UUID.nameUUIDFromBytes(templateDto.getName().getBytes(StandardCharsets.UTF_8)).toString();
templateDto.setId(uuid);
}
final Template template = new Template(templateDto);
templates.add(template);
}
}
return templates;
}
private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely) throws ConnectionException {
writeLock.lock();
try {
@ -759,7 +820,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// start the processors as indicated by the dataflow
controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
loadTemplates(dataFlow.getTemplates());
loadSnippets(dataFlow.getSnippets());
controller.startHeartbeating();
} catch (final UninheritableFlowException ufe) {
@ -794,17 +854,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
public void loadTemplates(final byte[] bytes) throws IOException {
if (bytes.length == 0) {
return;
}
controller.clearTemplates();
for (final Template template : TemplateManager.parseBytes(bytes)) {
controller.addTemplate(template.getDetails());
}
}
public void loadSnippets(final byte[] bytes) throws IOException {
if (bytes.length == 0) {
@ -828,6 +877,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
@Override
public void run() {
final ClassLoader currentCl = Thread.currentThread().getContextClassLoader();
final ClassLoader cl = NarClassLoaders.getFrameworkClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
//Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again
final SaveHolder holder = StandardFlowService.this.saveHolder.get();
@ -864,6 +916,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// record the failed save as a bulletin
final Bulletin saveFailureBulletin = BulletinFactory.createBulletin(EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable to save flow controller configuration.");
controller.getBulletinRepository().addBulletin(saveFailureBulletin);
} finally {
if (currentCl != null) {
Thread.currentThread().setContextClassLoader(currentCl);
}
}
}
}

View File

@ -24,7 +24,6 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -98,6 +97,7 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
@ -138,6 +138,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
@Override
public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
// TODO - Include templates
// handle corner cases involving no proposed flow
if (proposedFlow == null) {
if (controller.getGroup(controller.getRootGroupId()).isEmpty()) {
@ -204,14 +206,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
throw new FlowSerializationException(e);
}
logger.trace("Exporting templates from controller");
final byte[] existingTemplates = controller.getTemplateManager().export();
logger.trace("Exporting snippets from controller");
final byte[] existingSnippets = controller.getSnippetManager().export();
final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingTemplates, existingSnippets);
final boolean existingTemplatesEmpty = existingTemplates == null || existingTemplates.length == 0;
final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingSnippets);
// check that the proposed flow is inheritable by the controller
try {
@ -222,13 +220,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting);
}
}
if (!existingTemplatesEmpty) {
logger.trace("Checking template inheritability");
final String problemInheriting = checkTemplateInheritability(existingDataFlow, proposedFlow);
if (problemInheriting != null) {
throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting);
}
}
} catch (final FingerprintException fe) {
throw new FlowSerializationException("Failed to generate flow fingerprints", fe);
}
@ -301,16 +292,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
logger.trace("Synching templates");
if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) {
// need to load templates
final TemplateManager templateManager = controller.getTemplateManager();
final List<Template> proposedTemplateList = TemplateManager.parseBytes(proposedFlow.getTemplates());
for (final Template template : proposedTemplateList) {
templateManager.addTemplate(template.getDetails());
}
}
// clear the snippets that are currently in memory
logger.trace("Clearing existing snippets");
final SnippetManager snippetManager = controller.getSnippetManager();
@ -711,6 +692,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
updateControllerService(controller, serviceNodeElement, encryptor);
}
// Replace the templates with those from the proposed flow
final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
for (final Template template : processGroup.getTemplates()) {
processGroup.removeTemplate(template);
}
for (final Element templateElement : templateNodeList) {
final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement);
final Template template = new Template(templateDto);
processGroup.addTemplate(template);
}
return processGroup;
}
@ -1052,6 +1044,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState);
}
final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
for (final Element templateNode : templateNodeList) {
final TemplateDTO templateDTO = TemplateUtils.parseDto(templateNode);
final Template template = new Template(templateDTO);
processGroup.addTemplate(template);
}
return processGroup;
}
@ -1103,60 +1102,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return null;
}
/**
* Returns true if the given controller can inherit the proposed flow without orphaning flow files.
*
* @param existingFlow flow
* @param proposedFlow the flow to inherit
*
* @return null if the controller can inherit the specified flow, an explanation of why it cannot be inherited otherwise
*
* @throws FingerprintException if flow fingerprints could not be generated
*/
public String checkTemplateInheritability(final DataFlow existingFlow, final DataFlow proposedFlow) throws FingerprintException {
if (existingFlow == null) {
return null; // no existing flow, so equivalent to proposed flow
}
// check if the Flow is inheritable
final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor);
// check if the Templates are inheritable
final byte[] existingTemplateBytes = existingFlow.getTemplates();
if (existingTemplateBytes == null || existingTemplateBytes.length == 0) {
return null;
}
final List<Template> existingTemplates = TemplateManager.parseBytes(existingTemplateBytes);
final String existingTemplateFingerprint = fingerprintFactory.createFingerprint(existingTemplates);
if (existingTemplateFingerprint.trim().isEmpty()) {
return null;
}
final byte[] proposedTemplateBytes = proposedFlow.getTemplates();
if (proposedTemplateBytes == null || proposedTemplateBytes.length == 0) {
return "Proposed Flow does not contain any Templates but Current Flow does";
}
final List<Template> proposedTemplates = TemplateManager.parseBytes(proposedTemplateBytes);
final String proposedTemplateFingerprint = fingerprintFactory.createFingerprint(proposedTemplates);
if (proposedTemplateFingerprint.trim().isEmpty()) {
return "Proposed Flow does not contain any Templates but Current Flow does";
}
try {
final String existingTemplateMd5 = fingerprintFactory.md5Hash(existingTemplateFingerprint);
final String proposedTemplateMd5 = fingerprintFactory.md5Hash(proposedTemplateFingerprint);
if (!existingTemplateMd5.equals(proposedTemplateMd5)) {
return findFirstDiscrepancy(existingTemplateFingerprint, proposedTemplateFingerprint, "Templates");
}
} catch (final NoSuchAlgorithmException e) {
throw new FingerprintException(e);
}
return null;
}
private String findFirstDiscrepancy(final String existing, final String proposed, final String comparisonDescription) {
final int shortestFileLength = Math.min(existing.length(), proposed.length());
for (int i = 0; i < shortestFileLength; i++) {

View File

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.web.api.dto.TemplateDTO;
public class Template {
private final TemplateDTO dto;
public Template(final TemplateDTO dto) {
this.dto = dto;
}
/**
* Returns a TemplateDTO object that describes the contents of this Template
*
* @return template dto
*/
public TemplateDTO getDetails() {
return dto;
}
}

View File

@ -1,524 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TemplateManager {
private static final Logger logger = LoggerFactory.getLogger(TemplateManager.class);
private final Path directory;
private final Map<String, Template> templateMap = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final FileFilter templateFileFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().toLowerCase().endsWith(".template");
}
};
public TemplateManager(final Path storageLocation) throws IOException {
directory = storageLocation;
if (!Files.exists(directory)) {
Files.createDirectories(directory);
} else {
if (!Files.isDirectory(directory)) {
throw new IllegalArgumentException(directory.toString() + " is not a directory");
}
// use toFile().canXXX, rather than Files.is... because on Windows 7, we sometimes get the wrong result for Files.is... (running Java 7 update 9)
if (!directory.toFile().canExecute() || !directory.toFile().canWrite()) {
throw new IOException("Invalid permissions for directory " + directory.toString());
}
}
}
/**
* Adds a template to this manager. The contents of this template must be part of the current flow. This is going create a template based on a snippet of this flow. Any sensitive properties in the
* TemplateDTO will be removed.
*
* @param dto dto
* @return a copy of the given DTO
* @throws IOException if an I/O error occurs when persisting the Template
* @throws NullPointerException if the DTO is null
* @throws IllegalArgumentException if does not contain all required information, such as the template name or a processor's configuration element
*/
public Template addTemplate(final TemplateDTO dto) throws IOException {
scrubTemplate(dto.getSnippet());
return importTemplate(dto);
}
private void verifyCanImport(final TemplateDTO dto) {
// ensure the template is specified
if (dto == null || dto.getSnippet() == null) {
throw new IllegalArgumentException("Template details not specified.");
}
// ensure the name is specified
if (StringUtils.isBlank(dto.getName())) {
throw new IllegalArgumentException("Template name cannot be blank.");
}
readLock.lock();
try {
for (final Template template : templateMap.values()) {
final TemplateDTO existingDto = template.getDetails();
// ensure a template with this name doesnt already exist
if (dto.getName().equals(existingDto.getName())) {
throw new IllegalStateException(String.format("A template named '%s' already exists.", dto.getName()));
}
}
} finally {
readLock.unlock();
}
}
/**
* Clears all Templates from the TemplateManager
*
* @throws java.io.IOException ioe
*/
public void clear() throws IOException {
writeLock.lock();
try {
templateMap.clear();
final File[] files = directory.toFile().listFiles(templateFileFilter);
if (files == null) {
return;
}
for (final File file : files) {
boolean successful = false;
for (int i = 0; i < 10; i++) {
if (file.delete()) {
successful = true;
break;
}
}
if (!successful && file.exists()) {
throw new IOException("Failed to delete template file " + file.getAbsolutePath());
}
}
} finally {
writeLock.unlock();
}
}
/**
* @param id template id
* @return the template with the given id, if it exists; else, returns null
*/
public Template getTemplate(final String id) {
readLock.lock();
try {
return templateMap.get(id);
} finally {
readLock.unlock();
}
}
/**
* Loads the templates from disk
*
* @throws IOException ioe
*/
public void loadTemplates() throws IOException {
writeLock.lock();
try {
final File[] files = directory.toFile().listFiles(templateFileFilter);
if (files == null) {
return;
}
for (final File file : files) {
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
final TemplateDTO templateDto = TemplateDeserializer.deserialize(bis);
templateMap.put(templateDto.getId(), new Template(templateDto));
}
}
} finally {
writeLock.unlock();
}
}
public Template importTemplate(final TemplateDTO dto) throws IOException {
// ensure we can add this template
verifyCanImport(dto);
writeLock.lock();
try {
if (requireNonNull(dto).getId() == null) {
dto.setId(UUID.randomUUID().toString());
}
final Template template = new Template(dto);
persistTemplate(template);
templateMap.put(dto.getId(), template);
return template;
} finally {
writeLock.unlock();
}
}
/**
* Persists the given template to disk
*
* @param template template
* @throws IOException ioe
*/
private void persistTemplate(final Template template) throws IOException {
final Path path = directory.resolve(template.getDetails().getId() + ".template");
Files.write(path, TemplateSerializer.serialize(template.getDetails()), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
}
/**
* Scrubs the template prior to persisting in order to remove fields that shouldn't be included or are unnecessary.
*
* @param snippet snippet
*/
private void scrubTemplate(final FlowSnippetDTO snippet) {
// ensure that contents have been specified
if (snippet != null) {
// go through each processor if specified
if (snippet.getProcessors() != null) {
scrubProcessors(snippet.getProcessors());
}
// go through each connection if specified
if (snippet.getConnections() != null) {
scrubConnections(snippet.getConnections());
}
// go through each remote process group if specified
if (snippet.getRemoteProcessGroups() != null) {
scrubRemoteProcessGroups(snippet.getRemoteProcessGroups());
}
// go through each process group if specified
if (snippet.getProcessGroups() != null) {
scrubProcessGroups(snippet.getProcessGroups());
}
// go through each controller service if specified
if (snippet.getControllerServices() != null) {
scrubControllerServices(snippet.getControllerServices());
}
}
}
/**
* Scrubs process groups prior to saving.
*
* @param processGroups groups
*/
private void scrubProcessGroups(final Set<ProcessGroupDTO> processGroups) {
// go through each process group
for (final ProcessGroupDTO processGroupDTO : processGroups) {
scrubTemplate(processGroupDTO.getContents());
}
}
/**
* Scrubs processors prior to saving. This includes removing sensitive properties, validation errors, property descriptors, etc.
*
* @param processors procs
*/
private void scrubProcessors(final Set<ProcessorDTO> processors) {
// go through each processor
for (final ProcessorDTO processorDTO : processors) {
final ProcessorConfigDTO processorConfig = processorDTO.getConfig();
// ensure that some property configuration have been specified
if (processorConfig != null) {
// if properties have been specified, remove sensitive ones
if (processorConfig.getProperties() != null) {
Map<String, String> processorProperties = processorConfig.getProperties();
// look for sensitive properties and remove them
if (processorConfig.getDescriptors() != null) {
final Collection<PropertyDescriptorDTO> descriptors = processorConfig.getDescriptors().values();
for (PropertyDescriptorDTO descriptor : descriptors) {
if (descriptor.isSensitive()) {
processorProperties.put(descriptor.getName(), null);
}
}
}
}
processorConfig.setCustomUiUrl(null);
}
// remove validation errors
processorDTO.setValidationErrors(null);
processorDTO.setInputRequirement(null);
}
}
private void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) {
for (final ControllerServiceDTO serviceDTO : controllerServices) {
final Map<String, String> properties = serviceDTO.getProperties();
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
if (properties != null && descriptors != null) {
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
if (descriptor.isSensitive()) {
properties.put(descriptor.getName(), null);
}
}
}
serviceDTO.setCustomUiUrl(null);
serviceDTO.setValidationErrors(null);
}
}
/**
* Scrubs connections prior to saving. This includes removing available relationships.
*
* @param connections conns
*/
private void scrubConnections(final Set<ConnectionDTO> connections) {
// go through each connection
for (final ConnectionDTO connectionDTO : connections) {
connectionDTO.setAvailableRelationships(null);
scrubConnectable(connectionDTO.getSource());
scrubConnectable(connectionDTO.getDestination());
}
}
/**
* Remove unnecessary fields in connectables prior to saving.
*
* @param connectable connectable
*/
private void scrubConnectable(final ConnectableDTO connectable) {
if (connectable != null) {
connectable.setComments(null);
connectable.setExists(null);
connectable.setRunning(null);
connectable.setTransmitting(null);
connectable.setName(null);
}
}
/**
* Remove unnecessary fields in remote groups prior to saving.
*
* @param remoteGroups groups
*/
private void scrubRemoteProcessGroups(final Set<RemoteProcessGroupDTO> remoteGroups) {
// go through each remote process group
for (final RemoteProcessGroupDTO remoteProcessGroupDTO : remoteGroups) {
remoteProcessGroupDTO.setFlowRefreshed(null);
remoteProcessGroupDTO.setInputPortCount(null);
remoteProcessGroupDTO.setOutputPortCount(null);
remoteProcessGroupDTO.setTransmitting(null);
// if this remote process group has contents
if (remoteProcessGroupDTO.getContents() != null) {
RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
// scrub any remote input ports
if (contents.getInputPorts() != null) {
scrubRemotePorts(contents.getInputPorts());
}
// scrub and remote output ports
if (contents.getOutputPorts() != null) {
scrubRemotePorts(contents.getOutputPorts());
}
}
}
}
/**
* Remove unnecessary fields in remote ports prior to saving.
*
* @param remotePorts ports
*/
private void scrubRemotePorts(final Set<RemoteProcessGroupPortDTO> remotePorts) {
for (final Iterator<RemoteProcessGroupPortDTO> remotePortIter = remotePorts.iterator(); remotePortIter.hasNext();) {
final RemoteProcessGroupPortDTO remotePortDTO = remotePortIter.next();
// if the flow is not connected to this remote port, remove it
if (remotePortDTO.isConnected() == null || !remotePortDTO.isConnected().booleanValue()) {
remotePortIter.remove();
continue;
}
remotePortDTO.setExists(null);
remotePortDTO.setTargetRunning(null);
}
}
/**
* Removes the template with the given ID.
*
* @param id the ID of the template to remove
* @throws NullPointerException if the argument is null
* @throws IllegalStateException if no template exists with the given ID
* @throws IOException if template could not be removed
*/
public void removeTemplate(final String id) throws IOException, IllegalStateException {
writeLock.lock();
try {
final Template removed = templateMap.remove(requireNonNull(id));
// ensure the template exists
if (removed == null) {
throw new IllegalStateException("No template with ID " + id + " exists");
} else {
try {
// remove the template from the archive directory
final Path path = directory.resolve(removed.getDetails().getId() + ".template");
Files.delete(path);
} catch (final NoSuchFileException e) {
logger.warn(String.format("Template file for template %s not found when attempting to remove. Continuing...", id));
} catch (final IOException e) {
logger.error(String.format("Unable to remove template file for template %s.", id));
// since the template file existed and we were unable to remove it, rollback
// by returning it to the template map
templateMap.put(id, removed);
// rethrow
throw e;
}
}
} finally {
writeLock.unlock();
}
}
public Set<Template> getTemplates() {
readLock.lock();
try {
return new HashSet<>(templateMap.values());
} finally {
readLock.unlock();
}
}
public static List<Template> parseBytes(final byte[] bytes) {
final List<Template> templates = new ArrayList<>();
try (final InputStream rawIn = new ByteArrayInputStream(bytes);
final DataInputStream in = new DataInputStream(rawIn)) {
while (isMoreData(in)) {
final int length = in.readInt();
final byte[] buffer = new byte[length];
StreamUtils.fillBuffer(in, buffer, true);
final TemplateDTO dto = TemplateDeserializer.deserialize(new ByteArrayInputStream(buffer));
templates.add(new Template(dto));
}
} catch (final IOException e) {
throw new RuntimeException("Could not parse bytes", e); // won't happen because of the types of streams being used
}
return templates;
}
private static boolean isMoreData(final InputStream in) throws IOException {
in.mark(1);
final int nextByte = in.read();
if (nextByte == -1) {
return false;
}
in.reset();
return true;
}
public byte[] export() {
readLock.lock();
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
for (final Template template : templateMap.values()) {
final TemplateDTO dto = template.getDetails();
final byte[] bytes = TemplateSerializer.serialize(dto);
dos.writeInt(bytes.length);
dos.write(bytes);
}
return baos.toByteArray();
} catch (final IOException e) {
// won't happen
return null;
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.dom.DOMSource;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.w3c.dom.Element;
public class TemplateUtils {
public static TemplateDTO parseDto(final Element templateElement) {
try {
JAXBContext context = JAXBContext.newInstance(TemplateDTO.class);
Unmarshaller unmarshaller = context.createUnmarshaller();
return unmarshaller.unmarshal(new DOMSource(templateElement), TemplateDTO.class).getValue();
} catch (final Exception e) {
throw new RuntimeException("Could not parse XML as a valid template", e);
}
}
public static TemplateDTO parseDto(final byte[] bytes) {
try (final InputStream in = new ByteArrayInputStream(bytes)) {
return TemplateDeserializer.deserialize(in);
} catch (final IOException ioe) {
throw new RuntimeException("Could not parse bytes as template", ioe); // won't happen because of the types of streams being used
}
}
public static List<Template> parseTemplateStream(final byte[] bytes) {
final List<Template> templates = new ArrayList<>();
try (final InputStream rawIn = new ByteArrayInputStream(bytes);
final DataInputStream in = new DataInputStream(rawIn)) {
while (isMoreData(in)) {
final int length = in.readInt();
final byte[] buffer = new byte[length];
StreamUtils.fillBuffer(in, buffer, true);
final TemplateDTO dto = TemplateDeserializer.deserialize(new ByteArrayInputStream(buffer));
templates.add(new Template(dto));
}
} catch (final IOException e) {
throw new RuntimeException("Could not parse bytes", e); // won't happen because of the types of streams being used
}
return templates;
}
private static boolean isMoreData(final InputStream in) throws IOException {
in.mark(1);
final int nextByte = in.read();
if (nextByte == -1) {
return false;
}
in.reset();
return true;
}
/**
* Scrubs the template prior to persisting in order to remove fields that shouldn't be included or are unnecessary.
*
* @param templateDto template
*/
public static void scrubTemplate(final TemplateDTO templateDto) {
scrubSnippet(templateDto.getSnippet());
}
private static void scrubSnippet(final FlowSnippetDTO snippet) {
// ensure that contents have been specified
if (snippet != null) {
// go through each processor if specified
if (snippet.getProcessors() != null) {
scrubProcessors(snippet.getProcessors());
}
// go through each connection if specified
if (snippet.getConnections() != null) {
scrubConnections(snippet.getConnections());
}
// go through each remote process group if specified
if (snippet.getRemoteProcessGroups() != null) {
scrubRemoteProcessGroups(snippet.getRemoteProcessGroups());
}
// go through each process group if specified
if (snippet.getProcessGroups() != null) {
scrubProcessGroups(snippet.getProcessGroups());
}
// go through each controller service if specified
if (snippet.getControllerServices() != null) {
scrubControllerServices(snippet.getControllerServices());
}
}
}
/**
* Scrubs process groups prior to saving.
*
* @param processGroups groups
*/
private static void scrubProcessGroups(final Set<ProcessGroupDTO> processGroups) {
// go through each process group
for (final ProcessGroupDTO processGroupDTO : processGroups) {
scrubSnippet(processGroupDTO.getContents());
}
}
/**
* Scrubs processors prior to saving. This includes removing sensitive properties, validation errors, property descriptors, etc.
*
* @param processors procs
*/
private static void scrubProcessors(final Set<ProcessorDTO> processors) {
// go through each processor
for (final ProcessorDTO processorDTO : processors) {
final ProcessorConfigDTO processorConfig = processorDTO.getConfig();
// ensure that some property configuration have been specified
if (processorConfig != null) {
// if properties have been specified, remove sensitive ones
if (processorConfig.getProperties() != null) {
Map<String, String> processorProperties = processorConfig.getProperties();
// look for sensitive properties and remove them
if (processorConfig.getDescriptors() != null) {
final Collection<PropertyDescriptorDTO> descriptors = processorConfig.getDescriptors().values();
for (PropertyDescriptorDTO descriptor : descriptors) {
if (descriptor.isSensitive()) {
processorProperties.put(descriptor.getName(), null);
}
}
}
}
processorConfig.setCustomUiUrl(null);
}
// remove validation errors
processorDTO.setValidationErrors(null);
processorDTO.setInputRequirement(null);
}
}
private static void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) {
for (final ControllerServiceDTO serviceDTO : controllerServices) {
final Map<String, String> properties = serviceDTO.getProperties();
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
if (properties != null && descriptors != null) {
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
if (descriptor.isSensitive()) {
properties.put(descriptor.getName(), null);
}
}
}
serviceDTO.setCustomUiUrl(null);
serviceDTO.setValidationErrors(null);
}
}
/**
* Scrubs connections prior to saving. This includes removing available relationships.
*
* @param connections conns
*/
private static void scrubConnections(final Set<ConnectionDTO> connections) {
// go through each connection
for (final ConnectionDTO connectionDTO : connections) {
connectionDTO.setAvailableRelationships(null);
scrubConnectable(connectionDTO.getSource());
scrubConnectable(connectionDTO.getDestination());
}
}
/**
* Remove unnecessary fields in connectables prior to saving.
*
* @param connectable connectable
*/
private static void scrubConnectable(final ConnectableDTO connectable) {
if (connectable != null) {
connectable.setComments(null);
connectable.setExists(null);
connectable.setRunning(null);
connectable.setTransmitting(null);
connectable.setName(null);
}
}
/**
* Remove unnecessary fields in remote groups prior to saving.
*
* @param remoteGroups groups
*/
private static void scrubRemoteProcessGroups(final Set<RemoteProcessGroupDTO> remoteGroups) {
// go through each remote process group
for (final RemoteProcessGroupDTO remoteProcessGroupDTO : remoteGroups) {
remoteProcessGroupDTO.setFlowRefreshed(null);
remoteProcessGroupDTO.setInputPortCount(null);
remoteProcessGroupDTO.setOutputPortCount(null);
remoteProcessGroupDTO.setTransmitting(null);
// if this remote process group has contents
if (remoteProcessGroupDTO.getContents() != null) {
RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
// scrub any remote input ports
if (contents.getInputPorts() != null) {
scrubRemotePorts(contents.getInputPorts());
}
// scrub and remote output ports
if (contents.getOutputPorts() != null) {
scrubRemotePorts(contents.getOutputPorts());
}
}
}
}
/**
* Remove unnecessary fields in remote ports prior to saving.
*
* @param remotePorts ports
*/
private static void scrubRemotePorts(final Set<RemoteProcessGroupPortDTO> remotePorts) {
for (final Iterator<RemoteProcessGroupPortDTO> remotePortIter = remotePorts.iterator(); remotePortIter.hasNext();) {
final RemoteProcessGroupPortDTO remotePortDTO = remotePortIter.next();
// if the flow is not connected to this remote port, remove it
if (remotePortDTO.isConnected() == null || !remotePortDTO.isConnected().booleanValue()) {
remotePortIter.remove();
continue;
}
remotePortDTO.setExists(null);
remotePortDTO.setTargetRunning(null);
}
}
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.controller.serialization;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -42,6 +44,7 @@ import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
@ -49,12 +52,14 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
/**
* Serializes a Flow Controller as XML to an output stream.
@ -186,6 +191,10 @@ public class StandardFlowSerializer implements FlowSerializer {
for (final ControllerServiceNode service : group.getControllerServices(false)) {
addControllerService(element, service);
}
for (final Template template : group.getTemplates()) {
addTemplate(element, template);
}
}
private void addStyle(final Element parentElement, final Map<String, String> style) {
@ -456,4 +465,21 @@ public class StandardFlowSerializer implements FlowSerializer {
element.appendChild(toAdd);
}
public static void addTemplate(final Element element, final Template template) {
try {
final byte[] serialized = TemplateSerializer.serialize(template.getDetails());
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
final Document document;
try (final InputStream in = new ByteArrayInputStream(serialized)) {
document = docBuilder.parse(in);
}
final Node templateNode = element.getOwnerDocument().importNode(document.getDocumentElement(), true);
element.appendChild(templateNode);
} catch (final Exception e) {
throw new FlowSerializationException(e);
}
}
}

View File

@ -39,6 +39,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.Template;
@ -47,12 +48,12 @@ import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
@ -62,7 +63,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
@ -311,6 +311,7 @@ public final class FingerprintFactory {
private StringBuilder addTemplateFingerprint(final StringBuilder builder, final TemplateDTO dto) {
builder.append(dto.getId());
builder.append(dto.getGroupId());
builder.append(dto.getName());
builder.append(dto.getDescription());
final FlowSnippetDTO snippet = dto.getSnippet();

View File

@ -41,6 +41,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
@ -95,6 +96,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, ProcessorNode> processors = new HashMap<>();
private final Map<String, Funnel> funnels = new HashMap<>();
private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
private final Map<String, Template> templates = new HashMap<>();
private final StringEncryptor encryptor;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -1882,6 +1884,99 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
public void addTemplate(final Template template) {
requireNonNull(template);
writeLock.lock();
try {
final String id = template.getDetails().getId();
if (id == null) {
throw new IllegalStateException("Cannot add template that has no ID");
}
if (templates.containsKey(id)) {
throw new IllegalStateException("Process Group already contains a Template with ID " + id);
}
templates.put(id, template);
template.setProcessGroup(this);
LOG.info("{} added to {}", template, this);
} finally {
writeLock.unlock();
}
}
@Override
public Template getTemplate(final String id) {
readLock.lock();
try {
return templates.get(id);
} finally {
readLock.unlock();
}
}
@Override
public Template findTemplate(final String id) {
return findTemplate(id, this);
}
private Template findTemplate(final String id, final ProcessGroup start) {
final Template template = start.getTemplate(id);
if (template != null) {
return template;
}
for (final ProcessGroup child : start.getProcessGroups()) {
final Template childTemplate = findTemplate(id, child);
if (childTemplate != null) {
return childTemplate;
}
}
return null;
}
@Override
public Set<Template> getTemplates() {
readLock.lock();
try {
return new HashSet<>(templates.values());
} finally {
readLock.unlock();
}
}
@Override
public Set<Template> findAllTemplates() {
return findAllTemplates(this);
}
private Set<Template> findAllTemplates(final ProcessGroup group) {
final Set<Template> templates = new HashSet<>(group.getTemplates());
for (final ProcessGroup childGroup : group.getProcessGroups()) {
templates.addAll(findAllTemplates(childGroup));
}
return templates;
}
@Override
public void removeTemplate(final Template template) {
writeLock.lock();
try {
final Template existing = templates.get(requireNonNull(template).getIdentifier());
if (existing == null) {
throw new IllegalStateException(template + " is not a member of this ProcessGroup");
}
templates.remove(template.getIdentifier());
LOG.info("{} removed from flow", template);
} finally {
writeLock.unlock();
}
}
@Override
public void remove(final Snippet snippet) {
writeLock.lock();

View File

@ -44,4 +44,5 @@ public final class TemplateSerializer {
throw new FlowSerializationException(e);
}
}
}

View File

@ -80,7 +80,7 @@ public class StandardFlowServiceTest {
@Test
public void testLoadWithFlow() throws IOException {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(flowBytes, null, null));
flowService.load(new StandardDataFlow(flowBytes, null));
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@ -95,16 +95,16 @@ public class StandardFlowServiceTest {
@Test(expected = FlowSerializationException.class)
public void testLoadWithCorruptFlow() throws IOException {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml"));
flowService.load(new StandardDataFlow(flowBytes, null, null));
flowService.load(new StandardDataFlow(flowBytes, null));
}
@Test
public void testLoadExistingFlow() throws IOException {
byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(flowBytes, null, null));
flowService.load(new StandardDataFlow(flowBytes, null));
flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-inheritable.xml"));
flowService.load(new StandardDataFlow(flowBytes, null, null));
flowService.load(new StandardDataFlow(flowBytes, null));
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@ -118,11 +118,11 @@ public class StandardFlowServiceTest {
@Test
public void testLoadExistingFlowWithUninheritableFlow() throws IOException {
byte[] originalBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(originalBytes, null, null));
flowService.load(new StandardDataFlow(originalBytes, null));
try {
byte[] updatedBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-uninheritable.xml"));
flowService.load(new StandardDataFlow(updatedBytes, null, null));
flowService.load(new StandardDataFlow(updatedBytes, null));
fail("should have thrown " + UninheritableFlowException.class);
} catch (UninheritableFlowException ufe) {
@ -140,11 +140,11 @@ public class StandardFlowServiceTest {
@Test
public void testLoadExistingFlowWithCorruptFlow() throws IOException {
byte[] originalBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
flowService.load(new StandardDataFlow(originalBytes, null, null));
flowService.load(new StandardDataFlow(originalBytes, null));
try {
byte[] updatedBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml"));
flowService.load(new StandardDataFlow(updatedBytes, null, null));
flowService.load(new StandardDataFlow(updatedBytes, null));
fail("should have thrown " + FlowSerializationException.class);
} catch (FlowSerializationException ufe) {

View File

@ -32,6 +32,7 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.ProcessGroup;
@ -527,4 +528,33 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public void addTemplate(Template template) {
throw new UnsupportedOperationException();
}
@Override
public void removeTemplate(Template template) {
}
@Override
public Template getTemplate(String id) {
return null;
}
@Override
public Template findTemplate(String id) {
return null;
}
@Override
public Set<Template> getTemplates() {
return null;
}
@Override
public Set<Template> findAllTemplates() {
return null;
}
}

View File

@ -304,17 +304,20 @@ public interface NiFiServiceFacade {
* @param name name
* @param description description
* @param snippetId id
* @param groupId id of the process group
* @return template
*/
TemplateDTO createTemplate(String name, String description, String snippetId);
TemplateDTO createTemplate(String name, String description, String snippetId, String groupId);
/**
* Imports the specified Template.
*
* @param templateDTO The template dto
* @param groupId id of the process group
*
* @return The new template dto
*/
TemplateDTO importTemplate(TemplateDTO templateDTO);
TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId);
/**
* Instantiate the corresponding template.

View File

@ -16,8 +16,31 @@
*/
package org.apache.nifi.web;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@ -166,35 +189,14 @@ import org.apache.nifi.web.revision.StandardRevisionClaim;
import org.apache.nifi.web.revision.StandardRevisionUpdate;
import org.apache.nifi.web.revision.UpdateRevisionTask;
import org.apache.nifi.web.util.SnippetUtils;
import javax.ws.rs.WebApplicationException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
*/
public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private static final Logger logger = Logger.getLogger(StandardNiFiServiceFacade.class);
private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
// nifi core components
private ControllerFacade controllerFacade;
@ -258,136 +260,236 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyUpdateConnection(ConnectionDTO connectionDTO) {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (connectionDAO.hasConnection(connectionDTO.getId())) {
connectionDAO.verifyUpdate(connectionDTO);
} else {
connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO);
try {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (connectionDAO.hasConnection(connectionDTO.getId())) {
connectionDAO.verifyUpdate(connectionDTO);
} else {
connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(connectionDTO.getId());
throw e;
}
}
@Override
public void verifyDeleteConnection(String connectionId) {
connectionDAO.verifyDelete(connectionId);
try {
connectionDAO.verifyDelete(connectionId);
} catch (final Exception e) {
revisionManager.cancelClaim(connectionId);
throw e;
}
}
@Override
public void verifyDeleteFunnel(String funnelId) {
funnelDAO.verifyDelete(funnelId);
try {
funnelDAO.verifyDelete(funnelId);
} catch (final Exception e) {
revisionManager.cancelClaim(funnelId);
throw e;
}
}
@Override
public void verifyUpdateInputPort(PortDTO inputPortDTO) {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (inputPortDAO.hasPort(inputPortDTO.getId())) {
inputPortDAO.verifyUpdate(inputPortDTO);
try {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (inputPortDAO.hasPort(inputPortDTO.getId())) {
inputPortDAO.verifyUpdate(inputPortDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(inputPortDTO.getId());
throw e;
}
}
@Override
public void verifyDeleteInputPort(String inputPortId) {
inputPortDAO.verifyDelete(inputPortId);
try {
inputPortDAO.verifyDelete(inputPortId);
} catch (final Exception e) {
revisionManager.cancelClaim(inputPortId);
throw e;
}
}
@Override
public void verifyUpdateOutputPort(PortDTO outputPortDTO) {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (outputPortDAO.hasPort(outputPortDTO.getId())) {
outputPortDAO.verifyUpdate(outputPortDTO);
try {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (outputPortDAO.hasPort(outputPortDTO.getId())) {
outputPortDAO.verifyUpdate(outputPortDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(outputPortDTO.getId());
throw e;
}
}
@Override
public void verifyDeleteOutputPort(String outputPortId) {
outputPortDAO.verifyDelete(outputPortId);
try {
outputPortDAO.verifyDelete(outputPortId);
} catch (final Exception e) {
revisionManager.cancelClaim(outputPortId);
throw e;
}
}
@Override
public void verifyUpdateProcessor(ProcessorDTO processorDTO) {
// if group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (processorDAO.hasProcessor(processorDTO.getId())) {
processorDAO.verifyUpdate(processorDTO);
try {
// if group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (processorDAO.hasProcessor(processorDTO.getId())) {
processorDAO.verifyUpdate(processorDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(processorDTO.getId());
throw e;
}
}
@Override
public void verifyDeleteProcessor(String processorId) {
processorDAO.verifyDelete(processorId);
try {
processorDAO.verifyDelete(processorId);
} catch (final Exception e) {
revisionManager.cancelClaim(processorId);
throw e;
}
}
@Override
public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
// if group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
processGroupDAO.verifyUpdate(processGroupDTO);
try {
// if group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
processGroupDAO.verifyUpdate(processGroupDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(processGroupDTO.getId());
throw e;
}
}
@Override
public void verifyDeleteProcessGroup(String groupId) {
processGroupDAO.verifyDelete(groupId);
try {
processGroupDAO.verifyDelete(groupId);
} catch (final Exception e) {
revisionManager.cancelClaim(groupId);
throw e;
}
}
@Override
public void verifyUpdateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if remote group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) {
remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO);
try {
// if remote group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId())) {
remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(remoteProcessGroupDTO.getId());
throw e;
}
}
@Override
public void verifyUpdateRemoteProcessGroupInputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
try {
remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
} catch (final Exception e) {
revisionManager.cancelClaim(remoteProcessGroupId);
throw e;
}
}
@Override
public void verifyUpdateRemoteProcessGroupOutputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
try {
remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
} catch (final Exception e) {
revisionManager.cancelClaim(remoteProcessGroupId);
throw e;
}
}
@Override
public void verifyDeleteRemoteProcessGroup(String remoteProcessGroupId) {
remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
try {
remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
} catch (final Exception e) {
revisionManager.cancelClaim(remoteProcessGroupId);
throw e;
}
}
@Override
public void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO) {
// if service does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
controllerServiceDAO.verifyUpdate(controllerServiceDTO);
try {
// if service does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
controllerServiceDAO.verifyUpdate(controllerServiceDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(controllerServiceDTO.getId());
throw e;
}
}
@Override
public void verifyUpdateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) {
controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
try {
controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
} catch (final Exception e) {
revisionManager.cancelClaim(controllerServiceId);
throw e;
}
}
@Override
public void verifyDeleteControllerService(String controllerServiceId) {
controllerServiceDAO.verifyDelete(controllerServiceId);
try {
controllerServiceDAO.verifyDelete(controllerServiceId);
} catch (final Exception e) {
revisionManager.cancelClaim(controllerServiceId);
throw e;
}
}
@Override
public void verifyUpdateReportingTask(ReportingTaskDTO reportingTaskDTO) {
// if tasks does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
reportingTaskDAO.verifyUpdate(reportingTaskDTO);
try {
// if tasks does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
reportingTaskDAO.verifyUpdate(reportingTaskDTO);
}
} catch (final Exception e) {
revisionManager.cancelClaim(reportingTaskDTO.getId());
throw e;
}
}
@Override
public void verifyDeleteReportingTask(String reportingTaskId) {
reportingTaskDAO.verifyDelete(reportingTaskId);
try {
reportingTaskDAO.verifyDelete(reportingTaskId);
} catch (final Exception e) {
revisionManager.cancelClaim(reportingTaskId);
throw e;
}
}
// -----------------------------------------
@ -515,10 +617,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyUpdateSnippet(SnippetDTO snippetDto) {
// if snippet does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (snippetDAO.hasSnippet(snippetDto.getId())) {
snippetDAO.verifyUpdate(snippetDto);
try {
// if snippet does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (snippetDAO.hasSnippet(snippetDto.getId())) {
snippetDAO.verifyUpdate(snippetDto);
}
} catch (final Exception e) {
revisionManager.cancelClaim(snippetDto.getId());
throw e;
}
}
@ -843,7 +950,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyCanClearProcessorState(final String processorId) {
processorDAO.verifyClearState(processorId);
try {
processorDAO.verifyClearState(processorId);
} catch (final Exception e) {
revisionManager.cancelClaim(processorId);
throw e;
}
}
@Override
@ -874,7 +986,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyCanClearControllerServiceState(final String controllerServiceId) {
controllerServiceDAO.verifyClearState(controllerServiceId);
try {
controllerServiceDAO.verifyClearState(controllerServiceId);
} catch (final Exception e) {
revisionManager.cancelClaim(controllerServiceId);
throw e;
}
}
@Override
@ -884,7 +1001,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyCanClearReportingTaskState(final String reportingTaskId) {
reportingTaskDAO.verifyClearState(reportingTaskId);
try {
reportingTaskDAO.verifyClearState(reportingTaskId);
} catch (final Exception e) {
revisionManager.cancelClaim(reportingTaskId);
throw e;
}
}
@Override
@ -978,6 +1100,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return revisionManager.deleteRevision(claim, new DeleteRevisionTask<D>() {
@Override
public D performTask() {
logger.debug("Attempting to delete component {} with claim {}", authorizable, claim);
// ensure access to the component
authorizable.authorize(authorizer, RequestAction.WRITE);
@ -985,6 +1109,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
logger.debug("Deletion of component {} was successful", authorizable);
return dto;
}
@ -993,7 +1118,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteSnippet(String id) {
snippetDAO.verifyDelete(id);
try {
snippetDAO.verifyDelete(id);
} catch (final Exception e) {
revisionManager.cancelClaim(id);
throw e;
}
}
@Override
@ -1394,7 +1524,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public TemplateDTO createTemplate(String name, String description, String snippetId) {
public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId) {
// get the specified snippet
Snippet snippet = snippetDAO.getSnippet(snippetId);
@ -1409,27 +1539,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
templateDTO.setId(UUID.randomUUID().toString());
}
// create the template
Template template = templateDAO.createTemplate(templateDTO);
Template template = templateDAO.createTemplate(templateDTO, groupId);
return dtoFactory.createTemplateDTO(template);
}
@Override
public TemplateDTO importTemplate(TemplateDTO templateDTO) {
public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId) {
// ensure id is set
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
templateDTO.setId(UUID.randomUUID().toString());
}
// mark the timestamp
templateDTO.setTimestamp(new Date());
// import the template
final Template template = templateDAO.importTemplate(templateDTO);
final Template template = templateDAO.importTemplate(templateDTO, groupId);
// return the template dto
return dtoFactory.createTemplateDTO(template);

View File

@ -96,6 +96,7 @@ import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -2187,7 +2188,8 @@ public class ProcessGroupResource extends ApplicationResource {
// create the template and generate the json
final RevisionDTO revisionDTO = createTemplateRequestEntity.getRevision();
final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(), createTemplateRequestEntity.getSnippetId());
final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(),
createTemplateRequestEntity.getSnippetId(), groupId);
templateResource.populateRemainingTemplateContent(template);
// create the revision
@ -2320,7 +2322,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
// import the template
final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate());
final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId);
templateResource.populateRemainingTemplateContent(template);
// create the revision
@ -2502,6 +2504,11 @@ public class ProcessGroupResource extends ApplicationResource {
// replicate if cluster manager
if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
} else if (properties.isClusterManager()) {
// create the response entity
final ControllerServicesEntity entity = new ControllerServicesEntity();
entity.setControllerServices(Collections.emptySet());
return clusterContext(generateOkResponse(entity)).build();
}
// get all the controller services

View File

@ -46,6 +46,8 @@ import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@ -77,6 +79,7 @@ import java.util.Set;
description = "Endpoint for managing a Processor."
)
public class ProcessorResource extends ApplicationResource {
private static final Logger logger = LoggerFactory.getLogger(ProcessorResource.class);
private static final List<Long> POSSIBLE_RUN_DURATIONS = Arrays.asList(0L, 25L, 50L, 100L, 250L, 500L, 1000L, 2000L);
@ -481,16 +484,32 @@ public class ProcessorResource extends ApplicationResource {
// handle expects request (usually from the cluster manager)
final Revision revision = getRevision(processorEntity, id);
final boolean validationPhase = isValidationPhase(httpServletRequest);
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
final String requestId = getHeaders().get("X-RequestTransactionId");
logger.debug("For Update Processor, Validation Phase = {}, Two-phase request = {}, Request ID = {}", validationPhase, twoPhaseRequest, requestId);
if (validationPhase || !twoPhaseRequest) {
serviceFacade.claimRevision(revision);
logger.debug("Claimed Revision {}", revision);
}
if (validationPhase) {
serviceFacade.verifyUpdateProcessor(requestProcessorDTO);
logger.debug("Verified Update of Processor");
return generateContinueResponse().build();
}
// update the processor
final UpdateResult<ProcessorEntity> result = serviceFacade.updateProcessor(revision, requestProcessorDTO);
final UpdateResult<ProcessorEntity> result;
try {
logger.debug("Updating Processor with Revision {}", revision);
result = serviceFacade.updateProcessor(revision, requestProcessorDTO);
logger.debug("Updated Processor with Revision {}", revision);
} catch (final Exception e) {
final boolean tpr = isTwoPhaseRequest(httpServletRequest);
logger.error("Got Exception trying to update processor. two-phase request = {}, validation phase = {}, revision = {}", tpr, validationPhase, revision);
logger.error("", e);
throw e;
}
final ProcessorEntity entity = result.getResult();
populateRemainingProcessorEntityContent(entity);

View File

@ -35,10 +35,7 @@ public class IllegalArgumentExceptionMapper implements ExceptionMapper<IllegalAr
public Response toResponse(IllegalArgumentException exception) {
// log the error
logger.info(String.format("%s. Returning %s response.", exception, Response.Status.BAD_REQUEST));
if (logger.isDebugEnabled()) {
logger.debug(StringUtils.EMPTY, exception);
}
logger.debug(StringUtils.EMPTY, exception);
return Response.status(Response.Status.BAD_REQUEST).entity(exception.getMessage()).type("text/plain").build();
}

View File

@ -780,7 +780,7 @@ public class ControllerFacade implements Authorizable {
}
// add each template
for (final Template template : flowController.getTemplates()) {
for (final Template template : root.findAllTemplates()) {
final TemplateDTO details = template.getDetails();
resources.add(ResourceFactory.getComponentResource(ResourceType.Template, details.getId(), details.getName()));
}

View File

@ -28,17 +28,19 @@ public interface TemplateDAO {
* Creates a template.
*
* @param templateDTO The template DTO
* @param groupId the ID of the group to add the template to
* @return The template
*/
Template createTemplate(TemplateDTO templateDTO);
Template createTemplate(TemplateDTO templateDTO, String groupId);
/**
* Import the specified template.
*
* @param templateDTO dto
* @param groupId the ID of the group to add the template to
* @return template
*/
Template importTemplate(TemplateDTO templateDTO);
Template importTemplate(TemplateDTO templateDTO, String groupId);
/**
* Instantiate the corresponding template.

View File

@ -16,12 +16,13 @@
*/
package org.apache.nifi.web.dao.impl;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.TemplateUtils;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.NiFiCoreException;
@ -30,7 +31,6 @@ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.dao.TemplateDAO;
import org.apache.nifi.web.util.SnippetUtils;
import org.apache.commons.lang3.StringUtils;
/**
*
@ -42,7 +42,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
private Template locateTemplate(String templateId) {
// get the template
Template template = flowController.getTemplate(templateId);
Template template = flowController.getGroup(flowController.getRootGroupId()).findTemplate(templateId);
// ensure the template exists
if (template == null) {
@ -53,21 +53,22 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
}
@Override
public Template createTemplate(TemplateDTO templateDTO) {
try {
return flowController.addTemplate(templateDTO);
} catch (IOException ioe) {
throw new WebApplicationException(new IOException("Unable to save specified template: " + ioe.getMessage()));
public Template createTemplate(TemplateDTO templateDTO, String groupId) {
final ProcessGroup processGroup = flowController.getGroup(groupId);
if (processGroup == null) {
throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
}
TemplateUtils.scrubTemplate(templateDTO);
final Template template = new Template(templateDTO);
processGroup.addTemplate(template);
return template;
}
@Override
public Template importTemplate(TemplateDTO templateDTO) {
try {
return flowController.importTemplate(templateDTO);
} catch (IOException ioe) {
throw new WebApplicationException(new IOException("Unable to import specified template: " + ioe.getMessage()));
}
public Template importTemplate(TemplateDTO templateDTO, String groupId) {
return createTemplate(templateDTO, groupId);
}
@Override
@ -75,7 +76,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
ProcessGroup group = locateProcessGroup(flowController, groupId);
// get the template id and find the template
Template template = flowController.getTemplate(templateId);
Template template = getTemplate(templateId);
// ensure the template could be found
if (template == null) {
@ -103,14 +104,10 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
@Override
public void deleteTemplate(String templateId) {
// ensure the template exists
locateTemplate(templateId);
final Template template = locateTemplate(templateId);
try {
// remove the specified template
flowController.removeTemplate(templateId);
} catch (final IOException ioe) {
throw new WebApplicationException(new IOException("Unable to remove specified template: " + ioe.getMessage()));
}
// remove the specified template
template.getProcessGroup().removeTemplate(template);
}
@Override
@ -121,7 +118,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
@Override
public Set<Template> getTemplates() {
final Set<Template> templates = new HashSet<>();
for (final Template template : flowController.getTemplates()) {
for (final Template template : flowController.getGroup(flowController.getRootGroupId()).findAllTemplates()) {
templates.add(template);
}
return templates;

View File

@ -62,7 +62,7 @@ public class NaiveRevisionManager implements RevisionManager {
}
/**
* Constructs a new NaiveRevisionManager that uses the given number of Nanoseconds as the expiration time
* Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time
* for a Revision Claims
*
* @param claimExpiration how long a Revision Claim should last
@ -92,7 +92,7 @@ public class NaiveRevisionManager implements RevisionManager {
final RevisionLock revisionLock = getRevisionLock(revision);
final ClaimResult claimResult = revisionLock.requestClaim(revision);
logger.debug("Obtained Revision Claim for {}", revision);
logger.trace("Obtained Revision Claim for {}", revision);
if (claimResult.isSuccessful()) {
locksObtained.add(revisionLock);
@ -107,7 +107,7 @@ public class NaiveRevisionManager implements RevisionManager {
// if we got a Revision Claim on each Revision, return a successful result
if (locksObtained.size() == revisionList.size()) {
logger.debug("Obtained Revision Claim for all components");
logger.trace("Obtained Revision Claim for all components");
// it's possible that obtaining the locks took a while if we are obtaining
// many. Renew the timestamp to ensure that the first locks obtained don't
@ -123,6 +123,7 @@ public class NaiveRevisionManager implements RevisionManager {
// We failed to obtain all of the Revision Claims necessary. Since
// we need this call to atomically obtain all or nothing, we have to now
// release the locks that we did obtain.
logger.debug("Failed to obtain all necessary Revisions; releasing claims for {}", locksObtained);
for (final RevisionLock revisionLock : locksObtained) {
revisionLock.releaseClaim();
}
@ -158,7 +159,7 @@ public class NaiveRevisionManager implements RevisionManager {
final boolean verified = revisionLock.requestWriteLock(revision);
if (verified) {
logger.debug("Verified Revision Claim for {}", revision);
logger.trace("Verified Revision Claim for {}", revision);
successCount++;
} else {
logger.debug("Failed to verify Revision Claim for {}", revision);
@ -168,7 +169,7 @@ public class NaiveRevisionManager implements RevisionManager {
}
if (successCount == revisionList.size()) {
logger.debug("Successfully verified Revision Claim for all revisions");
logger.debug("Successfully verified Revision Claim for all revisions {}", claim);
final T taskValue = task.performTask();
for (final Revision revision : revisionList) {
@ -206,7 +207,7 @@ public class NaiveRevisionManager implements RevisionManager {
final boolean verified = revisionLock.requestWriteLock(revision);
if (verified) {
logger.debug("Verified Revision Claim for {}", revision);
logger.trace("Verified Revision Claim for {}", revision);
successCount++;
} else {
logger.debug("Failed to verify Revision Claim for {}", revision);
@ -245,7 +246,14 @@ public class NaiveRevisionManager implements RevisionManager {
}
for (final Revision revision : revisionList) {
getRevisionLock(revision).unlock(revision, updatedRevisions.get(revision), modifier);
final Revision updatedRevision = updatedRevisions.get(revision);
getRevisionLock(revision).unlock(revision, updatedRevision, modifier);
if (updatedRevision.getVersion() != revision.getVersion()) {
logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
} else {
logger.debug("Unlocked Revision {} without updating Version", revision);
}
}
}
@ -280,6 +288,20 @@ public class NaiveRevisionManager implements RevisionManager {
return success;
}
@Override
public boolean cancelClaim(String componentId) {
logger.debug("Attempting to cancel claim for component {}", componentId);
final Revision revision = new Revision(0L, null, componentId);
final RevisionLock revisionLock = getRevisionLock(revision);
if (revisionLock == null) {
logger.debug("No Revision Lock exists for Component {} - there is no claim to cancel", componentId);
return false;
}
return revisionLock.releaseClaimIfCurrentThread();
}
@Override
public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) {
final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos));
@ -301,15 +323,16 @@ public class NaiveRevisionManager implements RevisionManager {
sortedIds.sort(Collator.getInstance());
final Stack<RevisionLock> revisionLocks = new Stack<>();
logger.debug("Will attempt to obtain read locks for components {}", componentIds);
for (final String componentId : sortedIds) {
final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos));
logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision());
logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision());
revisionLock.acquireReadLock();
revisionLocks.push(revisionLock);
logger.debug("Obtained read lock for {}", revisionLock.getRevision());
logger.trace("Obtained read lock for {}", revisionLock.getRevision());
}
logger.debug("Obtained read lock for all necessary components; calling call-back");
logger.debug("Obtained read lock for all necessary components {}; calling call-back", componentIds);
try {
return callback.get();
} finally {
@ -327,7 +350,6 @@ public class NaiveRevisionManager implements RevisionManager {
return;
}
revisionLock.releaseClaim();
}
@ -390,38 +412,44 @@ public class NaiveRevisionManager implements RevisionManager {
Objects.requireNonNull(proposedRevision);
threadLock.writeLock().lock();
if (getRevision().equals(proposedRevision)) {
final LockStamp stamp = lockStamp.get();
boolean releaseLock = true;
try {
if (getRevision().equals(proposedRevision)) {
final LockStamp stamp = lockStamp.get();
if (stamp == null) {
logger.debug("Attempted to obtain write lock for {} but no Claim was obtained", proposedRevision);
throw new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification");
}
if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) {
// TODO - Must make sure that we don't have an expired stamp if it is the result of another
// operation taking a long time. I.e., Client A fires off two requests for Component X. If the
// first one takes 2 minutes to complete, it should not result in the second request getting
// rejected. I.e., we want to ensure that if the request is received before the Claim expired,
// that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
// only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
// not fulfill the second phase of the two-phase commit.
// We may need a Queue of updates (queue would need to be bounded, with a request getting
// rejected if queue is full).
if (stamp.isExpired()) {
threadLock.writeLock().unlock();
throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
if (stamp == null) {
final IllegalStateException ise = new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification");
logger.debug("Attempted to obtain write lock for {} but no Claim was obtained; throwing IllegalStateException", proposedRevision, ise);
throw ise;
}
// Intentionally leave the thread lock in a locked state!
return true;
} else {
logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) {
// TODO - Must make sure that we don't have an expired stamp if it is the result of another
// operation taking a long time. I.e., Client A fires off two requests for Component X. If the
// first one takes 2 minutes to complete, it should not result in the second request getting
// rejected. I.e., we want to ensure that if the request is received before the Claim expired,
// that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
// only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
// not fulfill the second phase of the two-phase commit.
// We may need a Queue of updates (queue would need to be bounded, with a request getting
// rejected if queue is full).
if (stamp.isExpired()) {
throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
}
// Intentionally leave the thread lock in a locked state!
releaseLock = false;
return true;
} else {
logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
}
}
} finally {
if (releaseLock) {
threadLock.writeLock().unlock();
}
}
// revision is wrong. Unlock thread lock and return false
threadLock.writeLock().unlock();
return false;
}
@ -471,6 +499,29 @@ public class NaiveRevisionManager implements RevisionManager {
lockStamp.set(null);
}
public boolean releaseClaimIfCurrentThread() {
threadLock.writeLock().lock();
try {
final LockStamp stamp = lockStamp.get();
if (stamp == null) {
logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision());
return false;
}
if (stamp.isObtainedByCurrentThread()) {
releaseClaim();
logger.debug("Successfully canceled claim for {}", getRevision());
return true;
}
logger.debug("Cannot cancel claim for {} because it is held by Thread {} and current Thread is {}",
getRevision(), stamp.obtainingThread, Thread.currentThread().getName());
return false;
} finally {
threadLock.writeLock().unlock();
}
}
/**
* Releases the Revision Claim if and only if the current revision matches the proposed revision
*
@ -544,10 +595,12 @@ public class NaiveRevisionManager implements RevisionManager {
private static class LockStamp {
private final String clientId;
private final long expirationTimestamp;
private final Thread obtainingThread;
public LockStamp(final String clientId, final long expirationTimestamp) {
this.clientId = clientId;
this.expirationTimestamp = expirationTimestamp;
this.obtainingThread = Thread.currentThread();
}
public String getClientId() {
@ -558,6 +611,10 @@ public class NaiveRevisionManager implements RevisionManager {
return System.nanoTime() > expirationTimestamp;
}
public boolean isObtainedByCurrentThread() {
return obtainingThread == Thread.currentThread();
}
@Override
public String toString() {
return clientId;

View File

@ -178,4 +178,12 @@ public interface RevisionManager {
* up-to-date
*/
boolean releaseClaim(RevisionClaim claim);
/**
* Releases the claim on the revision for the given component if the claim was obtained by the calling thread
*
* @param componentId the ID of the component
* @return <code>true</code> if the claim was released, false otherwise
*/
boolean cancelClaim(String componentId);
}

View File

@ -27,7 +27,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -35,11 +40,8 @@ import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@Ignore
public class TestNaiveRevisionManager {
private static final String CLIENT_1 = "client-1";
private static final String COMPONENT_1 = "component-1";
@ -234,6 +236,31 @@ public class TestNaiveRevisionManager {
assertNotNull(secondClaim);
}
@Test(timeout = 10000)
public void testSameClientSameRevisionBlocks() throws InterruptedException, ExecutionException {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final Runnable runnable = new Runnable() {
@Override
public void run() {
revisionManager.requestClaim(secondRevision);
}
};
final ExecutorService exec = Executors.newFixedThreadPool(1);
final Future<?> future = exec.submit(runnable);
try {
future.get(2, TimeUnit.SECONDS);
Assert.fail("Call to obtain claim on revision did not block when claim was already held");
} catch (TimeoutException e) {
// Expected
}
}
@Test(timeout = 10000)
public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
@ -332,6 +359,38 @@ public class TestNaiveRevisionManager {
assertTrue(revisionManager.releaseClaim(claim));
}
@Test(timeout = 10000)
public void testCancelClaimSameThread() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision);
assertNotNull(claim);
assertFalse(revisionManager.cancelClaim("component-2"));
assertTrue(revisionManager.cancelClaim(COMPONENT_1));
}
@Test(timeout = 10000)
public void testCancelClaimDifferentThread() throws InterruptedException {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision);
assertNotNull(claim);
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
assertFalse(revisionManager.cancelClaim("component-2"));
assertFalse(revisionManager.cancelClaim(COMPONENT_1));
}
});
t.setDaemon(true);
t.start();
Thread.sleep(1000L);
assertTrue(revisionManager.cancelClaim(COMPONENT_1));
}
@Test(timeout = 10000)
public void testUpdateWithSomeWrongRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);