NIFI-1745: Refactor how revisions are handled at NCM/Distributed to Node. This closes #454

This commit is contained in:
Mark Payne 2016-05-17 11:51:09 -04:00 committed by Matt Gilman
parent afc8c645a1
commit 4b74e4de74
43 changed files with 693 additions and 1318 deletions

View File

@ -467,6 +467,8 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
<nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
<nifi.cluster.request.replication.claim.timeout>1 min</nifi.cluster.request.replication.claim.timeout>
<!-- nifi.properties: zookeeper properties -->
<nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
<nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>

View File

@ -186,6 +186,8 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads";
public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration";
public static final String REQUEST_REPLICATION_CLAIM_TIMEOUT = "nifi.cluster.request.replication.claim.timeout";
// kerberos properties
public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
public static final String KERBEROS_SERVICE_PRINCIPAL = "nifi.kerberos.service.principal";
@ -249,6 +251,8 @@ public class NiFiProperties extends Properties {
public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10;
public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec";
public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "1 min";
// state management defaults
public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml";

View File

@ -1,44 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-framework-cluster-web</artifactId>
<packaging>jar</packaging>
<description>The clustering software for communicating with the NiFi web api.</description>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-administration</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-user-actions</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.context;
import java.io.Serializable;
import java.util.List;
import org.apache.nifi.action.Action;
import org.apache.nifi.web.Revision;
/**
* Contains contextual information about clustering that may be serialized
* between manager and node when communicating over HTTP.
*/
public interface ClusterContext extends Serializable {
/**
* Returns a list of auditable actions. The list is modifiable and will
* never be null.
*
* @return a collection of actions
*/
List<Action> getActions();
Revision getRevision();
void setRevision(Revision revision);
/**
* @return true if the request was sent by the cluster manager; false
* otherwise
*/
boolean isRequestSentByClusterManager();
/**
* Sets the flag to indicate if a request was sent by the cluster manager.
*
* @param flag true if the request was sent by the cluster manager; false
* otherwise
*/
void setRequestSentByClusterManager(boolean flag);
/**
* Gets an id generation seed. This is used to ensure that nodes are able to
* generate the same id across the cluster. This is usually handled by the
* cluster manager creating the id, however for some actions (snippets,
* templates, etc) this is not possible.
*
* @return generated id seed
*/
String getIdGenerationSeed();
}

View File

@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.context;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.nifi.action.Action;
import org.apache.nifi.web.Revision;
/**
* A basic implementation of the context.
*/
public class ClusterContextImpl implements ClusterContext, Serializable {
private final List<Action> actions = new ArrayList<>();
private Revision revision;
private boolean requestSentByClusterManager;
private final String idGenerationSeed = UUID.randomUUID().toString();
@Override
public List<Action> getActions() {
return actions;
}
@Override
public Revision getRevision() {
return revision;
}
@Override
public void setRevision(Revision revision) {
this.revision = revision;
}
@Override
public boolean isRequestSentByClusterManager() {
return requestSentByClusterManager;
}
@Override
public void setRequestSentByClusterManager(boolean requestSentByClusterManager) {
this.requestSentByClusterManager = requestSentByClusterManager;
}
@Override
public String getIdGenerationSeed() {
return this.idGenerationSeed;
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.context;
/**
* Manages a cluster context on a threadlocal.
*/
public class ClusterContextThreadLocal {
private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>();
public static void removeContext() {
contextHolder.remove();
}
public static ClusterContext createEmptyContext() {
return new ClusterContextImpl();
}
public static ClusterContext getContext() {
return contextHolder.get();
}
public static void setContext(final ClusterContext context) {
contextHolder.set(context);
}
}

View File

@ -65,10 +65,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-protocol</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-utils</artifactId>

View File

@ -24,7 +24,19 @@ import java.util.Set;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
public interface RequestReplicator {
public static final String REQUEST_TRANSACTION_ID = "X-RequestTransactionId";
public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId";
public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";
/**
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request. The value
* is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to
* process the request, 417 EXPECTATION_FAILED otherwise.
*/
public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects";
public static final String NODE_CONTINUE = "150-NodeContinue";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
/**
* Starts the instance for replicating requests. Calling this method on an already started instance has no effect.

View File

@ -43,8 +43,6 @@ import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextImpl;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@ -62,8 +60,6 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,21 +71,6 @@ import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.core.util.MultivaluedMapImpl;
public class ThreadPoolRequestReplicator implements RequestReplicator {
/**
* The HTTP header to store a cluster context. An example of what may be stored in the context is a node's
* auditable actions in response to a cluster request. The cluster context is serialized
* using Java's serialization mechanism and hex encoded.
*/
static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
/**
* The HTTP header that the NCM specifies to ask a node if they are able to process a given request. The value
* is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to
* process the request, 417 EXPECTATION_FAILED otherwise.
*/
static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
static final String NODE_CONTINUE = "150-NodeContinue";
static final int NODE_CONTINUE_STATUS_CODE = 150;
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class));
private static final int MAX_CONCURRENT_REQUESTS = 100;
@ -102,7 +83,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private final EventReporter eventReporter;
private final RequestCompletionCallback callback;
private final ClusterCoordinator clusterCoordinator;
private final OptimisticLockingManager lockingManager;
private final DataFlowManagementService dfmService;
private ExecutorService executorService;
@ -121,8 +101,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
*/
public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
final RequestCompletionCallback callback, final EventReporter eventReporter, final OptimisticLockingManager lockingManager, final DataFlowManagementService dfmService) {
this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", callback, eventReporter, null, lockingManager, dfmService);
final RequestCompletionCallback callback, final EventReporter eventReporter, final DataFlowManagementService dfmService) {
this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", callback, eventReporter, null, dfmService);
}
/**
@ -138,7 +118,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
*/
public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final EventReporter eventReporter,
final WebClusterManager clusterManager, final OptimisticLockingManager lockingManager, final DataFlowManagementService dfmService) {
final WebClusterManager clusterManager, final DataFlowManagementService dfmService) {
if (numThreads <= 0) {
throw new IllegalArgumentException("The number of threads must be greater than zero.");
} else if (client == null) {
@ -153,7 +133,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
this.responseMerger = new StandardHttpResponseMerger(clusterManager);
this.eventReporter = eventReporter;
this.callback = callback;
this.lockingManager = lockingManager;
this.dfmService = dfmService;
client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeoutMs);
@ -198,7 +177,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
@Override
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) {
return replicate(nodeIds, method, uri, entity, headers, true, null);
final Map<String, String> headersPlusIdGenerationSeed = new HashMap<>(headers);
headersPlusIdGenerationSeed.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString());
return replicate(nodeIds, method, uri, entity, headersPlusIdGenerationSeed, true, null);
}
/**
@ -233,7 +214,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> UUID.randomUUID().toString());
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
if (performVerification) {
verifyState(method, uri.getPath());
@ -284,29 +265,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
// TODO: Must handle revisions!!
return response;
}
private void setRevision(final Map<String, String> headers) {
final ClusterContext clusterCtx = new ClusterContextImpl();
clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager
clusterCtx.setRevision(lockingManager.getLastModification().getRevision());
// serialize cluster context and add to request header
final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
headers.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
}
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, NODE_CONTINUE);
updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
final int numNodes = nodeIds.size();
final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() {
@ -474,6 +442,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
}
/**
* Removes the AsyncClusterResponse with the given ID from the map and handles any cleanup
* or post-processing related to the request after the client has consumed the response
*
* @param requestId the ID of the request that has been consumed by the client
*/
private void onResponseConsumed(final String requestId) {
final AsyncClusterResponse response = responseMap.remove(requestId);
@ -482,6 +456,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
}
/**
* When all nodes have completed a request and provided a response (or have timed out), this method will be invoked
* to handle calling the Callback that was provided for the request, if any, and handle any cleanup or post-processing
* related to the request
*
* @param requestId the ID of the request that has completed
*/
private void onCompletedResponse(final String requestId) {
final AsyncClusterResponse response = responseMap.get(requestId);
@ -543,16 +524,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme,
final String path, final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) {
private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path,
final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) {
if (nodeIds.isEmpty()) {
return; // return quickly for trivial case
}
// submit the requests to the nodes
final String requestId = UUID.randomUUID().toString();
headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
for (final NodeIdentifier nodeId : nodeIds) {
final NodeHttpRequest callable = callableFactory.apply(nodeId);
executorService.submit(callable);

View File

@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@ -268,8 +267,6 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
}
// submit the requests to the nodes
final String requestId = UUID.randomUUID().toString();
headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
for (final Map.Entry<NodeIdentifier, URI> entry : uriMap.entrySet()) {
final NodeIdentifier nodeId = entry.getKey();
final URI nodeUri = entry.getValue();
@ -339,15 +336,15 @@ public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
}
final StringBuilder sb = new StringBuilder();
sb.append("Node Responses for ").append(method).append(" ").append(path).append(" (Request ID ").append(requestId).append("):\n");
sb.append("Node Responses for ").append(method).append(" ").append(path).append(":\n");
for (final NodeResponse response : result) {
sb.append(response).append("\n");
}
final long averageNanos = (nanosAdded == 0) ? -1L : nanosSum / nanosAdded;
final long averageMillis = (averageNanos < 0) ? averageNanos : TimeUnit.MILLISECONDS.convert(averageNanos, TimeUnit.NANOSECONDS);
logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms",
method, path, requestId, min, max, averageMillis);
logger.debug("For {} {}, minimum response time = {}, max = {}, average = {} ms",
method, path, min, max, averageMillis);
logger.debug(sb.toString());
}

View File

