diff --git a/nifi-commons/nifi-web-utils/pom.xml b/nifi-commons/nifi-web-utils/pom.xml index 7cc0581e82..dd2f3efe9f 100644 --- a/nifi-commons/nifi-web-utils/pom.xml +++ b/nifi-commons/nifi-web-utils/pom.xml @@ -34,10 +34,6 @@ org.apache.commons commons-lang3 - - com.sun.jersey - jersey-core - com.sun.jersey jersey-client diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/BulletinEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/BulletinEntity.java index 6606c1d48d..4b52bf873d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/BulletinEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/BulletinEntity.java @@ -18,7 +18,9 @@ package org.apache.nifi.web.api.entity; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.ReadablePermission; +import org.apache.nifi.web.api.dto.util.TimeAdapter; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import java.util.Date; /** @@ -84,6 +86,7 @@ public class BulletinEntity extends Entity implements ReadablePermission { /** * @return When this bulletin was generated. */ + @XmlJavaTypeAdapter(TimeAdapter.class) public Date getTimestamp() { return timestamp; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index 2beff221bb..1fd6a4962b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -78,13 +78,12 @@ public interface RequestReplicator { * will contain the results that are immediately available, as well as an identifier for obtaining an updated result * later. NOTE: This method will ALWAYS indicate that the request has been replicated. * - * @param method the HTTP method (e.g., POST, PUT) - * @param uri the base request URI (up to, but not including, the query string) - * @param entity an entity + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity * @param headers any HTTP headers * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later - * - * @throws ConnectingNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the CONNECTING state + * @throws ConnectingNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the CONNECTING state * @throws DisconnectedNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the DISCONNECTED state */ AsyncClusterResponse replicate(String method, URI uri, Object entity, Map headers); @@ -92,20 +91,19 @@ public interface RequestReplicator { /** * Requests are sent to each node in the given set of Node Identifiers. The returned AsyncClusterResponse object will contain * the results that are immediately available, as well as an identifier for obtaining an updated result later. - * + *

* HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used. * - * @param nodeIds the node identifiers - * @param method the HTTP method (e.g., POST, PUT) - * @param uri the base request URI (up to, but not including, the query string) - * @param entity an entity - * @param headers any HTTP headers - * @param indicateReplicated if true, will add a header indicating to the receiving nodes that the request - * has already been replicated, so the receiving node will not replicate the request itself. + * @param nodeIds the node identifiers + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers + * @param indicateReplicated if true, will add a header indicating to the receiving nodes that the request + * has already been replicated, so the receiving node will not replicate the request itself. * @param performVerification if true, and the request is mutable, will verify that all nodes are connected before - * making the request and that all nodes are able to perform the request before acutally attempting to perform the task. - * If false, will perform no such verification - * + * making the request and that all nodes are able to perform the request before acutally attempting to perform the task. + * If false, will perform no such verification * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later */ AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean performVerification); @@ -115,11 +113,10 @@ public interface RequestReplicator { * Forwards a request to the Cluster Coordinator so that it is able to replicate the request to all nodes in the cluster. * * @param coordinatorNodeId the node identifier of the Cluster Coordinator - * @param method the HTTP method (e.g., POST, PUT) - * @param uri the base request URI (up to, but not including, the query string) - * @param entity an entity - * @param headers any HTTP headers - * + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later */ AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map headers); @@ -135,7 +132,7 @@ public interface RequestReplicator { * * @param requestIdentifier the identifier of the request to obtain a response for * @return an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier, or null if - * no request exists with the given identifier + * no request exists with the given identifier */ AsyncClusterResponse getClusterResponse(String requestIdentifier); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 3342470cd3..c1ee77b9d0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -103,33 +103,33 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { /** * Creates an instance using a connection timeout and read timeout of 3 seconds * - * @param numThreads the number of threads to use when parallelizing requests - * @param client a client for making requests + * @param numThreads the number of threads to use when parallelizing requests + * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses - * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. - * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. - * @param nifiProperties properties + * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. + * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. + * @param nifiProperties properties */ public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, - final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { + final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); } /** * Creates an instance. * - * @param numThreads the number of threads to use when parallelizing requests - * @param client a client for making requests + * @param numThreads the number of threads to use when parallelizing requests + * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses - * @param connectionTimeout the connection timeout specified in milliseconds - * @param readTimeout the read timeout specified in milliseconds - * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. - * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. - * @param nifiProperties properties + * @param connectionTimeout the connection timeout specified in milliseconds + * @param readTimeout the read timeout specified in milliseconds + * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. + * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. + * @param nifiProperties properties */ public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, - final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, - final EventReporter eventReporter, final NiFiProperties nifiProperties) { + final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, + final EventReporter eventReporter, final NiFiProperties nifiProperties) { if (numThreads <= 0) { throw new IllegalArgumentException("The number of threads must be greater than zero."); } else if (client == null) { @@ -222,7 +222,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { @Override public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, - final boolean indicateReplicated, final boolean performVerification) { + final boolean indicateReplicated, final boolean performVerification) { final Map updatedHeaders = new HashMap<>(headers); updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, ComponentIdGenerator.generateId().toString()); @@ -275,19 +275,18 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { /** * Replicates the request to all nodes in the given set of node identifiers * - * @param nodeIds the NodeIdentifiers that identify which nodes to send the request to - * @param method the HTTP method to use - * @param uri the URI to send the request to - * @param entity the entity to use - * @param headers the HTTP Headers + * @param nodeIds the NodeIdentifiers that identify which nodes to send the request to + * @param method the HTTP method to use + * @param uri the URI to send the request to + * @param entity the entity to use + * @param headers the HTTP Headers * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable. - * @param response the response to update with the results - * @param executionPhase true if this is the execution phase, false otherwise - * + * @param response the response to update with the results + * @param executionPhase true if this is the execution phase, false otherwise * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean performVerification, - StandardAsyncClusterResponse response, boolean executionPhase) { + StandardAsyncClusterResponse response, boolean executionPhase) { // state validation Objects.requireNonNull(nodeIds); @@ -330,9 +329,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { if (numRequests >= MAX_CONCURRENT_REQUESTS) { final Map countsByUri = responseMap.values().stream().collect( - Collectors.groupingBy( - StandardAsyncClusterResponse::getURIPath, - Collectors.counting())); + Collectors.groupingBy( + StandardAsyncClusterResponse::getURIPath, + Collectors.counting())); logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri); throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); @@ -345,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // create a response object if one was not already passed to us if (response == null) { response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, - responseMerger, completionCallback, responseConsumedCallback); + responseMerger, completionCallback, responseConsumedCallback); responseMap.put(requestId, response); } @@ -377,14 +376,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // replicate the request to all nodes final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); return response; } - private void performVerification(Set nodeIds, String method, URI uri, Object entity, Map headers, StandardAsyncClusterResponse clusterResponse) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); @@ -432,7 +430,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath()); final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null); + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null); replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); } @@ -451,13 +449,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus(); logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", - response.getStatus(), response.getNodeId(), method, uri.getPath()); + response.getStatus(), response.getNodeId(), method, uri.getPath()); } else { final String nodeExplanation = clientResponse.getEntity(String.class); message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation; logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}", - response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation); + response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation); } // if a node reports forbidden, use that as the response failure @@ -516,9 +514,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } // Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, - final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, final Map headers) { final ClientResponse clientResponse; final long startNanos = System.nanoTime(); @@ -565,8 +561,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * Verifies that the cluster is in a state that will allow requests to be made using the given HTTP Method and URI path * * @param httpMethod the HTTP Method - * @param uriPath the URI Path - * + * @param uriPath the URI Path * @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method */ private void verifyClusterState(final String httpMethod, final String uriPath) { @@ -611,7 +606,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { callback.afterRequest(response.getURIPath(), response.getMethod(), response.getCompletedNodeResponses()); } catch (final Exception e) { logger.warn("Completed request {} {} but failed to properly handle the Request Completion Callback due to {}", - response.getMethod(), response.getURIPath(), e.toString()); + response.getMethod(), response.getURIPath(), e.toString()); logger.warn("", e); } } @@ -629,7 +624,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final int sequentialLongRequests = counter.incrementAndGet(); if (sequentialLongRequests >= 3) { final String message = "Response time from " + nodeId + " was slow for each of the last 3 requests made. " - + "To see more information about timing, enable DEBUG logging for " + logger.getName(); + + "To see more information about timing, enable DEBUG logging for " + logger.getName(); logger.warn(message); if (eventReporter != null) { @@ -647,8 +642,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private void logTimingInfo(final AsyncClusterResponse response) { // Calculate min, max, mean for the requests final LongSummaryStatistics stats = response.getNodesInvolved().stream() - .map(p -> response.getNodeResponse(p).getRequestDuration(TimeUnit.MILLISECONDS)) - .collect(Collectors.summarizingLong(Long::longValue)); + .map(p -> response.getNodeResponse(p).getRequestDuration(TimeUnit.MILLISECONDS)) + .collect(Collectors.summarizingLong(Long::longValue)); final StringBuilder sb = new StringBuilder(); sb.append("Node Responses for ").append(response.getMethod()).append(" ").append(response.getURIPath()).append(" (Request ID ").append(response.getRequestIdentifier()).append("):\n"); @@ -657,14 +652,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms", - response.getMethod(), response.getURIPath(), response.getRequestIdentifier(), stats.getMin(), stats.getMax(), stats.getAverage()); + response.getMethod(), response.getURIPath(), response.getRequestIdentifier(), stats.getMin(), stats.getMax(), stats.getAverage()); logger.debug(sb.toString()); } - private void replicateRequest(final Set nodeIds, final String scheme, final String path, - final Function callableFactory, final Map headers) { + final Function callableFactory, final Map headers) { if (nodeIds.isEmpty()) { return; // return quickly for trivial case @@ -703,7 +697,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final NodeRequestCompletionCallback callback; private NodeHttpRequest(final NodeIdentifier nodeId, final String method, - final URI uri, final Object entity, final Map headers, final NodeRequestCompletionCallback callback) { + final URI uri, final Object entity, final Map headers, final NodeRequestCompletionCallback callback) { this.nodeId = nodeId; this.method = method; this.uri = uri; @@ -791,10 +785,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private synchronized int purgeExpiredRequests() { final Set expiredRequestIds = ThreadPoolRequestReplicator.this.responseMap.entrySet().stream() - .filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds - .filter(entry -> entry.getValue().isComplete()) // is complete - .map(entry -> entry.getKey()) // get the request id - .collect(Collectors.toSet()); + .filter(entry -> entry.getValue().isOlderThan(30, TimeUnit.SECONDS)) // older than 30 seconds + .filter(entry -> entry.getValue().isComplete()) // is complete + .map(entry -> entry.getKey()) // get the request id + .collect(Collectors.toSet()); expiredRequestIds.forEach(id -> onResponseConsumed(id)); return responseMap.size(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 1db2602178..50d58919fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.cluster.coordination.http.replication; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; @@ -29,19 +23,6 @@ import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.core.header.InBoundHeaders; import com.sun.jersey.core.header.OutBoundHeaders; -import java.io.ByteArrayInputStream; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.ws.rs.HttpMethod; import org.apache.commons.collections4.map.MultiValueMap; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; @@ -62,6 +43,26 @@ import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.ws.rs.HttpMethod; +import java.io.ByteArrayInputStream; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class TestThreadPoolRequestReplicator { @BeforeClass @@ -164,10 +165,8 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, - final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, - Map givenHeaders) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, + final URI uri, final String requestId, Map givenHeaders) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); @@ -235,7 +234,7 @@ public class TestThreadPoolRequestReplicator { = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, - boolean indicateReplicated, boolean verify) { + boolean indicateReplicated, boolean verify) { return null; } }; @@ -288,10 +287,8 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, - final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, - Map givenHeaders) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, + final URI uri, final String requestId, Map givenHeaders) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); @@ -333,10 +330,8 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, - final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, - Map givenHeaders) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, + final URI uri, final String requestId, Map givenHeaders) { if (delayMillis > 0L) { try { Thread.sleep(delayMillis); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 55d98b5d64..dfa3e9517c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -3029,7 +3029,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build(); } - return dtoFactory.createPropertyDescriptorDto(descriptor, "root"); + return dtoFactory.createPropertyDescriptorDto(descriptor, null); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 4f251dd432..92d6b7a6c6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -487,7 +487,7 @@ public abstract class ApplicationResource { * @return the response */ protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Revision revision, final AuthorizeAccess authorizer, - final Runnable verifier, final BiFunction action) { + final Runnable verifier, final BiFunction action) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -536,7 +536,7 @@ public abstract class ApplicationResource { * @return the response */ protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Set revisions, final AuthorizeAccess authorizer, - final Runnable verifier, final BiFunction, T, Response> action) { + final Runnable verifier, final BiFunction, T, Response> action) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -577,10 +577,10 @@ public abstract class ApplicationResource { /** * Executes an action through the service facade. * - * @param serviceFacade service facade - * @param authorizer authorizer - * @param verifier verifier - * @param action the action to execute + * @param serviceFacade service facade + * @param authorizer authorizer + * @param verifier verifier + * @param action the action to execute * @return the response */ protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final AuthorizeAccess authorizer, @@ -786,8 +786,7 @@ public abstract class ApplicationResource { return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse(); } else { headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId()); - return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, - path, entity, headers, false, true).awaitMergedResponse().getResponse(); + return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build(); @@ -820,9 +819,8 @@ public abstract class ApplicationResource { final Set nodeIds = Collections.singleton(targetNode); return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse(); } else { - final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); final Map headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId())); - return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), entity, headers, false, true).awaitMergedResponse().getResponse(); + return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, getAbsolutePath(), entity, headers).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); @@ -916,7 +914,7 @@ public abstract class ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse(); } else { - return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse(); + return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index f5ff1888c5..1fe87ab06f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -348,7 +348,8 @@ public class ControllerResource extends ApplicationResource { throw new IllegalArgumentException("Controller service details must be specified."); } - if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { + if (requestControllerServiceEntity.getRevision() == null + || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service."); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java index 5e826743fa..a8be20d93b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java @@ -57,10 +57,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; /** @@ -165,8 +163,8 @@ public class CountersResource extends ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { - final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); + nodeResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index a957544213..b348828fec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -34,7 +34,6 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.Snippet; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; @@ -98,7 +97,6 @@ import javax.xml.transform.stream.StreamSource; import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -1312,7 +1310,8 @@ public class ProcessGroupResource extends ApplicationResource { throw new IllegalArgumentException("Remote process group details must be specified."); } - if (requestRemoteProcessGroupEntity.getRevision() == null || (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) { + if (requestRemoteProcessGroupEntity.getRevision() == null + || (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Remote process group."); } @@ -1963,8 +1962,8 @@ public class ProcessGroupResource extends ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); } else { - final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); } } @@ -2100,7 +2099,8 @@ public class ProcessGroupResource extends ApplicationResource { throw new IllegalArgumentException("Controller service details must be specified."); } - if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { + if (requestControllerServiceEntity.getRevision() == null + || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service."); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java index 641042b3b2..0be9c49aa6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java @@ -34,7 +34,6 @@ import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; @@ -48,10 +47,8 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; /** * RESTful endpoint for retrieving system diagnostics. @@ -143,8 +140,8 @@ public class SystemDiagnosticsResource extends ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { - final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); + nodeResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TenantsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TenantsResource.java index fae7345f59..4c687d2837 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TenantsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TenantsResource.java @@ -167,22 +167,22 @@ public class TenantsResource extends ApplicationResource { return replicate(HttpMethod.POST, requestUserEntity); } - // get revision from the config - final RevisionDTO revisionDTO = requestUserEntity.getRevision(); - Revision requestRevision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), requestUserEntity.getComponent().getId()); return withWriteLock( serviceFacade, requestUserEntity, - requestRevision, lookup -> { final Authorizable tenants = lookup.getTenant(); tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - (revision, userEntity) -> { + userEntity -> { // set the user id as appropriate userEntity.getComponent().setId(generateUuid()); + // get revision from the config + final RevisionDTO revisionDTO = userEntity.getRevision(); + Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), userEntity.getComponent().getId()); + // create the user and generate the json final UserEntity entity = serviceFacade.createUser(revision, userEntity.getComponent()); populateRemainingUserEntityContent(entity); @@ -552,22 +552,22 @@ public class TenantsResource extends ApplicationResource { return replicate(HttpMethod.POST, requestUserGroupEntity); } - // get revision from the config - final RevisionDTO revisionDTO = requestUserGroupEntity.getRevision(); - Revision requestRevision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), requestUserGroupEntity.getComponent().getId()); return withWriteLock( serviceFacade, requestUserGroupEntity, - requestRevision, lookup -> { final Authorizable tenants = lookup.getTenant(); tenants.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - (revision, userGroupEntity) -> { + userGroupEntity -> { // set the user group id as appropriate userGroupEntity.getComponent().setId(generateUuid()); + // get revision from the config + final RevisionDTO revisionDTO = userGroupEntity.getRevision(); + Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), userGroupEntity.getComponent().getId()); + // create the user group and generate the json final UserGroupEntity entity = serviceFacade.createUserGroup(revision, userGroupEntity.getComponent()); populateRemainingUserGroupEntityContent(entity); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/users/user-delete-dialog.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/users/user-delete-dialog.jsp index ef30bafeb4..9fcabb1597 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/users/user-delete-dialog.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/users/user-delete-dialog.jsp @@ -16,6 +16,6 @@

\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/users/nf-users-table.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/users/nf-users-table.js index 96efd73820..79267e059c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/users/nf-users-table.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/users/nf-users-table.js @@ -31,7 +31,7 @@ nf.UsersTable = (function () { var initUserDeleteDialog = function () { $('#user-delete-dialog').modal({ - headerText: 'Delete User', + headerText: 'Delete Account', buttons: [{ buttonText: 'Delete', color: {