mirror of https://github.com/apache/nifi.git
NIFI-2635:
- Fixing contrib check issues. - Clean up pom. - Addressing issue where reporting task property descriptor using wrong scope. NIFI-2635: - Fixing issue with revisions when creating users and user groups. - Forwarding requests to the coordinator instead of replicating. - Tweaking verbage in dialog for removing users and groups. This closes #943
This commit is contained in:
parent
1745c1274b
commit
a6133d4ce3
|
@ -34,10 +34,6 @@
|
|||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
|
|
|
@ -18,7 +18,9 @@ package org.apache.nifi.web.api.entity;
|
|||
|
||||
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||
import org.apache.nifi.web.api.dto.ReadablePermission;
|
||||
import org.apache.nifi.web.api.dto.util.TimeAdapter;
|
||||
|
||||
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
|
@ -84,6 +86,7 @@ public class BulletinEntity extends Entity implements ReadablePermission {
|
|||
/**
|
||||
* @return When this bulletin was generated.
|
||||
*/
|
||||
@XmlJavaTypeAdapter(TimeAdapter.class)
|
||||
public Date getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
|
|
@ -78,13 +78,12 @@ public interface RequestReplicator {
|
|||
* will contain the results that are immediately available, as well as an identifier for obtaining an updated result
|
||||
* later. NOTE: This method will ALWAYS indicate that the request has been replicated.
|
||||
*
|
||||
* @param method the HTTP method (e.g., POST, PUT)
|
||||
* @param uri the base request URI (up to, but not including, the query string)
|
||||
* @param entity an entity
|
||||
* @param method the HTTP method (e.g., POST, PUT)
|
||||
* @param uri the base request URI (up to, but not including, the query string)
|
||||
* @param entity an entity
|
||||
* @param headers any HTTP headers
|
||||
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
|
||||
*
|
||||
* @throws ConnectingNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the CONNECTING state
|
||||
* @throws ConnectingNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the CONNECTING state
|
||||
* @throws DisconnectedNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the DISCONNECTED state
|
||||
*/
|
||||
AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers);
|
||||
|
@ -92,20 +91,19 @@ public interface RequestReplicator {
|
|||
/**
|
||||
* Requests are sent to each node in the given set of Node Identifiers. The returned AsyncClusterResponse object will contain
|
||||
* the results that are immediately available, as well as an identifier for obtaining an updated result later.
|
||||
*
|
||||
* <p>
|
||||
* HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used.
|
||||
*
|
||||
* @param nodeIds the node identifiers
|
||||
* @param method the HTTP method (e.g., POST, PUT)
|
||||
* @param uri the base request URI (up to, but not including, the query string)
|
||||
* @param entity an entity
|
||||
* @param headers any HTTP headers
|
||||
* @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
|
||||
* has already been replicated, so the receiving node will not replicate the request itself.
|
||||
* @param nodeIds the node identifiers
|
||||
* @param method the HTTP method (e.g., POST, PUT)
|
||||
* @param uri the base request URI (up to, but not including, the query string)
|
||||
* @param entity an entity
|
||||
* @param headers any HTTP headers
|
||||
* @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
|
||||
* has already been replicated, so the receiving node will not replicate the request itself.
|
||||
* @param performVerification if <code>true</code>, and the request is mutable, will verify that all nodes are connected before
|
||||
* making the request and that all nodes are able to perform the request before acutally attempting to perform the task.
|
||||
* If false, will perform no such verification
|
||||
*
|
||||
* making the request and that all nodes are able to perform the request before acutally attempting to perform the task.
|
||||
* If false, will perform no such verification
|
||||
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
|
||||
*/
|
||||
AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification);
|
||||
|
@ -115,11 +113,10 @@ public interface RequestReplicator {
|
|||
* Forwards a request to the Cluster Coordinator so that it is able to replicate the request to all nodes in the cluster.
|
||||
*
|
||||
* @param coordinatorNodeId the node identifier of the Cluster Coordinator
|
||||
* @param method the HTTP method (e.g., POST, PUT)
|
||||
* @param uri the base request URI (up to, but not including, the query string)
|
||||
* @param entity an entity
|
||||
* @param headers any HTTP headers
|
||||
*
|
||||
* @param method the HTTP method (e.g., POST, PUT)
|
||||
* @param uri the base request URI (up to, but not including, the query string)
|
||||
* @param entity an entity
|
||||
* @param headers any HTTP headers
|
||||
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
|
||||
*/
|
||||
AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers);
|
||||
|
@ -135,7 +132,7 @@ public interface RequestReplicator {
|
|||
*
|
||||
* @param requestIdentifier the identifier of the request to obtain a response for
|
||||
* @return an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier, or <code>null</code> if
|
||||
* no request exists with the given identifier
|
||||
* no request exists with the given identifier
|
||||
*/
|
||||
AsyncClusterResponse getClusterResponse(String requestIdentifier);
|
||||
}
|
||||
|
|
|
@ -103,33 +103,33 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
/**
|
||||
* Creates an instance using a connection timeout and read timeout of 3 seconds
|
||||
*
|
||||
* @param numThreads the number of threads to use when parallelizing requests
|
||||
* @param client a client for making requests
|
||||
* @param numThreads the number of threads to use when parallelizing requests
|
||||
* @param client a client for making requests
|
||||
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
|
||||
* @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
|
||||
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
|
||||
* @param nifiProperties properties
|
||||
* @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
|
||||
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
|
||||
* @param nifiProperties properties
|
||||
*/
|
||||
public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
|
||||
final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
|
||||
final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
|
||||
this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance.
|
||||
*
|
||||
* @param numThreads the number of threads to use when parallelizing requests
|
||||
* @param client a client for making requests
|
||||
* @param numThreads the number of threads to use when parallelizing requests
|
||||
* @param client a client for making requests
|
||||
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
|
||||
* @param connectionTimeout the connection timeout specified in milliseconds
|
||||
* @param readTimeout the read timeout specified in milliseconds
|
||||
* @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
|
||||
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
|
||||
* @param nifiProperties properties
|
||||
* @param connectionTimeout the connection timeout specified in milliseconds
|
||||
* @param readTimeout the read timeout specified in milliseconds
|
||||
* @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
|
||||
* @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
|
||||
* @param nifiProperties properties
|
||||
*/
|
||||
public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
|
||||
final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback,
|
||||
final EventReporter eventReporter, final NiFiProperties nifiProperties) {
|
||||
final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback,
|
||||
final EventReporter eventReporter, final NiFiProperties nifiProperties) {
|
||||
if (numThreads <= 0) {
|
||||
throw new IllegalArgumentException("The number of threads must be greater than zero.");
|
||||
} else if (client == null) {
|
||||
|
@ -222,7 +222,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
|
||||
@Override
|
||||
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
|
||||
final boolean indicateReplicated, final boolean performVerification) {
|
||||
final boolean indicateReplicated, final boolean performVerification) {
|
||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||
|
||||
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, ComponentIdGenerator.generateId().toString());
|
||||
|
@ -275,19 +275,18 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
/**
|
||||
* Replicates the request to all nodes in the given set of node identifiers
|
||||
*
|
||||
* @param nodeIds the NodeIdentifiers that identify which nodes to send the request to
|
||||
* @param method the HTTP method to use
|
||||
* @param uri the URI to send the request to
|
||||
* @param entity the entity to use
|
||||
* @param headers the HTTP Headers
|
||||
* @param nodeIds the NodeIdentifiers that identify which nodes to send the request to
|
||||
* @param method the HTTP method to use
|
||||
* @param uri the URI to send the request to
|
||||
* @param entity the entity to use
|
||||
* @param headers the HTTP Headers
|
||||
* @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable.
|
||||
* @param response the response to update with the results
|
||||
* @param executionPhase <code>true</code> if this is the execution phase, <code>false</code> otherwise
|
||||
*
|
||||
* @param response the response to update with the results
|
||||
* @param executionPhase <code>true</code> if this is the execution phase, <code>false</code> otherwise
|
||||
* @return an AsyncClusterResponse that can be used to obtain the result
|
||||
*/
|
||||
private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
|
||||
StandardAsyncClusterResponse response, boolean executionPhase) {
|
||||
StandardAsyncClusterResponse response, boolean executionPhase) {
|
||||
|
||||
// state validation
|
||||
Objects.requireNonNull(nodeIds);
|
||||
|
@ -330,9 +329,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
|
||||
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
|
||||
final Map<String, Long> countsByUri = responseMap.values().stream().collect(
|
||||
Collectors.groupingBy(
|
||||
StandardAsyncClusterResponse::getURIPath,
|
||||
Collectors.counting()));
|
||||
Collectors.groupingBy(
|
||||
StandardAsyncClusterResponse::getURIPath,
|
||||
Collectors.counting()));
|
||||
|
||||
logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
|
||||
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
|
||||
|
@ -345,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
// create a response object if one was not already passed to us
|
||||
if (response == null) {
|
||||
response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
|
||||
responseMerger, completionCallback, responseConsumedCallback);
|
||||
responseMerger, completionCallback, responseConsumedCallback);
|
||||
responseMap.put(requestId, response);
|
||||
}
|
||||
|
||||
|
@ -377,14 +376,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
|
||||
// replicate the request to all nodes
|
||||
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
|
||||
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
|
||||
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
|
||||
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
|
||||
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
|
||||
|
||||
|
@ -432,7 +430,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
|
||||
|
||||
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
|
||||
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
|
||||
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
|
||||
|
||||
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
|
||||
}
|
||||
|
@ -451,13 +449,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
|
||||
|
||||
logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur",
|
||||
response.getStatus(), response.getNodeId(), method, uri.getPath());
|
||||
response.getStatus(), response.getNodeId(), method, uri.getPath());
|
||||
} else {
|
||||
final String nodeExplanation = clientResponse.getEntity(String.class);
|
||||
message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
|
||||
|
||||
logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}",
|
||||
response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
|
||||
response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
|
||||
}
|
||||
|
||||
// if a node reports forbidden, use that as the response failure
|
||||
|
@ -516,9 +514,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
}
|
||||
|
||||
// Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder,
|
||||
final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId,
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId,
|
||||
final Map<String, String> headers) {
|
||||
final ClientResponse clientResponse;
|
||||
final long startNanos = System.nanoTime();
|
||||
|
@ -565,8 +561,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
* Verifies that the cluster is in a state that will allow requests to be made using the given HTTP Method and URI path
|
||||
*
|
||||
* @param httpMethod the HTTP Method
|
||||
* @param uriPath the URI Path
|
||||
*
|
||||
* @param uriPath the URI Path
|
||||
* @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method
|
||||
*/
|
||||
private void verifyClusterState(final String httpMethod, final String uriPath) {
|
||||
|
@ -611,7 +606,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
callback.afterRequest(response.getURIPath(), response.getMethod(), response.getCompletedNodeResponses());
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Completed request {} {} but failed to properly handle the Request Completion Callback due to {}",
|
||||
response.getMethod(), response.getURIPath(), e.toString());
|
||||
response.getMethod(), response.getURIPath(), e.toString());
|
||||
logger.warn("", e);
|
||||
}
|
||||
}
|
||||
|
@ -629,7 +624,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
final int sequentialLongRequests = counter.incrementAndGet();
|
||||
if (sequentialLongRequests >= 3) {
|
||||
final String message = "Response time from " + nodeId + " was slow for each of the last 3 requests made. "
|
||||
+ "To see more information about timing, enable DEBUG logging for " + logger.getName();
|
||||
+ "To see more information about timing, enable DEBUG logging for " + logger.getName();
|
||||
|
||||
logger.warn(message);
|
||||
if (eventReporter != null) {
|
||||
|
@ -647,8 +642,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
private void logTimingInfo(final AsyncClusterResponse response) {
|
||||
// Calculate min, max, mean for the requests
|
||||
final LongSummaryStatistics stats = response.getNodesInvolved().stream()
|
||||
.map(p -> response.getNodeResponse(p).getRequestDuration(TimeUnit.MILLISECONDS))
|
||||
.collect(Collectors.summarizingLong(Long::longValue));
|
||||
.map(p -> response.getNodeResponse(p).getRequestDuration(TimeUnit.MILLISECONDS))
|
||||
.collect(Collectors.summarizingLong(Long::longValue));
|
||||
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("Node Responses for ").append(response.getMethod()).append(" ").append(response.getURIPath()).append(" (Request ID ").append(response.getRequestIdentifier()).append("):\n");
|
||||
|
@ -657,14 +652,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
}
|
||||
|
||||
logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms",
|
||||
response.getMethod(), response.getURIPath(), response.getRequestIdentifier(), stats.getMin(), stats.getMax(), stats.getAverage());
|
||||
response.getMethod(), response.getURIPath(), response.getRequestIdentifier(), stats.getMin(), stats.getMax(), stats.getAverage());
|
||||
logger.debug(sb.toString());
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path,
|
||||
final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) {
|
||||
final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) {
|
||||
|
||||
if (nodeIds.isEmpty()) {
|
||||
return; // return quickly for trivial case
|
||||
|
@ -703,7 +697,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
private final NodeRequestCompletionCallback callback;
|
||||
|
||||
private NodeHttpRequest(final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final Object entity, final Map<String, String> headers, final NodeRequestCompletionCallback callback) {
|
||||
final URI uri, final Object entity, final Map<String, String> headers, final NodeRequestCompletionCallback callback) {
|
||||
this.nodeId = nodeId;
|
||||
this.method = method;
|
||||
this.uri = uri;
|
||||
|
@ -791,10 +785,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
|||
|
||||
private synchronized int purgeExpiredRequests() {
|
||||
final Set<String> expiredRequestIds = ThreadPoolRequestReplicator.this.responseMap.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds
|
||||
.filter(entry -> entry.getValue().isComplete()) // is complete
|
||||
.map(entry -> entry.getKey()) // get the request id
|
||||
.collect(Collectors.toSet());
|
||||
.filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds
|
||||
.filter(entry -> entry.getValue().isComplete()) // is complete
|
||||
.map(entry -> entry.getKey()) // get the request id
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
expiredRequestIds.forEach(id -> onResponseConsumed(id));
|
||||
return responseMap.size();
|
||||
|
|
|
@ -16,12 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.cluster.coordination.http.replication;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
|
@ -29,19 +23,6 @@ import com.sun.jersey.api.client.ClientResponse.Status;
|
|||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.core.header.InBoundHeaders;
|
||||
import com.sun.jersey.core.header.OutBoundHeaders;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import org.apache.commons.collections4.map.MultiValueMap;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
|
@ -62,6 +43,26 @@ import org.mockito.internal.util.reflection.Whitebox;
|
|||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestThreadPoolRequestReplicator {
|
||||
|
||||
@BeforeClass
|
||||
|
@ -164,10 +165,8 @@ public class TestThreadPoolRequestReplicator {
|
|||
final ThreadPoolRequestReplicator replicator
|
||||
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
|
||||
@Override
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder,
|
||||
final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId,
|
||||
Map<String, String> givenHeaders) {
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId, Map<String, String> givenHeaders) {
|
||||
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
|
||||
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
|
||||
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
|
||||
|
@ -235,7 +234,7 @@ public class TestThreadPoolRequestReplicator {
|
|||
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
|
||||
@Override
|
||||
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
|
||||
boolean indicateReplicated, boolean verify) {
|
||||
boolean indicateReplicated, boolean verify) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
@ -288,10 +287,8 @@ public class TestThreadPoolRequestReplicator {
|
|||
final ThreadPoolRequestReplicator replicator
|
||||
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
|
||||
@Override
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder,
|
||||
final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId,
|
||||
Map<String, String> givenHeaders) {
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId, Map<String, String> givenHeaders) {
|
||||
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
|
||||
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
|
||||
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
|
||||
|
@ -333,10 +330,8 @@ public class TestThreadPoolRequestReplicator {
|
|||
final ThreadPoolRequestReplicator replicator
|
||||
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
|
||||
@Override
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder,
|
||||
final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId,
|
||||
Map<String, String> givenHeaders) {
|
||||
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
|
||||
final URI uri, final String requestId, Map<String, String> givenHeaders) {
|
||||
if (delayMillis > 0L) {
|
||||
try {
|
||||
Thread.sleep(delayMillis);
|
||||
|
|
|
@ -3029,7 +3029,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
|
||||
}
|
||||
|
||||
return dtoFactory.createPropertyDescriptorDto(descriptor, "root");
|
||||
return dtoFactory.createPropertyDescriptorDto(descriptor, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -487,7 +487,7 @@ public abstract class ApplicationResource {
|
|||
* @return the response
|
||||
*/
|
||||
protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Revision revision, final AuthorizeAccess authorizer,
|
||||
final Runnable verifier, final BiFunction<Revision, T, Response> action) {
|
||||
final Runnable verifier, final BiFunction<Revision, T, Response> action) {
|
||||
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
|
@ -536,7 +536,7 @@ public abstract class ApplicationResource {
|
|||
* @return the response
|
||||
*/
|
||||
protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Set<Revision> revisions, final AuthorizeAccess authorizer,
|
||||
final Runnable verifier, final BiFunction<Set<Revision>, T, Response> action) {
|
||||
final Runnable verifier, final BiFunction<Set<Revision>, T, Response> action) {
|
||||
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
|
@ -577,10 +577,10 @@ public abstract class ApplicationResource {
|
|||
/**
|
||||
* Executes an action through the service facade.
|
||||
*
|
||||
* @param serviceFacade service facade
|
||||
* @param authorizer authorizer
|
||||
* @param verifier verifier
|
||||
* @param action the action to execute
|
||||
* @param serviceFacade service facade
|
||||
* @param authorizer authorizer
|
||||
* @param verifier verifier
|
||||
* @param action the action to execute
|
||||
* @return the response
|
||||
*/
|
||||
protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final AuthorizeAccess authorizer,
|
||||
|
@ -786,8 +786,7 @@ public abstract class ApplicationResource {
|
|||
return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse();
|
||||
} else {
|
||||
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
|
||||
return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method,
|
||||
path, entity, headers, false, true).awaitMergedResponse().getResponse();
|
||||
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse().getResponse();
|
||||
}
|
||||
} catch (final InterruptedException ie) {
|
||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
|
||||
|
@ -820,9 +819,8 @@ public abstract class ApplicationResource {
|
|||
final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
|
||||
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse();
|
||||
} else {
|
||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
|
||||
return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), entity, headers, false, true).awaitMergedResponse().getResponse();
|
||||
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, getAbsolutePath(), entity, headers).awaitMergedResponse().getResponse();
|
||||
}
|
||||
} catch (final InterruptedException ie) {
|
||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
|
||||
|
@ -916,7 +914,7 @@ public abstract class ApplicationResource {
|
|||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
|
||||
} else {
|
||||
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
|
||||
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -348,7 +348,8 @@ public class ControllerResource extends ApplicationResource {
|
|||
throw new IllegalArgumentException("Controller service details must be specified.");
|
||||
}
|
||||
|
||||
if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
|
||||
if (requestControllerServiceEntity.getRevision() == null
|
||||
|| (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
|
||||
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service.");
|
||||
}
|
||||
|
||||
|
|
|
@ -57,10 +57,8 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -165,8 +163,8 @@ public class CountersResource extends ApplicationResource {
|
|||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||
} else {
|
||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse();
|
||||
nodeResponse = getRequestReplicator().forwardToCoordinator(
|
||||
getClusterCoordinatorNode(), HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||
}
|
||||
|
||||
final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.nifi.authorization.RequestAction;
|
|||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.authorization.user.NiFiUser;
|
||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.Snippet;
|
||||
import org.apache.nifi.web.NiFiServiceFacade;
|
||||
import org.apache.nifi.web.ResourceNotFoundException;
|
||||
|
@ -98,7 +97,6 @@ import javax.xml.transform.stream.StreamSource;
|
|||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -1312,7 +1310,8 @@ public class ProcessGroupResource extends ApplicationResource {
|
|||
throw new IllegalArgumentException("Remote process group details must be specified.");
|
||||
}
|
||||
|
||||
if (requestRemoteProcessGroupEntity.getRevision() == null || (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) {
|
||||
if (requestRemoteProcessGroupEntity.getRevision() == null
|
||||
|| (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) {
|
||||
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Remote process group.");
|
||||
}
|
||||
|
||||
|
@ -1963,8 +1962,8 @@ public class ProcessGroupResource extends ApplicationResource {
|
|||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
|
||||
} else {
|
||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||
return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, true).awaitMergedResponse().getResponse();
|
||||
return getRequestReplicator().forwardToCoordinator(
|
||||
getClusterCoordinatorNode(), HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2100,7 +2099,8 @@ public class ProcessGroupResource extends ApplicationResource {
|
|||
throw new IllegalArgumentException("Controller service details must be specified.");
|
||||
}
|
||||
|
||||
if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
|
||||
if (requestControllerServiceEntity.getRevision() == null
|
||||
|| (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
|
||||
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service.");
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.nifi.authorization.resource.ResourceFactory;
|
|||
import org.apache.nifi.authorization.user.NiFiUser;
|
||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.web.NiFiServiceFacade;
|
||||
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
|
||||
import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
|
||||
|
@ -48,10 +47,8 @@ import javax.ws.rs.Produces;
|
|||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* RESTful endpoint for retrieving system diagnostics.
|
||||
|
@ -143,8 +140,8 @@ public class SystemDiagnosticsResource extends ApplicationResource {
|
|||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||
} else {
|
||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse();
|
||||
nodeResponse = getRequestReplicator().forwardToCoordinator(
|
||||
getClusterCoordinatorNode(), HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||
}
|
||||
|
||||
final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
|
||||
|
|
|
@ -167,22 +167,22 @@ public class TenantsResource extends ApplicationResource {
|
|||
return replicate(HttpMethod.POST, requestUserEntity);
|
||||
}
|
||||
|
||||
// get revision from the config
|
||||
final RevisionDTO revisionDTO = requestUserEntity.getRevision();
|
||||
Revision requestRevision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), requestUserEntity.getComponent().getId());
|
||||
return withWriteLock(
|
||||
serviceFacade,
|
||||
requestUserEntity,
|
||||
requestRevision,
|
||||
lookup -> {
|
||||
final Authorizable tenants = lookup.getTenant();
|
||||
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||
},
|
||||
null,
|
||||
(revision, userEntity) -> {
|
||||
userEntity -> {
|
||||
// set the user id as appropriate
|
||||
userEntity.getComponent().setId(generateUuid());
|
||||
|
||||
// get revision from the config
|
||||
final RevisionDTO revisionDTO = userEntity.getRevision();
|
||||
Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), userEntity.getComponent().getId());
|
||||
|
||||
// create the user and generate the json
|
||||
final UserEntity entity = serviceFacade.createUser(revision, userEntity.getComponent());
|
||||
populateRemainingUserEntityContent(entity);
|
||||
|
@ -552,22 +552,22 @@ public class TenantsResource extends ApplicationResource {
|
|||
return replicate(HttpMethod.POST, requestUserGroupEntity);
|
||||
}
|
||||
|
||||
// get revision from the config
|
||||
final RevisionDTO revisionDTO = requestUserGroupEntity.getRevision();
|
||||
Revision requestRevision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), requestUserGroupEntity.getComponent().getId());
|
||||
return withWriteLock(
|
||||
serviceFacade,
|
||||
requestUserGroupEntity,
|
||||
requestRevision,
|
||||
lookup -> {
|
||||
final Authorizable tenants = lookup.getTenant();
|
||||
tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
|
||||
},
|
||||
null,
|
||||
(revision, userGroupEntity) -> {
|
||||
userGroupEntity -> {
|
||||
// set the user group id as appropriate
|
||||
userGroupEntity.getComponent().setId(generateUuid());
|
||||
|
||||
// get revision from the config
|
||||
final RevisionDTO revisionDTO = userGroupEntity.getRevision();
|
||||
Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), userGroupEntity.getComponent().getId());
|
||||
|
||||
// create the user group and generate the json
|
||||
final UserGroupEntity entity = serviceFacade.createUserGroup(revision, userGroupEntity.getComponent());
|
||||
populateRemainingUserGroupEntityContent(entity);
|
||||
|
|
|
@ -16,6 +16,6 @@
|
|||
<div id="user-delete-dialog" class="hidden">
|
||||
<div class="dialog-content">
|
||||
<input type="hidden" id="user-id-delete-dialog"/>
|
||||
Are you sure you want to delete the user account for '<span id="user-identity-delete-dialog"></span>'?
|
||||
Are you sure you want to delete the account for '<span id="user-identity-delete-dialog"></span>'?
|
||||
</div>
|
||||
</div>
|
|
@ -31,7 +31,7 @@ nf.UsersTable = (function () {
|
|||
|
||||
var initUserDeleteDialog = function () {
|
||||
$('#user-delete-dialog').modal({
|
||||
headerText: 'Delete User',
|
||||
headerText: 'Delete Account',
|
||||
buttons: [{
|
||||
buttonText: 'Delete',
|
||||
color: {
|
||||
|
|
Loading…
Reference in New Issue