@ -19,7 +19,6 @@ package org.apache.nifi.cluster.manager.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -54,13 +53,10 @@ import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextImpl;
import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
@ -76,7 +72,6 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DaoException;
import org.apache.nifi.cluster.flow.DataFlowManagementService;
import org.apache.nifi.cluster.flow.PersistedFlowState;
import org.apache.nifi.cluster.manager.HttpClusterManager;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException;
@ -165,11 +160,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UpdateRevision;
import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
@ -197,58 +188,21 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
*/
public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider, RequestCompletionCallback {
public static final String ROOT_GROUP_ID_ALIAS = "root";
public static final String BULLETIN_CATEGORY = "Clustering";
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
/**
* The HTTP header to store a cluster context. An example of what may be stored in the context is a node's auditable actions in response to a cluster request. The cluster context is serialized
* using Java's serialization mechanism and hex encoded.
*/
public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
/**
* HTTP Header that stores a unique ID for each request that is replicated to the nodes. This is used for logging purposes so that request information, such as timing, can be correlated between
* the NCM and the nodes
*/
public static final String REQUEST_ID_HEADER = "X-RequestID";
/**
* The HTTP header that the NCM specifies to ask a node if they are able to process a given request. The value is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to
* process the request, 417 EXPECTATION_FAILED otherwise.
*/
public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
/**
* The HTTP header that the NCM specifies to indicate that a node should invalidate the specified user group. This is done to ensure that user cache is not stale when an administrator modifies a
* group through the UI.
*/
public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup";
/**
* The HTTP header that the NCM specifies to indicate that a node should invalidate the specified user. This is done to ensure that user cache is not stale when an administrator modifies a user
* through the UI.
*/
public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser";
/**
* The default number of seconds to respond to a connecting node if the manager cannot provide it with a current data flow.
*/
private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
private final NiFiProperties properties;
private final DataFlowManagementService dataFlowManagementService;
private final ClusterManagerProtocolSenderListener senderListener;
private final OptimisticLockingManager optimisticLockingManager;
private final StringEncryptor encryptor;
private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(true);
private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
@ -281,8 +235,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private final RequestReplicator httpRequestReplicator;
public WebClusterManager(
final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) {
final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
final NiFiProperties properties, final StringEncryptor encryptor) {
if (dataFlowManagementService == null) {
throw new IllegalArgumentException("DataFlowManagementService may not be null.");
@ -298,7 +252,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
this.instanceId = UUID.randomUUID().toString();
this.senderListener = senderListener;
this.encryptor = encryptor;
this.optimisticLockingManager = optimisticLockingManager;
senderListener.addHandler(this);
senderListener.setBulletinRepository(bulletinRepository);
@ -356,7 +309,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
return new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator, connectionTimeout, readTimeout, this,
eventReporter, this, optimisticLockingManager, dataFlowManagementService);
eventReporter, this, dataFlowManagementService);
}
private EventReporter createEventReporter() {
@ -1734,85 +1687,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
logger.debug("Applying prototype request " + uri + " to nodes.");
// the starting state of the flow (current, stale, unknown)
final PersistedFlowState originalPersistedFlowState = dataFlowManagementService.getPersistedFlowState();
// replicate request
final AsyncClusterResponse clusterResponse = httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? parameters : entity, headers);
// check if this request can change the flow
final boolean mutableRequest = canChangeNodeState(method, uri);
final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null);
final UpdateRevision federateRequest = new UpdateRevision() {
@Override
public Revision execute(Revision currentRevision) {
// update headers to contain cluster contextual information to send to the node
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final ClusterContext clusterCtx = new ClusterContextImpl();
clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager
clusterCtx.setRevision(currentRevision);
// serialize cluster context and add to request header
final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
// replicate request
final AsyncClusterResponse clusterResponse = httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? parameters : entity, updatedHeaders);
final NodeResponse clientResponse;
try {
clientResponse = clusterResponse.awaitMergedResponse();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Thread was interrupted while waiting for a response from one or more nodes", e);
final Set<NodeIdentifier> noResponses = clusterResponse.getNodesInvolved();
noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers());
throw new IllegalClusterStateException("Interrupted while waiting for a response from the following nodes: " + noResponses, e);
}
holder.set(clientResponse);
// if we have a response get the updated cluster context for auditing and revision updating
Revision updatedRevision = null;
if (mutableRequest && clientResponse != null) {
try {
// get the cluster context from the response header
final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
if (StringUtils.isNotBlank(serializedClusterContext)) {
// deserialize object
final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
// if we have a valid object, audit the actions
if (clusterContextObj instanceof ClusterContext) {
final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
if (auditService != null) {
try {
auditService.addActions(clusterContext.getActions());
} catch (final Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
}
}
}
updatedRevision = clusterContext.getRevision();
}
}
} catch (final ClassNotFoundException cnfe) {
logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
}
}
return updatedRevision;
}
};
// federate the request and lock on the revision
if (mutableRequest) {
optimisticLockingManager.setRevision(federateRequest);
} else {
federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
final NodeResponse clientResponse;
try {
clientResponse = clusterResponse.awaitMergedResponse();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Thread was interrupted while waiting for a response from one or more nodes", e);
final Set<NodeIdentifier> noResponses = clusterResponse.getNodesInvolved();
noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers());
throw new IllegalClusterStateException("Interrupted while waiting for a response from the following nodes: " + noResponses, e);
}
return holder.get();
return clientResponse;
}
@ -2152,20 +2041,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
private void notifyDataFlowManagmentServiceOfFlowStateChange(final PersistedFlowState newState) {
writeLock.lock();
try {
logger.debug("Notifying DataFlow Management Service that flow state is " + newState);
dataFlowManagementService.setPersistedFlowState(newState);
if (newState != PersistedFlowState.CURRENT) {
cachedDataFlow = null;
/* do not reset primary node ID because only the data flow has changed */
}
} finally {
writeLock.unlock("notifyDataFlowManagementServiceOfFlowStateChange");
}
}
public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
return heartbeatMonitor.getLatestHeartbeat(nodeId);
}

View File

@ -27,7 +27,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.OptimisticLockingManager;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
@ -46,8 +45,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
private StringEncryptor encryptor;
private OptimisticLockingManager optimisticLockingManager;
@Override
public Object getObject() throws Exception {
if (properties.isClusterManager() && properties.isNode()) {
@ -67,8 +64,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
dataFlowService,
senderListener,
properties,
encryptor,
optimisticLockingManager
encryptor
);
// set the service broadcaster
@ -119,8 +115,4 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
public void setEncryptor(final StringEncryptor encryptor) {
this.encryptor = encryptor;
}
public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
this.optimisticLockingManager = optimisticLockingManager;
}
}

View File

@ -59,14 +59,10 @@
<property name="properties" ref="nifiProperties"/>
</bean>
<!-- cluster manager optimistic locking manager -->
<bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/>
<!-- cluster manager -->
<bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="encryptor" ref="stringEncryptor"/>
<property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/>
</bean>
<!-- discoverable services -->

View File

