diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 318b1a0737..ff94eba4c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -17,24 +17,30 @@ package org.apache.nifi.cluster.coordination.http.replication; -import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.URI; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class StandardAsyncClusterResponse implements AsyncClusterResponse { private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class); + private static final int DEFAULT_RESPONSE_BUFFER_SIZE = 1024 * 1024; + + public static final String VERIFICATION_PHASE = "Verification Phase"; + public static final String COMMIT_PHASE = "Execution Phase"; + public static final String ONLY_PHASE = "Only Phase"; private final String id; private final Set nodeIds; @@ -45,21 +51,38 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { private final Runnable completedResultFetchedCallback; private final long creationTimeNanos; private final boolean merge; + private final AtomicInteger responseBufferLeft; private final Map responseMap = new HashMap<>(); private final AtomicInteger requestsCompleted = new AtomicInteger(0); private NodeResponse mergedResponse; // guarded by synchronizing on this private RuntimeException failure; // guarded by synchronizing on this + private volatile String phase; + private volatile long phaseStartTime = System.nanoTime(); + private final long creationTime = System.nanoTime(); - public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set nodeIds, - final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) { + private final Map timingInfo = new LinkedHashMap<>(); + + public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set nodeIds, final HttpResponseMapper responseMapper, + final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) { + this(id, uri, method, nodeIds, responseMapper, completionCallback, completedResultFetchedCallback, merge, DEFAULT_RESPONSE_BUFFER_SIZE); + } 
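[Reviewer note] The hunk above introduces a fixed per-request response-buffer budget (DEFAULT_RESPONSE_BUFFER_SIZE, handed to the new constructor) that the requestBuffer(int) method added later in this class gives out with an AtomicInteger compare-and-set loop. A minimal standalone sketch of that reservation pattern follows; the class and method names (ResponseBufferBudget, tryReserve) are illustrative, not NiFi API:

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the compare-and-set reservation pattern used by requestBuffer(int):
    // a shared byte budget is decremented atomically, and callers that would
    // overdraw it are told not to buffer (they stream instead).
    public class ResponseBufferBudget {
        private final AtomicInteger bytesLeft;

        public ResponseBufferBudget(final int totalBytes) {
            this.bytesLeft = new AtomicInteger(totalBytes);
        }

        public boolean tryReserve(final int size) {
            while (true) {
                final int available = bytesLeft.get();
                if (available < size) {
                    return false; // not enough budget left; caller should not buffer
                }
                // Retry if another thread changed the counter between get() and here.
                if (bytesLeft.compareAndSet(available, available - size)) {
                    return true;
                }
            }
        }
    }

The loop never blocks: a losing compareAndSet simply re-reads the counter and tries again, which is why the replicator can call it from many node-request threads at once.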
+ + public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set nodeIds, final HttpResponseMapper responseMapper, + final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge, final int responseBufferSize) { this.id = id; this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds)); this.uri = uri; this.method = method; this.merge = merge; + if ("POST".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) { + phase = VERIFICATION_PHASE; + } else { + phase = ONLY_PHASE; + } + creationTimeNanos = System.nanoTime(); for (final NodeIdentifier nodeId : nodeIds) { responseMap.put(nodeId, new ResponseHolder(creationTimeNanos)); @@ -68,8 +91,51 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { this.responseMapper = responseMapper; this.completionCallback = completionCallback; this.completedResultFetchedCallback = completedResultFetchedCallback; + this.responseBufferLeft = new AtomicInteger(responseBufferSize); } + public boolean requestBuffer(final int size) { + boolean updated = false; + while (!updated) { + final int bytesLeft = responseBufferLeft.get(); + if (bytesLeft < size) { + return false; + } + + updated = responseBufferLeft.compareAndSet(bytesLeft, bytesLeft - size); + } + + return true; + } + + public void setPhase(final String phase) { + this.phase = phase; + phaseStartTime = System.nanoTime(); + } + + public synchronized void addTiming(final String description, final String node, final long nanos) { + final StringBuilder sb = new StringBuilder(description); + if (phase != ONLY_PHASE) { + sb.append(" (").append(phase).append(")"); + } + sb.append(" for ").append(node); + timingInfo.put(sb.toString(), nanos); + } + + private synchronized void logTimingInfo() { + if (!logger.isDebugEnabled()) { + return; + } + + final StringBuilder sb = new StringBuilder(); + sb.append(String.format("For %s %s Timing Info is as follows:\n", method, uri)); + for (final Map.Entry entry : timingInfo.entrySet()) { + sb.append(entry.getKey()).append(" took ").append(TimeUnit.NANOSECONDS.toMillis(entry.getValue())).append(" millis\n"); + } + logger.debug(sb.toString()); + } + + @Override public String getRequestIdentifier() { return id; @@ -148,7 +214,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { .map(p -> p.getResponse()) .filter(response -> response != null) .collect(Collectors.toSet()); + + final long start = System.nanoTime(); mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge); + final long nanos = System.nanoTime() - start; + addTiming("Map/Merge Responses", "All Nodes", nanos); logger.debug("Notifying all that merged response is complete for {}", id); this.notifyAll(); @@ -169,6 +239,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { } } + logTimingInfo(); + return getMergedResponse(true); } @@ -195,6 +267,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { } } + logTimingInfo(); + return getMergedResponse(true); } @@ -217,10 +291,18 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { if (completedCount == responseMap.size()) { logger.debug("Notifying all that merged response is ready for {}", id); + addTiming("Phase Completed", "All Nodes", System.nanoTime() - phaseStartTime); + + final long start = System.nanoTime(); + synchronized (this) { this.notifyAll(); } + final long nanos = 
System.nanoTime() - start; + timingInfo.put("Notifying All Threads that Request is Complete", nanos); + timingInfo.put("Total Time for All Nodes", System.nanoTime() - creationTime); + if (completionCallback != null) { completionCallback.onCompletion(this); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 3b4470fd70..bc2f8bb092 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -250,10 +250,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same // time, so we use a write lock if the request is mutable and a read lock otherwise. final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock; - logger.debug("Obtaining lock {} in order to replicate request {} {}", method, uri); + logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri); lock.lock(); try { - logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); + logger.debug("Lock {} obtained in order to replicate request {} {}", lock, method, uri); // Unlocking of the lock is performed within the replicate method, as we need to ensure that it is unlocked only after // the entire request has completed. @@ -316,6 +316,36 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); } + // Update headers to indicate the current revision so that we can + // prevent multiple users changing the flow at the same time + final Map updatedHeaders = new HashMap<>(headers); + final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); + + // create a response object if one was not already passed to us + if (response == null) { + // create the request objects and replicate to all nodes. + // When the request has completed, we need to ensure that we notify the monitor, if there is one. 
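[Reviewer note] The replicator hunk above moves creation of the response object ahead of the cluster-state check and assigns each replicated request a transaction ID by calling computeIfAbsent on the copied header map, generating a UUID only when the caller did not already supply one. A minimal sketch of that idiom; the header name "X-Request-Transaction-Id" is illustrative, not the constant used by NiFi:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class RequestIdExample {
        public static void main(String[] args) {
            final Map<String, String> headers = new HashMap<>();
            // Reuse the caller-supplied ID if present; otherwise generate one.
            final String requestId = headers.computeIfAbsent(
                    "X-Request-Transaction-Id", key -> UUID.randomUUID().toString());
            System.out.println("Replicating with transaction id " + requestId);
        }
    }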
+ final CompletionCallback completionCallback = clusterResponse -> { + try { + onCompletedResponse(requestId); + } finally { + if (monitor != null) { + synchronized (monitor) { + monitor.notify(); + } + + logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri); + } + } + }; + + final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); + + response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, + responseMapper, completionCallback, responseConsumedCallback, merge); + responseMap.put(requestId, response); + } + // verify all of the nodes exist and are in the proper state for (final NodeIdentifier nodeId : nodeIds) { final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); @@ -330,13 +360,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); - // Update headers to indicate the current revision so that we can - // prevent multiple users changing the flow at the same time - final Map updatedHeaders = new HashMap<>(headers); - final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); - if (performVerification) { + final long start = System.nanoTime(); verifyClusterState(method, uri.getPath()); + final long nanos = System.nanoTime() - start; + response.addTiming("Verify Cluster State", "All Nodes", nanos); } int numRequests = responseMap.size(); @@ -354,31 +382,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); } - // create the request objects and replicate to all nodes. - // When the request has completed, we need to ensure that we notify the monitor, if there is one. 
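[Reviewer note] The completion callback above wakes up a caller that is blocked on the monitor object once the replicated request finishes. A minimal sketch of that wait/notify hand-off under the usual caveats (the waiter re-checks its condition in a loop to guard against spurious wakeups); the class and field names are illustrative:

    public class CompletionMonitorExample {
        private final Object monitor = new Object();
        private volatile boolean complete = false;

        // Called by the replication thread once the last node response arrives.
        public void onCompletion() {
            synchronized (monitor) {
                complete = true;
                monitor.notify();
            }
        }

        // Called by the thread waiting for the replicated request to finish.
        public void awaitCompletion() throws InterruptedException {
            synchronized (monitor) {
                while (!complete) {   // loop guards against spurious wakeups
                    monitor.wait();
                }
            }
        }
    }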
- final CompletionCallback completionCallback = clusterResponse -> { - try { - onCompletedResponse(requestId); - } finally { - if (monitor != null) { - synchronized (monitor) { - monitor.notify(); - } - - logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri); - } - } - }; - - final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); - - // create a response object if one was not already passed to us - if (response == null) { - response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, - responseMapper, completionCallback, responseConsumedCallback, merge); - responseMap.put(requestId, response); - } - logger.debug("For Request ID {}, response object is {}", requestId, response); // if mutable request, we have to do a two-phase commit where we ask each node to verify @@ -391,6 +394,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor); return response; + } else if (mutableRequest) { + response.setPhase(StandardAsyncClusterResponse.COMMIT_PHASE); } // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work @@ -407,8 +412,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // replicate the request to all nodes final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback, finalResponse); + submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); return response; } catch (final Throwable t) { @@ -431,6 +436,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Map validationHeaders = new HashMap<>(headers); validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE); + final long startNanos = System.nanoTime(); final int numNodes = nodeIds.size(); final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() { final Set nodeResponses = Collections.synchronizedSet(new HashSet<>()); @@ -450,9 +456,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } try { + final long nanos = System.nanoTime() - startNanos; + clusterResponse.addTiming("Completed Verification", nodeResponse.getNodeId().toString(), nanos); + // If we have all of the node responses, then we can verify the responses // and if good replicate the original request to all of the nodes. if (allNodesResponded) { + clusterResponse.addTiming("Verification Completed", "All Nodes", nanos); + // Check if we have any requests that do not have a 150-Continue status code. 
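[Reviewer note] The verification callback above implements the first phase of the two-phase commit: every node must answer the claim request with the 150 status before the execution phase runs, and the count of dissenting responses (computed from the node responses immediately below this comment with a stream filter) decides whether the claim is cancelled. A standalone sketch of that tally, using a plain list of status codes instead of NodeResponse objects:

    import java.util.Arrays;
    import java.util.List;

    public class VerificationTally {
        // 150 is the status code the verification phase treats as approval.
        private static final int NODE_CONTINUE_STATUS_CODE = 150;

        public static void main(String[] args) {
            final List<Integer> statusCodes = Arrays.asList(150, 150, 409);
            final long dissentingCount = statusCodes.stream()
                    .filter(status -> status != NODE_CONTINUE_STATUS_CODE)
                    .count();
            // Any dissenting node fails verification, so the second (execution)
            // phase never runs and the claim is cancelled on all nodes.
            System.out.println("Dissenting nodes: " + dissentingCount);
        }
    }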
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count(); @@ -473,9 +484,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath()); final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null); + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null, clusterResponse); - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); + submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); } }); cancelLockThread.setName("Cancel Flow Locks"); @@ -547,10 +558,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { }; // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work - final Function requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback); + final Function requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback, + clusterResponse); // replicate the 'verification request' to all nodes - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders); + submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders); } @@ -566,7 +578,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, - final Map headers) { + final Map headers, final StandardAsyncClusterResponse clusterResponse) { final ClientResponse clientResponse; final long startNanos = System.nanoTime(); logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers); @@ -594,7 +606,20 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication."); } - return new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId); + final long nanos = System.nanoTime() - startNanos; + clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos); + final NodeResponse nodeResponse = new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId); + if (nodeResponse.is2xx()) { + final int length = nodeResponse.getClientResponse().getLength(); + if (length > 0) { + final boolean canBufferResponse = clusterResponse.requestBuffer(length); + if (canBufferResponse) { + nodeResponse.bufferResponse(); + } + } + } + + return nodeResponse; } private boolean isMutableRequest(final String method, final String uriPath) { @@ -708,7 +733,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } - private void replicateRequest(final Set nodeIds, final String scheme, final String path, + private void submitAsyncRequest(final Set nodeIds, final String scheme, final String path, final Function callableFactory, final Map headers) { if 
(nodeIds.isEmpty()) { @@ -746,20 +771,26 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final Object entity; private final Map headers = new HashMap<>(); private final NodeRequestCompletionCallback callback; + private final StandardAsyncClusterResponse clusterResponse; + private final long creationNanos = System.nanoTime(); - private NodeHttpRequest(final NodeIdentifier nodeId, final String method, - final URI uri, final Object entity, final Map headers, final NodeRequestCompletionCallback callback) { + private NodeHttpRequest(final NodeIdentifier nodeId, final String method, final URI uri, final Object entity, final Map headers, + final NodeRequestCompletionCallback callback, final StandardAsyncClusterResponse clusterResponse) { this.nodeId = nodeId; this.method = method; this.uri = uri; this.entity = entity; this.headers.putAll(headers); this.callback = callback; + this.clusterResponse = clusterResponse; } @Override public void run() { + final long waitForScheduleNanos = System.nanoTime() - creationNanos; + clusterResponse.addTiming("Wait for HTTP Request Replication to be triggered", nodeId.toString(), waitForScheduleNanos); + NodeResponse nodeResponse; try { @@ -768,7 +799,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final String requestId = headers.get("x-nifi-request-id"); logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId); - nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers); + nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers, clusterResponse); } catch (final Exception e) { nodeResponse = new NodeResponse(nodeId, method, uri, e); logger.warn("Failed to replicate request {} {} to {} due to {}", method, uri.getPath(), nodeId, e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java index 308652e873..7c911b87f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java @@ -16,8 +16,9 @@ */ package org.apache.nifi.cluster.manager; -import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.util.List; @@ -34,6 +35,7 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.api.entity.Entity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,11 +61,12 @@ public class NodeResponse { private final URI requestUri; private final ClientResponse clientResponse; private final NodeIdentifier nodeId; - private final Throwable throwable; + private Throwable throwable; private boolean hasCreatedResponse = false; private final Entity updatedEntity; private final long requestDurationNanos; private final String requestId; + private byte[] bufferedResponse; public NodeResponse(final NodeIdentifier nodeId, final 
String httpMethod, final URI requestUri, final ClientResponse clientResponse, final long requestDurationNanos, final String requestId) { if (nodeId == null) { @@ -158,6 +161,23 @@ public class NodeResponse { return (500 <= statusCode && statusCode <= 599); } + public synchronized void bufferResponse() { + bufferedResponse = new byte[clientResponse.getLength()]; + try { + StreamUtils.fillBuffer(clientResponse.getEntityInputStream(), bufferedResponse); + } catch (final IOException e) { + this.throwable = e; + } + } + + private synchronized InputStream getInputStream() { + if (bufferedResponse == null) { + return clientResponse.getEntityInputStream(); + } + + return new ByteArrayInputStream(bufferedResponse); + } + public ClientResponse getClientResponse() { return clientResponse; } @@ -229,7 +249,6 @@ public class NodeResponse { for (final String key : clientResponse.getHeaders().keySet()) { final List values = clientResponse.getHeaders().get(key); for (final String value : values) { - if (key.equalsIgnoreCase("transfer-encoding") || key.equalsIgnoreCase("content-length")) { /* * do not copy the transfer-encoding header (i.e., chunked encoding) or @@ -244,25 +263,19 @@ public class NodeResponse { */ continue; } + responseBuilder.header(key, value); } } // head requests must not have a message-body in the response if (!HttpMethod.HEAD.equalsIgnoreCase(httpMethod)) { - // set the entity if (updatedEntity == null) { responseBuilder.entity(new StreamingOutput() { @Override public void write(final OutputStream output) throws IOException, WebApplicationException { - BufferedInputStream bis = null; - try { - bis = new BufferedInputStream(clientResponse.getEntityInputStream()); - IOUtils.copy(bis, output); - } finally { - IOUtils.closeQuietly(bis); - } + IOUtils.copy(getInputStream(), output); } }); } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 88a883660c..018bf93538 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -234,7 +234,7 @@ public class TestThreadPoolRequestReplicator { = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, Map givenHeaders) { + final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. 
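[Reviewer note] In the NodeResponse hunk above, bufferResponse() reads the node's entity stream into a byte array exactly once, and getInputStream() later replays it from memory, falling back to the live stream when the response was too large to claim buffer space. A minimal sketch of that buffer-or-stream fallback, using plain JDK streams rather than NiFi's StreamUtils; the class name BufferedEntity is illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class BufferedEntity {
        private byte[] buffered;                 // null until bufferResponse() succeeds
        private final InputStream liveStream;    // the original, one-shot entity stream

        public BufferedEntity(final InputStream liveStream) {
            this.liveStream = liveStream;
        }

        // Copy the entity into memory so it can be re-read later.
        public synchronized void bufferResponse() throws IOException {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final byte[] chunk = new byte[8192];
            int read;
            while ((read = liveStream.read(chunk)) != -1) {
                baos.write(chunk, 0, read);
            }
            buffered = baos.toByteArray();
        }

        // Replay from memory when buffered; otherwise hand back the live stream.
        public synchronized InputStream getInputStream() {
            return buffered == null ? liveStream : new ByteArrayInputStream(buffered);
        }
    }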
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); @@ -364,7 +364,7 @@ public class TestThreadPoolRequestReplicator { = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, Map givenHeaders) { + final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); @@ -574,7 +574,7 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, - final URI uri, final String requestId, Map givenHeaders) { + final URI uri, final String requestId, Map givenHeaders, final StandardAsyncClusterResponse response) { if (delayMillis > 0L) { try { Thread.sleep(delayMillis); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index 7c74680425..20dbfe122f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.flow.FlowElection; import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor; @@ -62,6 +63,7 @@ import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.EventReporter; import org.apache.nifi.io.socket.ServerSocketConfiguration; import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; @@ -118,6 +120,9 @@ public class Node { } }; + final Bundle systemBundle = ExtensionManager.createSystemBundle(properties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider new file mode 100644 index 0000000000..49a02cdb64 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.cluster.integration.NopStateProvider \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java index 088f26dbd1..a1d5173b8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -57,7 +57,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private StandardFlowFileRecord(final Builder builder) { this.id = builder.bId; - this.attributes = builder.bAttributes; + this.attributes = builder.bAttributes == null ? 
Collections.emptyMap() : builder.bAttributes; this.entryDate = builder.bEntryDate; this.lineageStartDate = builder.bLineageStartDate; this.lineageStartIndex = builder.bLineageStartIndex; @@ -176,11 +176,12 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private final Set bLineageIdentifiers = new HashSet<>(); private long bPenaltyExpirationMs = -1L; private long bSize = 0L; - private final Map bAttributes = new HashMap<>(); private ContentClaim bClaim = null; private long bClaimOffset = 0L; private long bLastQueueDate = System.currentTimeMillis(); private long bQueueDateIndex = 0L; + private Map bAttributes; + private boolean bAttributesCopied = false; public Builder id(final long id) { bId = id; @@ -210,14 +211,28 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { return this; } + private Map initializeAttributes() { + if (bAttributes == null) { + bAttributes = new HashMap<>(); + bAttributesCopied = true; + } else if (!bAttributesCopied) { + bAttributes = new HashMap<>(bAttributes); + bAttributesCopied = true; + } + + return bAttributes; + } + public Builder addAttribute(final String key, final String value) { if (key != null && value != null) { - bAttributes.put(FlowFile.KeyValidator.validateKey(key), value); + initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value); } return this; } public Builder addAttributes(final Map attributes) { + final Map initializedAttributes = initializeAttributes(); + if (null != attributes) { for (final String key : attributes.keySet()) { FlowFile.KeyValidator.validateKey(key); @@ -226,7 +241,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { final String key = entry.getKey(); final String value = entry.getValue(); if (key != null && value != null) { - bAttributes.put(key, value); + initializedAttributes.put(key, value); } } } @@ -240,7 +255,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { continue; } - bAttributes.remove(key); + initializeAttributes().remove(key); } } return this; @@ -253,7 +268,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { continue; } - bAttributes.remove(key); + initializeAttributes().remove(key); } } return this; @@ -261,7 +276,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { public Builder removeAttributes(final Pattern keyPattern) { if (keyPattern != null) { - final Iterator iterator = bAttributes.keySet().iterator(); + final Iterator iterator = initializeAttributes().keySet().iterator(); while (iterator.hasNext()) { final String key = iterator.next(); @@ -304,7 +319,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { bLineageIdentifiers.clear(); bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis(); bSize = specFlowFile.getSize(); - bAttributes.putAll(specFlowFile.getAttributes()); + bAttributes = specFlowFile.getAttributes(); + bAttributesCopied = false; bClaim = specFlowFile.getContentClaim(); bClaimOffset = specFlowFile.getContentClaimOffset(); bLastQueueDate = specFlowFile.getLastQueueDate(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java new file mode 100644 index 
0000000000..55c0e3e3a9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.junit.Test; + +public class TestStandardFlowFileRecord { + + @Test + public void testAttributeCopiedOnModification() { + final FlowFileRecord original = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", UUID.randomUUID().toString()) + .addAttribute("abc", "xyz") + .build(); + + final FlowFileRecord addAttribute = new StandardFlowFileRecord.Builder() + .fromFlowFile(original) + .addAttribute("hello", "good-bye") + .build(); + + final Map addAttributeMapCopy = new HashMap<>(addAttribute.getAttributes()); + + assertEquals("good-bye", addAttribute.getAttributes().get("hello")); + assertEquals("xyz", addAttribute.getAttributes().get("abc")); + + assertEquals("xyz", original.getAttributes().get("abc")); + assertFalse(original.getAttributes().containsKey("hello")); + + final FlowFileRecord removeAttribute = new StandardFlowFileRecord.Builder() + .fromFlowFile(addAttribute) + .removeAttributes("hello") + .build(); + + assertEquals(original.getAttributes(), removeAttribute.getAttributes()); + assertEquals(addAttributeMapCopy, addAttribute.getAttributes()); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java index 7d548c9d1d..11573a54f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java @@ -16,17 +16,20 @@ */ package org.apache.nifi.web; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; - -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Aspect to limit access into the core. 
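[Reviewer note] The StandardFlowFileRecord.Builder change above (and the new test exercising it) avoids copying a FlowFile's attribute map unless the new record actually mutates it: fromFlowFile() keeps a reference to the source map, and initializeAttributes() makes a private copy only on the first modification. A minimal sketch of that copy-on-write idea outside the NiFi classes; all names here are illustrative:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class CopyOnWriteAttributes {
        private Map<String, String> attributes;    // may alias the source map
        private boolean copied = false;            // true once we own a private copy

        public CopyOnWriteAttributes(final Map<String, String> source) {
            this.attributes = source;
        }

        private Map<String, String> writable() {
            if (!copied) {
                // First mutation: copy so the source map is never modified.
                attributes = attributes == null ? new HashMap<>() : new HashMap<>(attributes);
                copied = true;
            }
            return attributes;
        }

        public void put(final String key, final String value) {
            writable().put(key, value);
        }

        public Map<String, String> view() {
            return attributes == null ? Collections.emptyMap() : Collections.unmodifiableMap(attributes);
        }
    }

Records built from an existing FlowFile without attribute changes therefore share the map instead of paying for a HashMap copy per clone.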
*/ @Aspect public class NiFiServiceFacadeLock { + private static final Logger logger = LoggerFactory.getLogger(NiFiServiceFacadeLock.class); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); @@ -36,165 +39,128 @@ public class NiFiServiceFacadeLock { @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* create*(..))") public Object createLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* clear*(..))") public Object clearLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* delete*(..))") public Object deleteLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* remove*(..))") public Object removeLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* update*(..))") public Object updateLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* set*(..))") public Object setLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* copy*(..))") public Object copyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* import*(..))") public Object importLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* export*(..))") public Object exportLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* submit*(..))") public Object submitLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { 
- writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* schedule*(..))") public Object scheduleLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - writeLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - writeLock.unlock(); - } + return proceedWithWriteLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* get*(..))") public Object getLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - readLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - readLock.unlock(); - } + return proceedWithReadLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* is*(..))") public Object isLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - readLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - readLock.unlock(); - } + return proceedWithReadLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + "execution(* search*(..))") public Object searchLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { - readLock.lock(); - try { - return proceedingJoinPoint.proceed(); - } finally { - readLock.unlock(); - } + return proceedWithReadLock(proceedingJoinPoint); } @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " - + "execution(* verify*(..))") + + "execution(* verify*(..))") public Object verifyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + return proceedWithReadLock(proceedingJoinPoint); + } + + + private Object proceedWithReadLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + final long beforeLock = System.nanoTime(); + long afterLock = 0L; + readLock.lock(); try { + afterLock = System.nanoTime(); return proceedingJoinPoint.proceed(); } finally { readLock.unlock(); + + final long afterProcedure = System.nanoTime(); + final String procedure = proceedingJoinPoint.getSignature().toLongString(); + logger.debug("In order to perform procedure {}, it took {} nanos to obtain the Read Lock {} and {} nanos to invoke the method", + procedure, afterLock - beforeLock, readLock, afterProcedure - afterLock); } } + + private Object proceedWithWriteLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + final long beforeLock = System.nanoTime(); + long afterLock = 0L; + + writeLock.lock(); + try { + afterLock = System.nanoTime(); + return proceedingJoinPoint.proceed(); + } finally { + writeLock.unlock(); + + final long afterProcedure = System.nanoTime(); + final String procedure = proceedingJoinPoint.getSignature().toLongString(); + logger.debug("In order to perform procedure {}, it took {} nanos to obtain the Write Lock {} and {} nanos to invoke the method", + procedure, afterLock - beforeLock, writeLock, afterProcedure - afterLock); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index c159273bf1..455380f2a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -978,10 +978,21 @@ public abstract class ApplicationResource {
         // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
         // to the cluster nodes themselves.
-        if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-            return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
-        } else {
-            return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
+        final long replicateStart = System.nanoTime();
+        String action = null;
+        try {
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                action = "Replicate Request " + method + " " + path;
+                return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
+            } else {
+                action = "Forward Request " + method + " " + path + " to Coordinator";
+                return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
+            }
+        } finally {
+            final long replicateNanos = System.nanoTime() - replicateStart;
+            final String transactionId = headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+            final String requestId = transactionId == null ? "Request with no ID" : transactionId;
+            logger.debug("Took a total of {} millis to {} for {}", TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId);
         }
     }
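[Reviewer note] The final hunk wraps request replication in a try/finally so the elapsed time is logged even when awaitMergedResponse() throws. A minimal sketch of that timing wrapper; ReplicationTiming and doReplicate are illustrative stand-ins for ApplicationResource and the real requestReplicator call:

    import java.util.concurrent.TimeUnit;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ReplicationTiming {
        private static final Logger logger = LoggerFactory.getLogger(ReplicationTiming.class);

        public String replicateWithTiming(final String method, final String path) throws Exception {
            final long start = System.nanoTime();
            final String action = "Replicate Request " + method + " " + path;
            try {
                return doReplicate(method, path);   // stand-in for requestReplicator.replicate(...)
            } finally {
                // Runs on both the success and failure paths, so slow or failing
                // replications still show up in the debug log.
                final long nanos = System.nanoTime() - start;
                logger.debug("Took a total of {} millis to {}", TimeUnit.NANOSECONDS.toMillis(nanos), action);
            }
        }

        private String doReplicate(final String method, final String path) throws Exception {
            return "merged response";   // placeholder for the real replication call
        }
    }

Together with the per-node and per-phase timings added to StandardAsyncClusterResponse, this gives a complete debug-level picture of where a replicated request spends its time.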