@ -40,7 +40,6 @@ import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.StandardOptimisticLockingManager;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Assert;
@ -156,13 +155,12 @@ public class TestThreadPoolRequestReplicator {
final AtomicInteger requestCount = new AtomicInteger(0);
final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator,
"1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) {
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, null, dfmService) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER);
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
final int statusCode;
if (requestCount.incrementAndGet() == 1) {
@ -208,13 +206,12 @@ public class TestThreadPoolRequestReplicator {
final AtomicInteger requestCount = new AtomicInteger(0);
final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator,
"1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) {
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, null, dfmService) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER);
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
final int requestIndex = requestCount.incrementAndGet();
assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
@ -256,8 +253,7 @@ public class TestThreadPoolRequestReplicator {
final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class);
final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator,
"1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) {
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, null, dfmService) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) {
if (delayMillis > 0L) {

View File

@ -170,6 +170,9 @@ nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
# How long a request should be allowed to hold a 'lock' on a component. #
nifi.cluster.request.replication.claim.timeout=${nifi.cluster.request.replication.claim.timeout}
# cluster manager properties (only configure for cluster manager) #
nifi.cluster.is.manager=${nifi.cluster.is.manager}
nifi.cluster.manager.address=${nifi.cluster.manager.address}

View File

@ -152,11 +152,6 @@
<artifactId>nifi-framework-cluster</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>

View File

@ -18,16 +18,15 @@ package org.apache.nifi.audit;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
/**
@ -58,13 +57,6 @@ public abstract class NiFiAuditor {
* @param logger logger
*/
protected void saveActions(Collection<Action> actions, Logger logger) {
ClusterContext ctx = ClusterContextThreadLocal.getContext();
// if we're a connected node, then put audit actions on threadlocal to propagate back to manager
if (ctx != null) {
ctx.getActions().addAll(actions);
}
// always save the actions regardless of cluster or stand-alone
// all nodes in a cluster will have their own local copy without batching
try {

View File

@ -81,6 +81,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@ -306,19 +307,21 @@ public interface NiFiServiceFacade {
* @param description description
* @param snippetId id
* @param groupId id of the process group
* @param idGenerationSeed the seed to use for generating a UUID
* @return template
*/
TemplateDTO createTemplate(String name, String description, String snippetId, String groupId);
TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional<String> idGenerationSeed);
/**
* Imports the specified Template.
*
* @param templateDTO The template dto
* @param groupId id of the process group
* @param idGenerationSeed the seed to use for generating a UUID
*
* @return The new template dto
*/
TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId);
TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional<String> idGenerationSeed);
/**
* Instantiate the corresponding template.
@ -327,9 +330,10 @@ public interface NiFiServiceFacade {
* @param templateId template id
* @param originX x
* @param originY y
* @param idGenerationSeed the ID to use for generating UUID's. May be null.
* @return snapshot
*/
FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId);
FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId, String idGenerationSeed);
/**
* Gets the template with the specified id.
@ -1302,9 +1306,10 @@ public interface NiFiServiceFacade {
* @param snippetId snippet id
* @param originX x
* @param originY y
* @param idGenerationSeed the seed to use for generating UUID's. May be null.
* @return snapshot
*/
FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY);
FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY, String idGenerationSeed);
/**
* Creates a new snippet.

View File

@ -16,6 +16,30 @@
*/
package org.apache.nifi.web;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
@ -30,8 +54,6 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@ -42,6 +64,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@ -168,28 +191,6 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
*/
@ -200,9 +201,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private ControllerFacade controllerFacade;
private SnippetUtils snippetUtils;
// optimistic locking manager
// private OptimisticLockingManager optimisticLockingManager;
// revision manager
private RevisionManager revisionManager;
@ -239,7 +237,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// -----------------------------------------
@Override
public void claimRevision(Revision revision) {
revisionManager.requestClaim(revision);
revisionManager.requestClaim(revision, NiFiUserUtils.getNiFiUser());
}
// -----------------------------------------
@ -274,22 +272,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteConnection(String connectionId) {
try {
connectionDAO.verifyDelete(connectionId);
} catch (final Exception e) {
revisionManager.cancelClaim(connectionId);
throw e;
}
connectionDAO.verifyDelete(connectionId);
}
@Override
public void verifyDeleteFunnel(String funnelId) {
try {
funnelDAO.verifyDelete(funnelId);
} catch (final Exception e) {
revisionManager.cancelClaim(funnelId);
throw e;
}
funnelDAO.verifyDelete(funnelId);
}
@Override
@ -308,12 +296,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteInputPort(String inputPortId) {
try {
inputPortDAO.verifyDelete(inputPortId);
} catch (final Exception e) {
revisionManager.cancelClaim(inputPortId);
throw e;
}
inputPortDAO.verifyDelete(inputPortId);
}
@Override
@ -332,12 +315,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteOutputPort(String outputPortId) {
try {
outputPortDAO.verifyDelete(outputPortId);
} catch (final Exception e) {
revisionManager.cancelClaim(outputPortId);
throw e;
}
outputPortDAO.verifyDelete(outputPortId);
}
@Override
@ -356,12 +334,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteProcessor(String processorId) {
try {
processorDAO.verifyDelete(processorId);
} catch (final Exception e) {
revisionManager.cancelClaim(processorId);
throw e;
}
processorDAO.verifyDelete(processorId);
}
@Override
@ -380,12 +353,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteProcessGroup(String groupId) {
try {
processGroupDAO.verifyDelete(groupId);
} catch (final Exception e) {
revisionManager.cancelClaim(groupId);
throw e;
}
processGroupDAO.verifyDelete(groupId);
}
@Override
@ -424,12 +392,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteRemoteProcessGroup(String remoteProcessGroupId) {
try {
remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
} catch (final Exception e) {
revisionManager.cancelClaim(remoteProcessGroupId);
throw e;
}
remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
}
@Override
@ -458,12 +421,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteControllerService(String controllerServiceId) {
try {
controllerServiceDAO.verifyDelete(controllerServiceId);
} catch (final Exception e) {
revisionManager.cancelClaim(controllerServiceId);
throw e;
}
controllerServiceDAO.verifyDelete(controllerServiceId);
}
@Override
@ -482,12 +440,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteReportingTask(String reportingTaskId) {
try {
reportingTaskDAO.verifyDelete(reportingTaskId);
} catch (final Exception e) {
revisionManager.cancelClaim(reportingTaskId);
throw e;
}
reportingTaskDAO.verifyDelete(reportingTaskId);
}
// -----------------------------------------
@ -581,9 +534,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
* @return A ConfigurationSnapshot that represents the new configuration
*/
private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
final String modifier = NiFiUserUtils.getNiFiUserName();
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final String modifier = user.getUserName();
try {
final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<D>() {
final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
@Override
public RevisionUpdate<D> update() {
// ensure write access to the flow
@ -706,12 +660,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
requestProcessGroup.authorize(authorizer, RequestAction.WRITE);
}
final String modifier = NiFiUserUtils.getNiFiUserName();
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final String modifier = user.getUserName();
final RevisionClaim revisionClaim = new StandardRevisionClaim(requiredRevisions);
RevisionUpdate<SnippetDTO> versionedSnippet;
try {
versionedSnippet = revisionManager.updateRevision(revisionClaim, modifier, new UpdateRevisionTask<SnippetDTO>() {
versionedSnippet = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() {
@Override
public RevisionUpdate<SnippetDTO> update() {
// get the updated component
@ -1081,7 +1036,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
*/
private <D, C> D deleteComponent(final Revision revision, final Authorizable authorizable, final Runnable deleteAction, final D dto) {
final RevisionClaim claim = new StandardRevisionClaim(revision);
return revisionManager.deleteRevision(claim, new DeleteRevisionTask<D>() {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() {
@Override
public D performTask() {
logger.debug("Attempting to delete component {} with claim {}", authorizable, claim);
@ -1089,6 +1046,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// ensure access to the component
authorizable.authorize(authorizer, RequestAction.WRITE);
// If the component has outgoing connections, ensure that we can delete them all.
if (authorizable instanceof Connectable) {
final Connectable connectable = (Connectable) authorizable;
for (final Connection connection : connectable.getConnections()) {
connection.authorize(authorizer, RequestAction.WRITE);
}
}
deleteAction.run();
// save the flow
@ -1102,12 +1067,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyDeleteSnippet(String id) {
try {
snippetDAO.verifyDelete(id);
} catch (final Exception e) {
revisionManager.cancelClaim(id);
throw e;
}
snippetDAO.verifyDelete(id);
}
@Override
@ -1356,7 +1316,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY) {
public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
final FlowDTO flowDto = revisionManager.get(groupId,
rev -> {
// ensure access to process group
@ -1364,7 +1324,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
processGroup.authorize(authorizer, RequestAction.WRITE);
// create the new snippet
final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
// TODO - READ access to all components in snippet
@ -1505,7 +1465,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId) {
public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional<String> idGenerationSeed) {
// get the specified snippet
Snippet snippet = snippetDAO.getSnippet(snippetId);
@ -1517,12 +1477,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true));
// set the id based on the specified seed
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
templateDTO.setId(UUID.randomUUID().toString());
}
final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
templateDTO.setId(uuid);
// create the template
Template template = templateDAO.createTemplate(templateDTO, groupId);
@ -1531,14 +1487,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId) {
public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional<String> idGenerationSeed) {
// ensure id is set
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
templateDTO.setId(UUID.randomUUID().toString());
}
final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
templateDTO.setId(uuid);
// mark the timestamp
templateDTO.setTimestamp(new Date());
@ -1551,7 +1503,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId) {
public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) {
final FlowDTO flowDto = revisionManager.get(groupId, rev -> {
// ensure access to process group
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
@ -1559,7 +1511,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// instantiate the template - there is no need to make another copy of the flow snippet since the actual template
// was copied and this dto is only used to instantiate it's components (which as already completed)
final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed);
// TODO - READ access to all components in snippet
@ -1624,9 +1576,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ProcessorEntity setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
final String modifier = NiFiUserUtils.getNiFiUserName();
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final String modifier = user.getUserName();
final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<ProcessorEntity>() {
final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<ProcessorEntity>() {
@Override
public RevisionUpdate<ProcessorEntity> update() {
// create the processor config
@ -1701,9 +1654,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values());
final String modifier = NiFiUserUtils.getNiFiUserName();
final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, modifier,
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user,
new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() {
@Override
public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {

View File

@ -16,32 +16,16 @@
*/
package org.apache.nifi.web.api;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -53,14 +37,25 @@ import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
import javax.ws.rs.core.UriInfo;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import com.sun.jersey.api.core.HttpContext;
import com.sun.jersey.api.representation.Form;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
/**
* Base class for controllers.
@ -77,8 +72,6 @@ public abstract class ApplicationResource {
public static final String PROXIED_ENTITIES_CHAIN_HTTP_HEADER = "X-ProxiedEntitiesChain";
public static final String PROXIED_ENTITY_USER_DETAILS_HTTP_HEADER = "X-ProxiedEntityUserDetails";
private static final int HEADER_BUFFER_SIZE = 16 * 1024; // 16kb
private static final int CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES = (int) (0.75 * HEADER_BUFFER_SIZE);
private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class);
public static final String NODEWISE = "false";
@ -173,85 +166,26 @@ public abstract class ApplicationResource {
* @return builder
*/
protected ResponseBuilder clusterContext(ResponseBuilder response) {
NiFiProperties properties = NiFiProperties.getInstance();
if (!properties.isNode()) {
return response;
}
// get cluster context from threadlocal
ClusterContext clusterCtx = ClusterContextThreadLocal.getContext();
if (clusterCtx != null) {
// serialize cluster context
String serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
if (serializedClusterContext.length() > CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES) {
/*
* Actions is the only field that can vary in size. If we have no
* actions and we exceeded the header size, then basic assumptions
* about the cluster context have been violated.
*/
if (clusterCtx.getActions().isEmpty()) {
throw new IllegalStateException(
String.format("Serialized Cluster context size '%d' is too big for response header", serializedClusterContext.length()));
}
// use the first action as the prototype for creating the "batch" action
Action prototypeAction = clusterCtx.getActions().get(0);
// log the batched actions
StringBuilder loggedActions = new StringBuilder();
createBatchedActionLogStatement(loggedActions, clusterCtx.getActions());
logger.info(loggedActions.toString());
// remove current actions and replace with batch action
clusterCtx.getActions().clear();
// create the batch action
FlowChangeAction batchAction = new FlowChangeAction();
batchAction.setOperation(Operation.Batch);
// copy values from prototype action
batchAction.setTimestamp(prototypeAction.getTimestamp());
batchAction.setUserIdentity(prototypeAction.getUserIdentity());
batchAction.setUserName(prototypeAction.getUserName());
batchAction.setSourceId(prototypeAction.getSourceId());
batchAction.setSourceName(prototypeAction.getSourceName());
batchAction.setSourceType(prototypeAction.getSourceType());
// add batch action
clusterCtx.getActions().add(batchAction);
// create the final serialized copy of the cluster context
serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
}
// put serialized cluster context in response header
response.header(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterContext);
}
// TODO: Remove this method. Since ClusterContext was removed, it is no longer needed. However,
// it is called by practically every endpoint so for now it is just being stubbed out.
return response;
}
/**
* @return the cluster context if found in the request header 'X-CLUSTER_CONTEXT'.
*/
protected ClusterContext getClusterContextFromRequest() {
String clusterContextHeaderValue = httpServletRequest.getHeader(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER);
if (StringUtils.isNotBlank(clusterContextHeaderValue)) {
try {
// deserialize object
Serializable clusterContextObj = WebUtils.deserializeHexToObject(clusterContextHeaderValue);
if (clusterContextObj instanceof ClusterContext) {
return (ClusterContext) clusterContextObj;
}
} catch (ClassNotFoundException cnfe) {
logger.warn("Classpath issue detected because failed to deserialize cluster context from request due to: " + cnfe, cnfe);
}
}
return null;
protected String generateUuid() {
final Optional<String> seed = getIdGenerationSeed();
return seed.isPresent() ? UUID.nameUUIDFromBytes(seed.get().getBytes(StandardCharsets.UTF_8)).toString() : UUID.randomUUID().toString();
}
protected Optional<String> getIdGenerationSeed() {
final String idGenerationSeed = httpServletRequest.getHeader(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER);
if (StringUtils.isBlank(idGenerationSeed)) {
return Optional.empty();
}
return Optional.of(idGenerationSeed);
}
/**
* Generates an Ok response with no content.
*
@ -300,7 +234,7 @@ public abstract class ApplicationResource {
* @return a 150 Node Continue response to be used within the cluster request handshake
*/
protected ResponseBuilder generateContinueResponse() {
return Response.status(WebClusterManager.NODE_CONTINUE_STATUS_CODE);
return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);
}
protected URI getAbsolutePath() {
@ -390,13 +324,6 @@ public abstract class ApplicationResource {
return result;
}
private void createBatchedActionLogStatement(StringBuilder strb, Collection<Action> actions) {
strb.append("Cluster context too big for response header. Replacing below actions with 'batch' action...\n");
for (Action action : actions) {
strb.append(ReflectionToStringBuilder.toString(action, ToStringStyle.MULTI_LINE_STYLE)).append("\n");
}
}
/**
* Checks whether the request is part of a two-phase commit style request (either phase 1 or phase 2)
*
@ -404,7 +331,7 @@ public abstract class ApplicationResource {
* @return <code>true</code> if the request represents a two-phase commit style request
*/
protected boolean isTwoPhaseRequest(HttpServletRequest httpServletRequest) {
final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID);
final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
return headerValue != null;
}
@ -419,7 +346,7 @@ public abstract class ApplicationResource {
* first of the two phases.
*/
protected boolean isValidationPhase(HttpServletRequest httpServletRequest) {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER) != null;
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
}
/**

View File

@ -16,18 +16,31 @@
*/
package org.apache.nifi.web.api;
import com.sun.jersey.api.core.ResourceContext;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@ -51,27 +64,13 @@ import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import com.sun.jersey.api.core.ResourceContext;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Flow Controller.
@ -134,7 +133,7 @@ public class ControllerResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -267,7 +266,7 @@ public class ControllerResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -379,7 +378,7 @@ public class ControllerResource extends ApplicationResource {
final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), "controller");
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -504,18 +503,13 @@ public class ControllerResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
reportingTaskEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
reportingTaskEntity.getComponent().setId(UUID.randomUUID().toString());
}
reportingTaskEntity.getComponent().setId(generateUuid());
// create the reporting task and generate the json
final ReportingTaskEntity entity = serviceFacade.createReportingTask(reportingTaskEntity.getComponent());

View File

@ -16,31 +16,12 @@
*/
package org.apache.nifi.web.api;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@ -59,14 +40,32 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a flowfile queue.
@ -334,20 +333,14 @@ public class FlowFileQueueResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
serviceFacade.verifyListQueue(id);
return generateContinueResponse().build();
}
// ensure the id is the same across the cluster
final String listingRequestId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
listingRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
} else {
listingRequestId = UUID.randomUUID().toString();
}
final String listingRequestId = generateUuid();
// submit the listing request
final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(id, listingRequestId);
@ -466,7 +459,7 @@ public class FlowFileQueueResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -530,19 +523,13 @@ public class FlowFileQueueResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// ensure the id is the same across the cluster
final String dropRequestId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
} else {
dropRequestId = UUID.randomUUID().toString();
}
final String dropRequestId = generateUuid();
// submit the drop request
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(id, dropRequestId);
@ -661,7 +648,7 @@ public class FlowFileQueueResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}

View File

@ -16,13 +16,22 @@
*/
package org.apache.nifi.web.api;
import com.sun.jersey.api.core.ResourceContext;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
@ -33,8 +42,6 @@ import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@ -88,22 +95,13 @@ import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import com.sun.jersey.api.core.ResourceContext;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Flow.
@ -348,16 +346,7 @@ public class FlowResource extends ApplicationResource {
)
public Response generateClientId() {
authorizeFlow();
final String clientId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
clientId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
} else {
clientId = UUID.randomUUID().toString();
}
return clusterContext(generateOkResponse(clientId)).build();
return clusterContext(generateOkResponse(generateUuid())).build();
}
// ------

View File

@ -16,17 +16,38 @@
*/
package org.apache.nifi.web.api;
import com.sun.jersey.api.core.ResourceContext;
import com.sun.jersey.multipart.FormDataParam;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
@ -67,36 +88,14 @@ import org.apache.nifi.web.api.request.LongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.sun.jersey.api.core.ResourceContext;
import com.sun.jersey.multipart.FormDataParam;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Group.
@ -111,7 +110,6 @@ public class ProcessGroupResource extends ApplicationResource {
private static final Logger logger = LoggerFactory.getLogger(ProcessGroupResource.class);
private static final String VERBOSE = "false";
private static final String RECURSIVE = "false";
@Context
private ResourceContext resourceContext;
@ -494,18 +492,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
processGroupEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
processGroupEntity.getComponent().setId(UUID.randomUUID().toString());
}
processGroupEntity.getComponent().setId(generateUuid());
// create the process group contents
final ProcessGroupEntity entity = serviceFacade.createProcessGroup(groupId, processGroupEntity.getComponent());
@ -643,18 +636,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
processorEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
processorEntity.getComponent().setId(UUID.randomUUID().toString());
}
processorEntity.getComponent().setId(generateUuid());
// create the new processor
final ProcessorEntity entity = serviceFacade.createProcessor(groupId, processorEntity.getComponent());
@ -781,18 +769,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
portEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
portEntity.getComponent().setId(UUID.randomUUID().toString());
}
portEntity.getComponent().setId(generateUuid());
// create the input port and generate the json
final PortEntity entity = serviceFacade.createInputPort(groupId, portEntity.getComponent());
@ -916,18 +899,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
portEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
portEntity.getComponent().setId(UUID.randomUUID().toString());
}
portEntity.getComponent().setId(generateUuid());
// create the output port and generate the json
final PortEntity entity = serviceFacade.createOutputPort(groupId, portEntity.getComponent());
@ -1052,18 +1030,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
funnelEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
funnelEntity.getComponent().setId(UUID.randomUUID().toString());
}
funnelEntity.getComponent().setId(generateUuid());
// create the funnel and generate the json
final FunnelEntity entity = serviceFacade.createFunnel(groupId, funnelEntity.getComponent());
@ -1188,18 +1161,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
labelEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
labelEntity.getComponent().setId(UUID.randomUUID().toString());
}
labelEntity.getComponent().setId(generateUuid());
// create the label and generate the json
final LabelEntity entity = serviceFacade.createLabel(groupId, labelEntity.getComponent());
@ -1330,18 +1298,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
requestProcessGroupDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
requestProcessGroupDTO.setId(UUID.randomUUID().toString());
}
requestProcessGroupDTO.setId(generateUuid());
// parse the uri
final URI uri;
@ -1509,19 +1472,14 @@ public class ProcessGroupResource extends ApplicationResource {
final ConnectionDTO connection = connectionEntity.getComponent();
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
serviceFacade.verifyCreateConnection(groupId, connection);
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
connection.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
connection.setId(UUID.randomUUID().toString());
}
connection.setId(generateUuid());
// create the new relationship target
final ConnectionEntity entity = serviceFacade.createConnection(groupId, connection);
@ -1654,18 +1612,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
snippetEntity.getSnippet().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
snippetEntity.getSnippet().setId(UUID.randomUUID().toString());
}
snippetEntity.getSnippet().setId(generateUuid());
// create the snippet
final SnippetEntity entity = serviceFacade.createSnippet(snippetEntity.getSnippet());
@ -1961,14 +1914,14 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// copy the specified snippet
final FlowEntity flowEntity = serviceFacade.copySnippet(
groupId, copySnippetEntity.getSnippetId(), copySnippetEntity.getOriginX(), copySnippetEntity.getOriginY());
groupId, copySnippetEntity.getSnippetId(), copySnippetEntity.getOriginX(), copySnippetEntity.getOriginY(), getIdGenerationSeed().orElse(null));
// get the snippet
final FlowDTO flow = flowEntity.getFlow();
@ -2044,14 +1997,14 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// create the template and generate the json
final FlowEntity entity = serviceFacade.createTemplateInstance(groupId, instantiateTemplateRequestEntity.getOriginX(),
instantiateTemplateRequestEntity.getOriginY(), instantiateTemplateRequestEntity.getTemplateId());
instantiateTemplateRequestEntity.getOriginY(), instantiateTemplateRequestEntity.getTemplateId(), getIdGenerationSeed().orElse(null));
final FlowDTO flowSnippet = entity.getFlow();
@ -2170,14 +2123,14 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// create the template and generate the json
final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(),
createTemplateRequestEntity.getSnippetId(), groupId);
createTemplateRequestEntity.getSnippetId(), groupId, getIdGenerationSeed());
templateResource.populateRemainingTemplateContent(template);
// build the response entity
@ -2288,7 +2241,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -2300,7 +2253,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
// import the template
final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId);
final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId, getIdGenerationSeed());
templateResource.populateRemainingTemplateContent(template);
// build the response entity
@ -2387,18 +2340,13 @@ public class ProcessGroupResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// set the processor id as appropriate
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
controllerServiceEntity.getComponent().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
} else {
controllerServiceEntity.getComponent().setId(UUID.randomUUID().toString());
}
controllerServiceEntity.getComponent().setId(generateUuid());
// create the controller service and generate the json
final ControllerServiceEntity entity = serviceFacade.createControllerService(groupId, controllerServiceEntity.getComponent());

View File

@ -16,14 +16,32 @@
*/
package org.apache.nifi.web.api;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
@ -47,31 +65,13 @@ import org.apache.nifi.web.api.request.LongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for querying data provenance.
@ -210,7 +210,7 @@ public class ProvenanceResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -489,19 +489,13 @@ public class ProvenanceResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// ensure the id is the same across the cluster
final String provenanceId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
provenanceId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
} else {
provenanceId = UUID.randomUUID().toString();
}
final String provenanceId = generateUuid();
// set the provenance id accordingly
provenanceDto.setId(provenanceId);
@ -657,7 +651,7 @@ public class ProvenanceResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -847,7 +841,7 @@ public class ProvenanceResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
@ -999,7 +993,7 @@ public class ProvenanceResource extends ApplicationResource {
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}

View File

@ -29,9 +29,10 @@ public interface SnippetDAO {
* @param snippetId snippet id
* @param originX x
* @param originY y
* @param idGenerationSeed the seed to use for generating UUID's. May be null.
* @return snippet
*/
FlowSnippetDTO copySnippet(String groupId, String snippetId, Double originX, Double originY);
FlowSnippetDTO copySnippet(String groupId, String snippetId, Double originX, Double originY, String idGenerationSeed);
/**
* Creates a snippet.

View File

@ -49,9 +49,10 @@ public interface TemplateDAO {
* @param originX x
* @param originY y
* @param templateId template id
* @param idGenerationSeed the seed to use for generating UUID's. May be null.
* @return flow snippet
*/
FlowSnippetDTO instantiateTemplate(String groupId, Double originX, Double originY, String templateId);
FlowSnippetDTO instantiateTemplate(String groupId, Double originX, Double originY, String templateId, String idGenerationSeed);
/**
* Gets the specified template.

View File

@ -59,7 +59,7 @@ public class StandardSnippetDAO implements SnippetDAO {
}
@Override
public FlowSnippetDTO copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY) {
public FlowSnippetDTO copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
try {
// ensure the parent group exist
final ProcessGroup processGroup = flowController.getGroup(groupId);
@ -85,7 +85,7 @@ public class StandardSnippetDAO implements SnippetDAO {
lookupSensitiveProperties(snippetContents);
// copy snippet
snippetContents = snippetUtils.copy(snippetContents, processGroup);
snippetContents = snippetUtils.copy(snippetContents, processGroup, idGenerationSeed);
// move the snippet if necessary
if (originX != null && originY != null) {

View File

@ -72,7 +72,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
}
@Override
public FlowSnippetDTO instantiateTemplate(String groupId, Double originX, Double originY, String templateId) {
public FlowSnippetDTO instantiateTemplate(String groupId, Double originX, Double originY, String templateId, String idGenerationSeed) {
ProcessGroup group = locateProcessGroup(flowController, groupId);
// get the template id and find the template
@ -86,7 +86,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
try {
// copy the template which pre-processes all ids
TemplateDTO templateDetails = template.getDetails();
FlowSnippetDTO snippet = snippetUtils.copy(templateDetails.getSnippet(), group);
FlowSnippetDTO snippet = snippetUtils.copy(templateDetails.getSnippet(), group, idGenerationSeed);
// reposition the template contents
org.apache.nifi.util.SnippetUtils.moveSnippet(snippet, originX, originY);

View File

@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.filter;
import java.io.IOException;
import java.io.Serializable;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.util.WebUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
/**
* A filter that prevents direct access to nodes (i.e., flow controllers connected to a cluster). Direct access to nodes by clients external to the cluster is prevented because the dataflow must be
* identical across the cluster.
*
* Direct access to a node is determined by the presence of a custom request header. The header key is "X-CLUSTER_MANAGER" and the value can be anything/empty. The presence of this header is a simple
* way to flag that the request was issued by the cluster manager and may proceed to the next filter.
*
* Since this header may be faked, we only make decisions about the header if the application instance is a node and connected to the cluster.
*
*/
public class NodeRequestFilter implements Filter {
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(NodeRequestFilter.class));
private FilterConfig config;
@Override
public void doFilter(final ServletRequest req, final ServletResponse resp, final FilterChain filterChain)
throws IOException, ServletException {
ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(config.getServletContext());
NiFiProperties properties = ctx.getBean("nifiProperties", NiFiProperties.class);
HttpServletRequest httpReq = (HttpServletRequest) req;
HttpServletResponse httpResp = (HttpServletResponse) resp;
if (properties.isClusterManager() || "HEAD".equalsIgnoreCase(httpReq.getMethod())) {
filterChain.doFilter(req, resp);
} else {
NiFiServiceFacade serviceFacade = ctx.getBean("serviceFacade", NiFiServiceFacade.class);
if (!serviceFacade.isClustered()) {
filterChain.doFilter(req, resp);
} else {
// get the cluster context from the request
ClusterContext clusterContext = null;
String clusterContextHeaderValue = httpReq.getHeader(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER);
if (StringUtils.isNotBlank(clusterContextHeaderValue)) {
try {
// deserialize object
Serializable clusterContextObj = WebUtils.deserializeHexToObject(clusterContextHeaderValue);
if (clusterContextObj instanceof ClusterContext) {
clusterContext = (ClusterContext) clusterContextObj;
}
} catch (final ClassNotFoundException cnfe) {
logger.warn("Failed to deserialize cluster context from request due to: " + cnfe, cnfe);
}
}
// if don't have a cluster context or the context indicates
if (clusterContext == null || !clusterContext.isRequestSentByClusterManager()) {
// node is connected and request is not from cluster manager, so respond with error
httpResp.setContentType("text/plain");
httpResp.setStatus(HttpServletResponse.SC_FORBIDDEN);
httpResp.getWriter().print("Direct access to Flow Controller node is only permissible if node is disconnected.");
} else {
ClusterContextThreadLocal.setContext(clusterContext);
filterChain.doFilter(req, resp);
}
}
}
}
@Override
public void init(final FilterConfig config) {
this.config = config;
}
@Override
public void destroy() {
}
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.filter;
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
/**
* A filter to remove the threadlocal configuration.
*
*/
public class ThreadLocalFilter implements Filter {
@Override
public void doFilter(final ServletRequest req, final ServletResponse resp, final FilterChain filterChain)
throws IOException, ServletException {
try {
filterChain.doFilter(req, resp);
} finally {
ClusterContextThreadLocal.removeContext();
}
}
@Override
public void init(final FilterConfig config) {
}
@Override
public void destroy() {
}
}

View File

@ -27,9 +27,8 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.logging.NiFiLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,9 +51,9 @@ public class TimerFilter implements Filter {
filterChain.doFilter(req, resp);
} finally {
final long stop = System.nanoTime();
final String requestId = ((HttpServletRequest) req).getHeader(WebClusterManager.REQUEST_ID_HEADER);
logger.debug("{} {} from {} request duration: {} millis", request.getMethod(), request.getRequestURL().toString(),
req.getRemoteHost(), TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
final String requestId = ((HttpServletRequest) req).getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
logger.debug("{} {} from {} request duration for Request ID {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
req.getRemoteHost(), requestId, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
}
}

View File

@ -27,8 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@ -291,8 +289,8 @@ public final class SnippetUtils {
return false;
}
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null);
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed);
resolveNameConflicts(snippetCopy, group);
return snippetCopy;
}
@ -347,7 +345,8 @@ public final class SnippetUtils {
}
}
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap, Map<String, String> serviceIdMap) {
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap, Map<String, String> serviceIdMap,
String idGenerationSeed) {
final FlowSnippetDTO snippetContentsCopy = new FlowSnippetDTO();
//
@ -359,7 +358,7 @@ public final class SnippetUtils {
if (snippetContents.getControllerServices() != null) {
for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) {
final ControllerServiceDTO service = dtoFactory.copy(serviceDTO);
service.setId(generateId(serviceDTO.getId()));
service.setId(generateId(serviceDTO.getId(), idGenerationSeed));
service.setState(ControllerServiceState.DISABLED.name());
services.add(service);
@ -396,7 +395,7 @@ public final class SnippetUtils {
if (snippetContents.getLabels() != null) {
for (final LabelDTO labelDTO : snippetContents.getLabels()) {
final LabelDTO label = dtoFactory.copy(labelDTO);
label.setId(generateId(labelDTO.getId()));
label.setId(generateId(labelDTO.getId(), idGenerationSeed));
label.setParentGroupId(groupId);
labels.add(label);
}
@ -416,7 +415,7 @@ public final class SnippetUtils {
if (snippetContents.getFunnels() != null) {
for (final FunnelDTO funnelDTO : snippetContents.getFunnels()) {
final FunnelDTO cp = dtoFactory.copy(funnelDTO);
cp.setId(generateId(funnelDTO.getId()));
cp.setId(generateId(funnelDTO.getId(), idGenerationSeed));
cp.setParentGroupId(groupId);
funnels.add(cp);
@ -429,7 +428,7 @@ public final class SnippetUtils {
if (snippetContents.getInputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getInputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId()));
cp.setId(generateId(portDTO.getId(), idGenerationSeed));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
inputPorts.add(cp);
@ -447,7 +446,7 @@ public final class SnippetUtils {
if (snippetContents.getOutputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getOutputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId()));
cp.setId(generateId(portDTO.getId(), idGenerationSeed));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
outputPorts.add(cp);
@ -468,7 +467,7 @@ public final class SnippetUtils {
if (snippetContents.getProcessors() != null) {
for (final ProcessorDTO processorDTO : snippetContents.getProcessors()) {
final ProcessorDTO cp = dtoFactory.copy(processorDTO);
cp.setId(generateId(processorDTO.getId()));
cp.setId(generateId(processorDTO.getId(), idGenerationSeed));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
processors.add(cp);
@ -489,11 +488,11 @@ public final class SnippetUtils {
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
final ProcessGroupDTO cp = dtoFactory.copy(groupDTO, false);
cp.setId(generateId(groupDTO.getId()));
cp.setId(generateId(groupDTO.getId(), idGenerationSeed));
cp.setParentGroupId(groupId);
// copy the contents of this group - we do not copy via the dto factory since we want to specify new ids
final FlowSnippetDTO contentsCopy = copyContentsForGroup(groupDTO.getContents(), cp.getId(), connectableMap, serviceIdMap);
final FlowSnippetDTO contentsCopy = copyContentsForGroup(groupDTO.getContents(), cp.getId(), connectableMap, serviceIdMap, idGenerationSeed);
cp.setContents(contentsCopy);
groups.add(cp);
}
@ -504,7 +503,7 @@ public final class SnippetUtils {
if (snippetContents.getRemoteProcessGroups() != null) {
for (final RemoteProcessGroupDTO remoteGroupDTO : snippetContents.getRemoteProcessGroups()) {
final RemoteProcessGroupDTO cp = dtoFactory.copy(remoteGroupDTO);
cp.setId(generateId(remoteGroupDTO.getId()));
cp.setId(generateId(remoteGroupDTO.getId(), idGenerationSeed));
cp.setParentGroupId(groupId);
final RemoteProcessGroupContentsDTO contents = cp.getContents();
@ -539,7 +538,7 @@ public final class SnippetUtils {
throw new IllegalArgumentException("The flow snippet contains a Connection that references a component that is not included.");
}
cp.setId(generateId(connectionDTO.getId()));
cp.setId(generateId(connectionDTO.getId(), idGenerationSeed));
cp.setSource(source);
cp.setDestination(destination);
cp.setParentGroupId(groupId);
@ -593,13 +592,11 @@ public final class SnippetUtils {
/**
* Generates a new id for the current id that is specified. If no seed is found, a new random id will be created.
*/
private String generateId(final String currentId) {
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
final String seed = clusterContext.getIdGenerationSeed() + "-" + currentId;
return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
} else {
private String generateId(final String currentId, final String seed) {
if (seed == null) {
return UUID.randomUUID().toString();
} else {
return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
}
}

View File

@ -38,7 +38,9 @@
</bean>
<!-- revision manager -->
<bean id="revisionManager" class="org.apache.nifi.web.revision.NaiveRevisionManager" />
<bean id="revisionManager" class="org.apache.nifi.web.revision.NaiveRevisionManager">
<constructor-arg ref="nifiProperties"></constructor-arg>
</bean>
<!-- content access -->
<bean id="contentAccess" class="org.apache.nifi.web.StandardNiFiContentAccess">

View File

@ -54,14 +54,6 @@
<filter-name>timer</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter>
<filter-name>threadLocal</filter-name>
<filter-class>org.apache.nifi.web.filter.ThreadLocalFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>threadLocal</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter>
<filter-name>springSecurityFilterChain</filter-name>
<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
@ -90,12 +82,4 @@
<filter-name>gzipCompressionFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter>
<filter-name>nodeRequestFilter</filter-name>
<filter-class>org.apache.nifi.web.filter.NodeRequestFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>nodeRequestFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>

View File

@ -35,9 +35,5 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-security</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-web</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -20,8 +20,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.web.revision.NaiveRevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -115,13 +113,7 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
public FlowModification getLastModification() {
lock();
try {
final Revision revision;
final ClusterContext ctx = ClusterContextThreadLocal.getContext();
if (ctx == null || ctx.getRevision() == null) {
revision = currentRevision;
} else {
revision = ctx.getRevision();
}
final Revision revision = currentRevision;
return new FlowModification(revision, lastModifier);
} finally {
@ -136,12 +128,7 @@ public class StandardOptimisticLockingManager implements OptimisticLockingManage
lastModifier = lastModification.getLastModifier();
// record the updated revision in the cluster context if possible
final ClusterContext ctx = ClusterContextThreadLocal.getContext();
if (ctx != null) {
ctx.setRevision(lastModification.getRevision());
} else {
currentRevision = lastModification.getRevision();
}
currentRevision = lastModification.getRevision();
} finally {
unlock();
}

View File

@ -36,6 +36,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
@ -61,6 +64,10 @@ public class NaiveRevisionManager implements RevisionManager {
this(1, TimeUnit.MINUTES);
}
public NaiveRevisionManager(final NiFiProperties properties) {
this(getRequestTimeoutMillis(properties), TimeUnit.MILLISECONDS);
}
/**
* Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time
* for a Revision Claims
@ -72,13 +79,20 @@ public class NaiveRevisionManager implements RevisionManager {
this.claimExpirationNanos = timeUnit.toNanos(claimExpiration);
}
@Override
public RevisionClaim requestClaim(Revision revision) throws InvalidRevisionException {
return requestClaim(Collections.singleton(revision));
private static long getRequestTimeoutMillis(final NiFiProperties properties) {
return FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS);
}
@Override
public RevisionClaim requestClaim(final Collection<Revision> revisions) {
public RevisionClaim requestClaim(final Revision revision, final NiFiUser user) throws InvalidRevisionException {
Objects.requireNonNull(user);
return requestClaim(Collections.singleton(revision), user);
}
@Override
public RevisionClaim requestClaim(final Collection<Revision> revisions, final NiFiUser user) {
Objects.requireNonNull(user);
logger.debug("Attempting to claim Revisions {}", revisions);
// Try to obtain a Revision Claim (temporary lock) on all revisions
@ -91,7 +105,7 @@ public class NaiveRevisionManager implements RevisionManager {
final Revision revision = revisionList.get(i);
final RevisionLock revisionLock = getRevisionLock(revision);
final ClaimResult claimResult = revisionLock.requestClaim(revision);
final ClaimResult claimResult = revisionLock.requestClaim(revision, user);
logger.trace("Obtained Revision Claim for {}", revision);
if (claimResult.isSuccessful()) {
@ -147,7 +161,8 @@ public class NaiveRevisionManager implements RevisionManager {
}
@Override
public <T> T deleteRevision(final RevisionClaim claim, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
public <T> T deleteRevision(final RevisionClaim claim, final NiFiUser user, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException {
Objects.requireNonNull(user);
logger.debug("Attempting to delete revision using {}", claim);
int successCount = 0;
final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
@ -156,7 +171,7 @@ public class NaiveRevisionManager implements RevisionManager {
String failedId = null;
for (final Revision revision : revisionList) {
final RevisionLock revisionLock = getRevisionLock(revision);
final boolean verified = revisionLock.requestWriteLock(revision);
final boolean verified = revisionLock.requestWriteLock(revision, user);
if (verified) {
logger.trace("Verified Revision Claim for {}", revision);
@ -185,7 +200,7 @@ public class NaiveRevisionManager implements RevisionManager {
for (int i = 0; i < successCount; i++) {
final Revision revision = revisionList.get(i);
final RevisionLock revisionLock = getRevisionLock(revision);
revisionLock.relinquishRevisionClaim(revision);
revisionLock.relinquishRevisionClaim(revision, null);
logger.debug("Relinquished lock for {}", revision);
}
@ -194,7 +209,8 @@ public class NaiveRevisionManager implements RevisionManager {
}
@Override
public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final String modifier, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final NiFiUser user, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
Objects.requireNonNull(user);
int successCount = 0;
logger.debug("Attempting to update revision using {}", originalClaim);
@ -204,7 +220,7 @@ public class NaiveRevisionManager implements RevisionManager {
String failedId = null;
for (final Revision revision : revisionList) {
final RevisionLock revisionLock = getRevisionLock(revision);
final boolean verified = revisionLock.requestWriteLock(revision);
final boolean verified = revisionLock.requestWriteLock(revision, user);
if (verified) {
logger.trace("Verified Revision Claim for {}", revision);
@ -247,7 +263,7 @@ public class NaiveRevisionManager implements RevisionManager {
for (final Revision revision : revisionList) {
final Revision updatedRevision = updatedRevisions.get(revision);
getRevisionLock(revision).unlock(revision, updatedRevision, modifier);
getRevisionLock(revision).unlock(revision, updatedRevision, user.getUserName());
if (updatedRevision.getVersion() != revision.getVersion()) {
logger.debug("Unlocked Revision {} and updated associated Version to {}", revision, updatedRevision.getVersion());
@ -274,7 +290,8 @@ public class NaiveRevisionManager implements RevisionManager {
}
@Override
public boolean releaseClaim(final RevisionClaim claim) {
public boolean releaseClaim(final RevisionClaim claim, final NiFiUser user) {
Objects.requireNonNull(user);
boolean success = true;
final List<Revision> revisions = new ArrayList<>(claim.getRevisions());
@ -282,7 +299,7 @@ public class NaiveRevisionManager implements RevisionManager {
for (final Revision revision : revisions) {
final RevisionLock revisionLock = getRevisionLock(revision);
success = revisionLock.relinquishRevisionClaim(revision) && success;
success = revisionLock.relinquishRevisionClaim(revision, user) && success;
}
return success;
@ -299,14 +316,37 @@ public class NaiveRevisionManager implements RevisionManager {
return false;
}
return revisionLock.releaseClaimIfCurrentThread();
return revisionLock.releaseClaimIfCurrentThread(null);
}
@Override
public boolean cancelClaim(Revision revision) {
logger.debug("Attempting to cancel claim for {}", revision);
final RevisionLock revisionLock = getRevisionLock(revision);
if (revisionLock == null) {
logger.debug("No Revision Lock exists for {} - there is no claim to cancel", revision);
return false;
}
return revisionLock.releaseClaimIfCurrentThread(revision);
}
@Override
public boolean cancelClaims(final Set<Revision> revisions) {
boolean successful = false;
for (final Revision revision : revisions) {
successful = cancelClaim(revision);
}
return successful;
}
@Override
public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) {
final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos));
logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision());
revisionLock.acquireReadLock();
revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
logger.debug("Obtained read lock for {}", revisionLock.getRevision());
try {
@ -327,7 +367,7 @@ public class NaiveRevisionManager implements RevisionManager {
for (final String componentId : sortedIds) {
final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos));
logger.trace("Attempting to obtain read lock for {}", revisionLock.getRevision());
revisionLock.acquireReadLock();
revisionLock.acquireReadLock(null, revisionLock.getRevision().getClientId());
revisionLocks.push(revisionLock);
logger.trace("Obtained read lock for {}", revisionLock.getRevision());
}
@ -376,9 +416,9 @@ public class NaiveRevisionManager implements RevisionManager {
*
* @return <code>true</code> if the Revision is valid and a Claim has been granted, <code>false</code> otherwise
*/
public ClaimResult requestClaim(final Revision proposedRevision) {
public ClaimResult requestClaim(final Revision proposedRevision, final NiFiUser user) {
// acquire the claim, blocking if necessary.
acquireClaim(proposedRevision.getClientId());
acquireClaim(user, proposedRevision.getClientId());
threadLock.writeLock().lock();
try {
@ -408,7 +448,7 @@ public class NaiveRevisionManager implements RevisionManager {
* @return <code>true</code> if the Revision Claim was upgraded to a lock, <code>false</code> otherwise
* @throws ExpiredRevisionClaimException if the Revision Claim for the given Revision has already expired
*/
public boolean requestWriteLock(final Revision proposedRevision) throws ExpiredRevisionClaimException {
public boolean requestWriteLock(final Revision proposedRevision, final NiFiUser user) throws ExpiredRevisionClaimException {
Objects.requireNonNull(proposedRevision);
threadLock.writeLock().lock();
@ -423,26 +463,34 @@ public class NaiveRevisionManager implements RevisionManager {
throw ise;
}
if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) {
// TODO - Must make sure that we don't have an expired stamp if it is the result of another
// operation taking a long time. I.e., Client A fires off two requests for Component X. If the
// first one takes 2 minutes to complete, it should not result in the second request getting
// rejected. I.e., we want to ensure that if the request is received before the Claim expired,
// that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
// only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
// not fulfill the second phase of the two-phase commit.
// We may need a Queue of updates (queue would need to be bounded, with a request getting
// rejected if queue is full).
if (stamp.isExpired()) {
throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
}
// Intentionally leave the thread lock in a locked state!
releaseLock = false;
return true;
} else {
logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
final boolean userEqual = stamp.getUser() == null || stamp.getUser().equals(user);
if (!userEqual) {
logger.debug("Failed to verify {} because the User was not the same as the Lock Stamp's User (Lock Stamp was {})", proposedRevision, stamp);
throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed by " + stamp.getUser());
}
final boolean clientIdEqual = stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId());
if (!clientIdEqual) {
logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp);
throw new InvalidRevisionException("Cannot obtain write lock for " + proposedRevision + " because it was claimed with a different Client ID");
}
// TODO - Must make sure that we don't have an expired stamp if it is the result of another
// operation taking a long time. I.e., Client A fires off two requests for Component X. If the
// first one takes 2 minutes to complete, it should not result in the second request getting
// rejected. I.e., we want to ensure that if the request is received before the Claim expired,
// that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended
// only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does
// not fulfill the second phase of the two-phase commit.
// We may need a Queue of updates (queue would need to be bounded, with a request getting
// rejected if queue is full).
if (stamp.isExpired()) {
throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired");
}
// Intentionally leave the thread lock in a locked state!
releaseLock = false;
return true;
}
} finally {
if (releaseLock) {
@ -453,13 +501,13 @@ public class NaiveRevisionManager implements RevisionManager {
return false;
}
private void acquireClaim(final String clientId) {
private void acquireClaim(final NiFiUser user, final String clientId) {
while (true) {
final LockStamp stamp = lockStamp.get();
if (stamp == null || stamp.isExpired()) {
final long now = System.nanoTime();
final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(clientId, now + lockNanos));
final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(user, clientId, now + lockNanos));
if (lockObtained) {
return;
}
@ -469,7 +517,7 @@ public class NaiveRevisionManager implements RevisionManager {
}
}
public void acquireReadLock() {
public void acquireReadLock(final NiFiUser user, final String clientId) {
// Wait until we can claim the lock stamp
boolean obtained = false;
while (!obtained) {
@ -477,7 +525,7 @@ public class NaiveRevisionManager implements RevisionManager {
// write lock held. Wait until it is null and then replace it atomically
// with a LockStamp that does not expire (expiration time is Long.MAX_VALUE).
final LockStamp curStamp = lockStamp.get();
obtained = (curStamp == null || curStamp.isExpired()) && lockStamp.compareAndSet(curStamp, new LockStamp(null, Long.MAX_VALUE));
obtained = (curStamp == null || curStamp.isExpired()) && lockStamp.compareAndSet(curStamp, new LockStamp(user, clientId, Long.MAX_VALUE));
if (!obtained) {
// Could not obtain lock. Yield so that we don't sit
@ -499,9 +547,13 @@ public class NaiveRevisionManager implements RevisionManager {
lockStamp.set(null);
}
public boolean releaseClaimIfCurrentThread() {
public boolean releaseClaimIfCurrentThread(final Revision revision) {
threadLock.writeLock().lock();
try {
if (revision != null && !getRevision().equals(revision)) {
throw new InvalidRevisionException("Cannot release claim because the provided Revision is not valid");
}
final LockStamp stamp = lockStamp.get();
if (stamp == null) {
logger.debug("Cannot cancel claim for {} because there is no claim held", getRevision());
@ -528,12 +580,17 @@ public class NaiveRevisionManager implements RevisionManager {
* @param proposedRevision the proposed revision to check against the current revision
* @return <code>true</code> if the Revision Claim was relinquished, <code>false</code> otherwise
*/
public boolean relinquishRevisionClaim(final Revision proposedRevision) {
public boolean relinquishRevisionClaim(final Revision proposedRevision, final NiFiUser user) {
threadLock.writeLock().lock();
try {
if (getRevision().equals(proposedRevision)) {
releaseClaim();
return true;
final LockStamp stamp = lockStamp.get();
if (stamp == null || stamp.getUser().equals(user)) {
if (getRevision().equals(proposedRevision)) {
releaseClaim();
return true;
}
} else {
throw new InvalidRevisionException("Cannot relinquish claim for " + proposedRevision + " because it was claimed by " + stamp.getUser());
}
return false;
@ -581,8 +638,18 @@ public class NaiveRevisionManager implements RevisionManager {
*/
public void renewExpiration(final long timestamp) {
final LockStamp stamp = lockStamp.get();
final String clientId = stamp == null ? null : stamp.getClientId();
lockStamp.set(new LockStamp(clientId, timestamp));
final NiFiUser user;
final String clientId;
if (stamp == null) {
user = null;
clientId = null;
} else {
user = stamp.getUser();
clientId = stamp.getClientId();
}
lockStamp.set(new LockStamp(user, clientId, timestamp));
}
public Revision getRevision() {
@ -593,16 +660,22 @@ public class NaiveRevisionManager implements RevisionManager {
private static class LockStamp {
private final NiFiUser user;
private final String clientId;
private final long expirationTimestamp;
private final Thread obtainingThread;
public LockStamp(final String clientId, final long expirationTimestamp) {
public LockStamp(final NiFiUser user, final String clientId, final long expirationTimestamp) {
this.user = user;
this.clientId = clientId;
this.expirationTimestamp = expirationTimestamp;
this.obtainingThread = Thread.currentThread();
}
public NiFiUser getUser() {
return user;
}
public String getClientId() {
return clientId;
}
@ -617,7 +690,7 @@ public class NaiveRevisionManager implements RevisionManager {
@Override
public String toString() {
return clientId;
return "LockStamp[user=" + user + ", clientId=" + clientId + "]";
}
}

View File

@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
@ -82,12 +83,13 @@ public interface RevisionManager {
*
* @param revisions a Set of Revisions, each of which corresponds to a different
* component for which a Claim is to be acquired.
* @param user the user for which the claim is being requested
*
* @return the Revision Claim that was granted, if one was granted.
*
* @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
*/
RevisionClaim requestClaim(Collection<Revision> revisions) throws InvalidRevisionException;
RevisionClaim requestClaim(Collection<Revision> revisions, NiFiUser user) throws InvalidRevisionException;
/**
* <p>
@ -96,12 +98,13 @@ public interface RevisionManager {
* </p>
*
* @param revision the revision to request a claim for
* @param user the user for which the claim is being requested
*
* @return the Revision Claim that was granted, if one was granted.
*
* @throws InvalidRevisionException if any of the Revisions provided is out-of-date.
*/
RevisionClaim requestClaim(Revision revision) throws InvalidRevisionException;
RevisionClaim requestClaim(Revision revision, NiFiUser user) throws InvalidRevisionException;
/**
* Returns the current Revision for the component with the given ID. If no Revision yet exists for the
@ -121,7 +124,7 @@ public interface RevisionManager {
*
* @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is
* to be updated
* @param modifier the name of the entity that is modifying the resource
* @param modifier the user that is modifying the resource
* @param task the task that is responsible for updating the components whose Revisions are claimed by the given
* RevisionClaim. The returned Revision set should include a Revision for each Revision that is the
* supplied Revision Claim. If there exists any Revision in the provided RevisionClaim that is not part
@ -131,7 +134,7 @@ public interface RevisionManager {
*
* @throws ExpiredRevisionClaimException if the Revision Claim has expired
*/
<T> RevisionUpdate<T> updateRevision(RevisionClaim claim, String modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException;
<T> RevisionUpdate<T> updateRevision(RevisionClaim claim, NiFiUser modifier, UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException;
/**
* Performs the given task that is expected to remove a component from the flow. As a result,
@ -139,12 +142,13 @@ public interface RevisionManager {
*
* @param claim the Revision Claim that is responsible for holding a Claim on the Revisions for each component that is
* to be removed
* @param user the user that is requesting that the revision be deleted
* @param task the task that is responsible for deleting the components whose Revisions are claimed by the given RevisionClaim
* @return the value returned from the DeleteRevisionTask
*
* @throws ExpiredRevisionClaimException if the Revision Claim has expired
*/
<T> T deleteRevision(RevisionClaim claim, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException;
<T> T deleteRevision(RevisionClaim claim, NiFiUser user, DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException;
/**
* Performs some operation to obtain an object of type T whose identifier is provided via
@ -173,11 +177,12 @@ public interface RevisionManager {
* are up-to-date.
*
* @param claim the claim that holds the revisions
* @param user the user that is releasing the claim. Must be the same user that claimed the revision.
*
* @return <code>true</code> if the claim was released, <code>false</code> if the Revisions were not
* up-to-date
*/
boolean releaseClaim(RevisionClaim claim);
boolean releaseClaim(RevisionClaim claim, NiFiUser user);
/**
* Releases the claim on the revision for the given component if the claim was obtained by the calling thread
@ -186,4 +191,20 @@ public interface RevisionManager {
* @return <code>true</code> if the claim was released, false otherwise
*/
boolean cancelClaim(String componentId);
/**
* Releases the claim on the given revision if the claim was obtained by the calling thread
*
* @param revision the Revision to cancel the claim for
* @return <code>true</code> if the claim was released, false otherwise
*/
boolean cancelClaim(Revision revision);
/**
* Releases the claims on the given revisions if the claim was obtained by the calling thread
*
* @param revisions the Revisions to cancel claims for
* @return <code>true</code> if all claims were released, false otherwise
*/
boolean cancelClaims(Set<Revision> revisions);
}

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.InvalidRevisionException;
import org.apache.nifi.web.Revision;
@ -45,6 +46,7 @@ import org.junit.Test;
public class TestNaiveRevisionManager {
private static final String CLIENT_1 = "client-1";
private static final String COMPONENT_1 = "component-1";
private static final NiFiUser USER_1 = new NiFiUser("user-1");
private RevisionUpdate<Object> components(final Revision revision) {
return new StandardRevisionUpdate<Object>(null, new FlowModification(revision, null));
@ -70,10 +72,10 @@ public class TestNaiveRevisionManager {
public void testTypicalFlow() throws ExpiredRevisionClaimException {
final RevisionManager revisionManager = new NaiveRevisionManager();
final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
revisionManager.updateRevision(claim, "unit test", () -> components(new Revision(1L, CLIENT_1, COMPONENT_1)));
revisionManager.updateRevision(claim, USER_1, () -> components(new Revision(1L, CLIENT_1, COMPONENT_1)));
final Revision updatedRevision = revisionManager.getRevision(originalRevision.getComponentId());
assertNotNull(updatedRevision);
@ -86,13 +88,13 @@ public class TestNaiveRevisionManager {
public void testExpiration() throws InterruptedException {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MILLISECONDS);
final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
Thread.sleep(100);
try {
revisionManager.updateRevision(claim, "unit test", () -> components(originalRevision, claim.getRevisions()));
revisionManager.updateRevision(claim, USER_1, () -> components(originalRevision, claim.getRevisions()));
Assert.fail("Expected Revision Claim to have expired but it did not");
} catch (final ExpiredRevisionClaimException erce) {
// expected
@ -103,12 +105,12 @@ public class TestNaiveRevisionManager {
public void testConflictingClaimsFromDifferentClients() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision originalRevision = new Revision(0L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
final Revision differentClientRevision = new Revision(0L, "client-2", COMPONENT_1);
final long start = System.nanoTime();
final RevisionClaim differentClientClaim = revisionManager.requestClaim(differentClientRevision);
final RevisionClaim differentClientClaim = revisionManager.requestClaim(differentClientRevision, USER_1);
final long nanos = System.nanoTime() - start;
// we should block for 2 seconds. But the timing won't necessarily be exact,
@ -139,7 +141,7 @@ public class TestNaiveRevisionManager {
public void testGetWithReadLockAndContentionWithTimeout() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.SECONDS);
final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
final long start = System.nanoTime();
@ -156,7 +158,7 @@ public class TestNaiveRevisionManager {
public void testGetWithReadLockAndContentionWithEventualLockResolution() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision originalRevision = new Revision(8L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision);
final RevisionClaim claim = revisionManager.requestClaim(originalRevision, USER_1);
assertNotNull(claim);
final Revision updatedRevision = new Revision(100L, CLIENT_1, COMPONENT_1);
@ -166,7 +168,7 @@ public class TestNaiveRevisionManager {
@Override
public void run() {
try {
revisionManager.updateRevision(claim, "unit test", () -> {
revisionManager.updateRevision(claim, USER_1, () -> {
// Wait 2 seconds and then return
try {
Thread.sleep(2000L);
@ -199,21 +201,21 @@ public class TestNaiveRevisionManager {
public void testDeleteRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(2L, CLIENT_1, COMPONENT_1);
final FlowModification mod = new FlowModification(secondRevision, "unit test");
revisionManager.updateRevision(firstClaim, "unit test", () -> new StandardRevisionUpdate<Void>(null, mod, null));
revisionManager.updateRevision(firstClaim, USER_1, () -> new StandardRevisionUpdate<Void>(null, mod, null));
final Revision updatedRevision = revisionManager.getRevision(COMPONENT_1);
assertEquals(secondRevision, updatedRevision);
final RevisionClaim secondClaim = revisionManager.requestClaim(updatedRevision);
final RevisionClaim secondClaim = revisionManager.requestClaim(updatedRevision, USER_1);
assertNotNull(secondClaim);
final Object obj = new Object();
final Object ret = revisionManager.deleteRevision(secondClaim, () -> obj);
final Object ret = revisionManager.deleteRevision(secondClaim, USER_1, () -> obj);
assertEquals(obj, ret);
final Revision curRevision = revisionManager.getRevision(COMPONENT_1);
@ -228,11 +230,11 @@ public class TestNaiveRevisionManager {
public void testSameClientDifferentRevisionsDoNotBlockEachOther() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, CLIENT_1, "component-2");
final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision);
final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision, USER_1);
assertNotNull(secondClaim);
}
@ -240,14 +242,14 @@ public class TestNaiveRevisionManager {
public void testSameClientSameRevisionBlocks() throws InterruptedException, ExecutionException {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final Runnable runnable = new Runnable() {
@Override
public void run() {
revisionManager.requestClaim(secondRevision);
revisionManager.requestClaim(secondRevision, USER_1);
}
};
final ExecutorService exec = Executors.newFixedThreadPool(1);
@ -265,14 +267,83 @@ public class TestNaiveRevisionManager {
public void testDifferentClientDifferentRevisionsDoNotBlockEachOther() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision secondRevision = new Revision(1L, "client-2", "component-2");
final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision);
final RevisionClaim secondClaim = revisionManager.requestClaim(secondRevision, USER_1);
assertNotNull(secondClaim);
}
@Test
public void testDifferentUserCannotClaimWriteLock() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final NiFiUser user2 = new NiFiUser("user-2");
try {
revisionManager.updateRevision(firstClaim, user2, () -> null);
Assert.fail("Expected updateRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test
public void testDifferentUserCannotDeleteRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final NiFiUser user2 = new NiFiUser("user-2");
try {
revisionManager.deleteRevision(firstClaim, user2, () -> null);
Assert.fail("Expected deleteRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test
public void testSameUserDifferentClientIdCannotDeleteRevision() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision differentClientId = new Revision(1L, "client-2", COMPONENT_1);
final RevisionClaim differentClaimIdClaim = new StandardRevisionClaim(differentClientId);
try {
revisionManager.deleteRevision(differentClaimIdClaim, USER_1, () -> null);
Assert.fail("Expected deleteRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test
public void testSameUserDifferentClientIdCannotClaimWriteLock() {
final RevisionManager revisionManager = new NaiveRevisionManager(2, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim firstClaim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(firstClaim);
final Revision differentClientId = new Revision(1L, "client-2", COMPONENT_1);
final RevisionClaim differentClaimIdClaim = new StandardRevisionClaim(differentClientId);
try {
revisionManager.updateRevision(differentClaimIdClaim, USER_1, () -> null);
Assert.fail("Expected deleteRevision to fail with a different user but it succeeded");
} catch (final InvalidRevisionException ire) {
// Expected behavior
}
}
@Test(timeout = 10000)
public void testDifferentOrderedRevisionsDoNotCauseDeadlock() throws ExpiredRevisionClaimException, InterruptedException {
// Because we block before obtaining a claim on a revision if another client has the revision claimed,
@ -288,7 +359,7 @@ public class TestNaiveRevisionManager {
final Revision revision3c = new Revision(3L, "client-3", "c");
final Revision revision3a = new Revision(3L, "client-3", "a");
final RevisionClaim claim1 = revisionManager.requestClaim(Arrays.asList(revision1a, revision1b));
final RevisionClaim claim1 = revisionManager.requestClaim(Arrays.asList(revision1a, revision1b), USER_1);
assertNotNull(claim1);
final AtomicBoolean claim2Obtained = new AtomicBoolean(false);
@ -299,13 +370,13 @@ public class TestNaiveRevisionManager {
new Thread(new Runnable() {
@Override
public void run() {
final RevisionClaim claim2 = revisionManager.requestClaim(Arrays.asList(revision2b, revision2c));
final RevisionClaim claim2 = revisionManager.requestClaim(Arrays.asList(revision2b, revision2c), USER_1);
assertNotNull(claim2);
claim2Obtained.set(true);
claim2Ref.set(claim2);
try {
revisionManager.updateRevision(claim2, "unit test", () -> components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", "c")));
revisionManager.updateRevision(claim2, USER_1, () -> components(new Revision(3L, "client-2", "b"), new Revision(3L, "client-2", "c")));
} catch (ExpiredRevisionClaimException e) {
Assert.fail("Revision unexpected expired");
}
@ -315,13 +386,13 @@ public class TestNaiveRevisionManager {
new Thread(new Runnable() {
@Override
public void run() {
final RevisionClaim claim3 = revisionManager.requestClaim(Arrays.asList(revision3c, revision3a));
final RevisionClaim claim3 = revisionManager.requestClaim(Arrays.asList(revision3c, revision3a), USER_1);
assertNotNull(claim3);
claim3Obtained.set(true);
claim3Ref.set(claim3);
try {
revisionManager.updateRevision(claim3Ref.get(), "unit test", () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, "client-3", "a")));
revisionManager.updateRevision(claim3Ref.get(), USER_1, () -> components(new Revision(2L, "client-3", "c"), new Revision(2L, "client-3", "a")));
} catch (ExpiredRevisionClaimException e) {
Assert.fail("Revision unexpected expired");
}
@ -332,7 +403,7 @@ public class TestNaiveRevisionManager {
assertFalse(claim2Obtained.get());
assertFalse(claim3Obtained.get());
revisionManager.updateRevision(claim1, "unit test", () -> components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", "b")));
revisionManager.updateRevision(claim1, USER_1, () -> components(new Revision(3L, "client-1", "a"), new Revision(2L, "client-1", "b")));
Thread.sleep(250L);
assertTrue(claim2Obtained.get() && claim3Obtained.get());
@ -350,20 +421,20 @@ public class TestNaiveRevisionManager {
public void testReleaseClaim() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(claim);
final RevisionClaim invalidClaim = new StandardRevisionClaim(new Revision(2L, "client-2", COMPONENT_1));
assertFalse(revisionManager.releaseClaim(invalidClaim));
assertFalse(revisionManager.releaseClaim(invalidClaim, USER_1));
assertTrue(revisionManager.releaseClaim(claim));
assertTrue(revisionManager.releaseClaim(claim, USER_1));
}
@Test(timeout = 10000)
public void testCancelClaimSameThread() {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(claim);
assertFalse(revisionManager.cancelClaim("component-2"));
@ -374,7 +445,7 @@ public class TestNaiveRevisionManager {
public void testCancelClaimDifferentThread() throws InterruptedException {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision firstRevision = new Revision(1L, CLIENT_1, COMPONENT_1);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision);
final RevisionClaim claim = revisionManager.requestClaim(firstRevision, USER_1);
assertNotNull(claim);
final Thread t = new Thread(new Runnable() {
@ -396,12 +467,12 @@ public class TestNaiveRevisionManager {
final RevisionManager revisionManager = new NaiveRevisionManager(10, TimeUnit.MINUTES);
final Revision component1V1 = new Revision(1L, CLIENT_1, COMPONENT_1);
final Revision component2V1 = new Revision(1L, CLIENT_1, "component-2");
final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1));
final RevisionClaim claim = revisionManager.requestClaim(Arrays.asList(component1V1, component2V1), USER_1);
assertNotNull(claim);
// Perform update but only update the revision for component-2
final Revision component1V2 = new Revision(2L, "client-2", COMPONENT_1);
revisionManager.updateRevision(claim, "unit test", new UpdateRevisionTask<Void>() {
revisionManager.updateRevision(claim, USER_1, new UpdateRevisionTask<Void>() {
@Override
public RevisionUpdate<Void> update() {
return new StandardRevisionUpdate<>(null, new FlowModification(component1V2, "unit test"));
@ -410,14 +481,14 @@ public class TestNaiveRevisionManager {
// Obtain a claim with correct revisions
final Revision component2V2 = new Revision(2L, "client-2", "component-2");
revisionManager.requestClaim(Arrays.asList(component1V2, component2V1));
revisionManager.requestClaim(Arrays.asList(component1V2, component2V1), USER_1);
// Attempt to update with incorrect revision for second component
final RevisionClaim wrongClaim = new StandardRevisionClaim(component1V2, component2V2);
final Revision component1V3 = new Revision(3L, CLIENT_1, COMPONENT_1);
try {
revisionManager.updateRevision(wrongClaim, "unit test",
revisionManager.updateRevision(wrongClaim, USER_1,
() -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test"), Collections.emptySet()));
Assert.fail("Expected an Invalid Revision Exception");
} catch (final InvalidRevisionException ire) {
@ -425,14 +496,14 @@ public class TestNaiveRevisionManager {
}
// release claim should fail because we are passing the wrong revision for component 2
assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2)));
assertFalse(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V2), USER_1));
// release claim should succeed because we are now using the proper revisions
assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1)));
assertTrue(revisionManager.releaseClaim(new StandardRevisionClaim(component1V2, component2V1), USER_1));
// verify that we can update again.
final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1));
final RevisionClaim thirdClaim = revisionManager.requestClaim(Arrays.asList(component1V2, component2V1), USER_1);
assertNotNull(thirdClaim);
revisionManager.updateRevision(thirdClaim, "unit test", () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test")));
revisionManager.updateRevision(thirdClaim, USER_1, () -> new StandardRevisionUpdate<>(null, new FlowModification(component1V3, "unit test")));
}
}

View File

@ -31,7 +31,6 @@
<module>nifi-site-to-site</module>
<module>nifi-framework-core</module>
<module>nifi-framework-cluster-protocol</module>
<module>nifi-framework-cluster-web</module>
<module>nifi-framework-cluster</module>
<module>nifi-user-actions</module>
<module>nifi-framework-authorization</module>

View File

@ -33,11 +33,6 @@
<artifactId>nifi-framework-cluster-protocol</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster-web</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster</artifactId>