NIFI-3636: Lazily copy the FlowFile attribute HashMap instead of doing so eagerly.

Signed-off-by: Matt Burgess <mattyb149@apache.org>
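
The idea behind this change: a FlowFile builder created from an existing FlowFile can share the source's attribute map by reference and defer the HashMap copy until the first mutation, so clones that are never modified pay no copy cost. A minimal sketch of that copy-on-write pattern, using an illustrative class rather than the NiFi API:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Copy-on-write sketch: share the source map until the first write.
final class CowAttributes {
    private Map<String, String> attributes;
    private boolean copied = false;

    CowAttributes(final Map<String, String> source) {
        this.attributes = source; // shared by reference, not copied
    }

    void put(final String key, final String value) {
        if (!copied) {
            attributes = new HashMap<>(attributes); // copy only on first mutation
            copied = true;
        }
        attributes.put(key, value);
    }

    Map<String, String> asMap() {
        return Collections.unmodifiableMap(attributes);
    }
}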

NIFI-3257: Added logging of timing information when replicating requests across the cluster, to provide insight into where time is spent when replication of some requests is slow

Signed-off-by: Matt Burgess <mattyb149@apache.org>
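
The pattern the added logging follows: capture System.nanoTime() around each step of replication, accumulate per-step durations in insertion order, and report them in milliseconds once the request completes. A minimal sketch with illustrative names, not the NiFi API:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class TimingRecorder {
    // Insertion-ordered so the report reads in the order the steps ran.
    private final Map<String, Long> timings = new LinkedHashMap<>();

    <T> T time(final String description, final Supplier<T> step) {
        final long start = System.nanoTime();
        try {
            return step.get();
        } finally {
            timings.put(description, System.nanoTime() - start);
        }
    }

    String report() {
        final StringBuilder sb = new StringBuilder();
        for (final Map.Entry<String, Long> entry : timings.entrySet()) {
            sb.append(entry.getKey()).append(" took ")
                .append(TimeUnit.NANOSECONDS.toMillis(entry.getValue())).append(" millis\n");
        }
        return sb.toString();
    }
}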

NIFI-3649: Buffer node responses, up to a maximum buffer size, when replicating HTTP requests

Signed-off-by: Matt Burgess <mattyb149@apache.org>
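
The buffering approach is a shared byte budget: before a node's response is buffered in memory, its length is reserved against the remaining budget, and the response is streamed instead when the reservation fails. A minimal sketch of the compare-and-set reservation loop, with illustrative names (the actual implementation is requestBuffer() in StandardAsyncClusterResponse below):

import java.util.concurrent.atomic.AtomicInteger;

final class ResponseBufferBudget {
    private final AtomicInteger bytesLeft;

    ResponseBufferBudget(final int maxBytes) {
        this.bytesLeft = new AtomicInteger(maxBytes);
    }

    // Returns true if size bytes were reserved; false means the caller
    // should stream the response rather than buffer it in memory.
    boolean tryReserve(final int size) {
        while (true) {
            final int left = bytesLeft.get();
            if (left < size) {
                return false;
            }
            if (bytesLeft.compareAndSet(left, left - size)) {
                return true;
            }
        }
    }
}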

NIFI-3636: Added unit test to ensure that FlowFile attribute maps are copied when appropriate

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-3636: Removed patch file that should not have been in commit

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1612
Authored by Mark Payne 2017-03-22 17:21:51 -04:00; committed by Matt Burgess
parent a7bf683a0d
commit 3aa1db6ee5
10 changed files with 364 additions and 165 deletions

View File: StandardAsyncClusterResponse.java

@@ -17,24 +17,30 @@
package org.apache.nifi.cluster.coordination.http.replication;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class);
private static final int DEFAULT_RESPONSE_BUFFER_SIZE = 1024 * 1024;
public static final String VERIFICATION_PHASE = "Verification Phase";
public static final String COMMIT_PHASE = "Execution Phase";
public static final String ONLY_PHASE = "Only Phase";
private final String id;
private final Set<NodeIdentifier> nodeIds;
@@ -45,21 +51,38 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private final Runnable completedResultFetchedCallback;
private final long creationTimeNanos;
private final boolean merge;
private final AtomicInteger responseBufferLeft;
private final Map<NodeIdentifier, ResponseHolder> responseMap = new HashMap<>();
private final AtomicInteger requestsCompleted = new AtomicInteger(0);
private NodeResponse mergedResponse; // guarded by synchronizing on this
private RuntimeException failure; // guarded by synchronizing on this
private volatile String phase;
private volatile long phaseStartTime = System.nanoTime();
private final long creationTime = System.nanoTime();
public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds,
final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
private final Map<String, Long> timingInfo = new LinkedHashMap<>();
public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds, final HttpResponseMapper responseMapper,
final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
this(id, uri, method, nodeIds, responseMapper, completionCallback, completedResultFetchedCallback, merge, DEFAULT_RESPONSE_BUFFER_SIZE);
}
public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds, final HttpResponseMapper responseMapper,
final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge, final int responseBufferSize) {
this.id = id;
this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
this.uri = uri;
this.method = method;
this.merge = merge;
if ("POST".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) {
phase = VERIFICATION_PHASE;
} else {
phase = ONLY_PHASE;
}
creationTimeNanos = System.nanoTime();
for (final NodeIdentifier nodeId : nodeIds) {
responseMap.put(nodeId, new ResponseHolder(creationTimeNanos));
@@ -68,8 +91,51 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
this.responseMapper = responseMapper;
this.completionCallback = completionCallback;
this.completedResultFetchedCallback = completedResultFetchedCallback;
this.responseBufferLeft = new AtomicInteger(responseBufferSize);
}
public boolean requestBuffer(final int size) {
boolean updated = false;
while (!updated) {
final int bytesLeft = responseBufferLeft.get();
if (bytesLeft < size) {
return false;
}
updated = responseBufferLeft.compareAndSet(bytesLeft, bytesLeft - size);
}
return true;
}
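// Note: a failed compareAndSet means another thread updated the counter
// concurrently; the loop re-reads the remaining budget and retries. A false
// return tells the caller to stream the node's response instead of
// buffering it in memory.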
public void setPhase(final String phase) {
this.phase = phase;
phaseStartTime = System.nanoTime();
}
public synchronized void addTiming(final String description, final String node, final long nanos) {
final StringBuilder sb = new StringBuilder(description);
if (phase != ONLY_PHASE) {
sb.append(" (").append(phase).append(")");
}
sb.append(" for ").append(node);
timingInfo.put(sb.toString(), nanos);
}
private synchronized void logTimingInfo() {
if (!logger.isDebugEnabled()) {
return;
}
final StringBuilder sb = new StringBuilder();
sb.append(String.format("For %s %s Timing Info is as follows:\n", method, uri));
for (final Map.Entry<String, Long> entry : timingInfo.entrySet()) {
sb.append(entry.getKey()).append(" took ").append(TimeUnit.NANOSECONDS.toMillis(entry.getValue())).append(" millis\n");
}
logger.debug(sb.toString());
}
@Override
public String getRequestIdentifier() {
return id;
@@ -148,7 +214,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
.map(p -> p.getResponse())
.filter(response -> response != null)
.collect(Collectors.toSet());
final long start = System.nanoTime();
mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge);
final long nanos = System.nanoTime() - start;
addTiming("Map/Merge Responses", "All Nodes", nanos);
logger.debug("Notifying all that merged response is complete for {}", id);
this.notifyAll();
@@ -169,6 +239,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
}
}
logTimingInfo();
return getMergedResponse(true);
}
@@ -195,6 +267,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
}
}
logTimingInfo();
return getMergedResponse(true);
}
@@ -217,10 +291,18 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
if (completedCount == responseMap.size()) {
logger.debug("Notifying all that merged response is ready for {}", id);
addTiming("Phase Completed", "All Nodes", System.nanoTime() - phaseStartTime);
final long start = System.nanoTime();
synchronized (this) {
this.notifyAll();
}
final long nanos = System.nanoTime() - start;
timingInfo.put("Notifying All Threads that Request is Complete", nanos);
timingInfo.put("Total Time for All Nodes", System.nanoTime() - creationTime);
if (completionCallback != null) {
completionCallback.onCompletion(this);
}

View File: ThreadPoolRequestReplicator.java

@@ -250,10 +250,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
// time, so we use a write lock if the request is mutable and a read lock otherwise.
final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
logger.debug("Obtaining lock {} in order to replicate request {} {}", method, uri);
logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
lock.lock();
try {
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
logger.debug("Lock {} obtained in order to replicate request {} {}", lock, method, uri);
// Unlocking of the lock is performed within the replicate method, as we need to ensure that it is unlocked only after
// the entire request has completed.
@@ -316,44 +316,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
}
// verify all of the nodes exist and are in the proper state
for (final NodeIdentifier nodeId : nodeIds) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status == null) {
throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
}
if (status.getState() != NodeConnectionState.CONNECTED) {
throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
}
}
logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
if (performVerification) {
verifyClusterState(method, uri.getPath());
}
int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
numRequests = purgeExpiredRequests();
}
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
final Map<String, Long> countsByUri = responseMap.values().stream().collect(
Collectors.groupingBy(
StandardAsyncClusterResponse::getURIPath,
Collectors.counting()));
logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
}
// create a response object if one was not already passed to us
if (response == null) {
// create the request objects and replicate to all nodes.
// When the request has completed, we need to ensure that we notify the monitor, if there is one.
final CompletionCallback completionCallback = clusterResponse -> {
@@ -372,13 +341,47 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
// create a response object if one was not already passed to us
if (response == null) {
response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
responseMapper, completionCallback, responseConsumedCallback, merge);
responseMap.put(requestId, response);
}
// verify all of the nodes exist and are in the proper state
for (final NodeIdentifier nodeId : nodeIds) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status == null) {
throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
}
if (status.getState() != NodeConnectionState.CONNECTED) {
throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
}
}
logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
if (performVerification) {
final long start = System.nanoTime();
verifyClusterState(method, uri.getPath());
final long nanos = System.nanoTime() - start;
response.addTiming("Verify Cluster State", "All Nodes", nanos);
}
int numRequests = responseMap.size();
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
numRequests = purgeExpiredRequests();
}
if (numRequests >= MAX_CONCURRENT_REQUESTS) {
final Map<String, Long> countsByUri = responseMap.values().stream().collect(
Collectors.groupingBy(
StandardAsyncClusterResponse::getURIPath,
Collectors.counting()));
logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
}
logger.debug("For Request ID {}, response object is {}", requestId, response);
// if mutable request, we have to do a two-phase commit where we ask each node to verify
@@ -391,6 +394,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
return response;
} else if (mutableRequest) {
response.setPhase(StandardAsyncClusterResponse.COMMIT_PHASE);
}
// Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
@@ -407,8 +412,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// replicate the request to all nodes
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback, finalResponse);
submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
return response;
} catch (final Throwable t) {
@@ -431,6 +436,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final Map<String, String> validationHeaders = new HashMap<>(headers);
validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
final long startNanos = System.nanoTime();
final int numNodes = nodeIds.size();
final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() {
final Set<NodeResponse> nodeResponses = Collections.synchronizedSet(new HashSet<>());
@@ -450,9 +456,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
try {
final long nanos = System.nanoTime() - startNanos;
clusterResponse.addTiming("Completed Verification", nodeResponse.getNodeId().toString(), nanos);
// If we have all of the node responses, then we can verify the responses
// and if good replicate the original request to all of the nodes.
if (allNodesResponded) {
clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);
// Check if we have any requests that do not have a 150-Continue status code.
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();
@@ -473,9 +484,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null, clusterResponse);
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
}
});
cancelLockThread.setName("Cancel Flow Locks");
@@ -547,10 +558,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
};
// Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback);
final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback,
clusterResponse);
// replicate the 'verification request' to all nodes
replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
}
@@ -566,7 +578,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId,
final Map<String, String> headers) {
final Map<String, String> headers, final StandardAsyncClusterResponse clusterResponse) {
final ClientResponse clientResponse;
final long startNanos = System.nanoTime();
logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers);
@@ -594,7 +606,20 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication.");
}
return new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId);
final long nanos = System.nanoTime() - startNanos;
clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos);
final NodeResponse nodeResponse = new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId);
if (nodeResponse.is2xx()) {
final int length = nodeResponse.getClientResponse().getLength();
if (length > 0) {
final boolean canBufferResponse = clusterResponse.requestBuffer(length);
if (canBufferResponse) {
nodeResponse.bufferResponse();
}
}
}
return nodeResponse;
}
private boolean isMutableRequest(final String method, final String uriPath) {
@@ -708,7 +733,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path,
private void submitAsyncRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path,
final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) {
if (nodeIds.isEmpty()) {
@@ -746,20 +771,26 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private final Object entity;
private final Map<String, String> headers = new HashMap<>();
private final NodeRequestCompletionCallback callback;
private final StandardAsyncClusterResponse clusterResponse;
private final long creationNanos = System.nanoTime();
private NodeHttpRequest(final NodeIdentifier nodeId, final String method,
final URI uri, final Object entity, final Map<String, String> headers, final NodeRequestCompletionCallback callback) {
private NodeHttpRequest(final NodeIdentifier nodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers,
final NodeRequestCompletionCallback callback, final StandardAsyncClusterResponse clusterResponse) {
this.nodeId = nodeId;
this.method = method;
this.uri = uri;
this.entity = entity;
this.headers.putAll(headers);
this.callback = callback;
this.clusterResponse = clusterResponse;
}
@Override
public void run() {
final long waitForScheduleNanos = System.nanoTime() - creationNanos;
clusterResponse.addTiming("Wait for HTTP Request Replication to be triggered", nodeId.toString(), waitForScheduleNanos);
NodeResponse nodeResponse;
try {
@@ -768,7 +799,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final String requestId = headers.get("x-nifi-request-id");
logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId);
nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers);
nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers, clusterResponse);
} catch (final Exception e) {
nodeResponse = new NodeResponse(nodeId, method, uri, e);
logger.warn("Failed to replicate request {} {} to {} due to {}", method, uri.getPath(), nodeId, e);

View File: NodeResponse.java

@@ -16,8 +16,9 @@
*/
package org.apache.nifi.cluster.manager;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
@@ -34,6 +35,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.entity.Entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,11 +61,12 @@ public class NodeResponse {
private final URI requestUri;
private final ClientResponse clientResponse;
private final NodeIdentifier nodeId;
private final Throwable throwable;
private Throwable throwable;
private boolean hasCreatedResponse = false;
private final Entity updatedEntity;
private final long requestDurationNanos;
private final String requestId;
private byte[] bufferedResponse;
public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final ClientResponse clientResponse, final long requestDurationNanos, final String requestId) {
if (nodeId == null) {
@@ -158,6 +161,23 @@ public class NodeResponse {
return (500 <= statusCode && statusCode <= 599);
}
public synchronized void bufferResponse() {
bufferedResponse = new byte[clientResponse.getLength()];
try {
StreamUtils.fillBuffer(clientResponse.getEntityInputStream(), bufferedResponse);
} catch (final IOException e) {
this.throwable = e;
}
}
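// Buffering drains the entity stream into memory exactly once so the
// response body can be re-read later; getInputStream() replays the buffered
// bytes and falls back to the live entity stream when nothing was buffered.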
private synchronized InputStream getInputStream() {
if (bufferedResponse == null) {
return clientResponse.getEntityInputStream();
}
return new ByteArrayInputStream(bufferedResponse);
}
public ClientResponse getClientResponse() {
return clientResponse;
}
@@ -229,7 +249,6 @@ public class NodeResponse {
for (final String key : clientResponse.getHeaders().keySet()) {
final List<String> values = clientResponse.getHeaders().get(key);
for (final String value : values) {
if (key.equalsIgnoreCase("transfer-encoding") || key.equalsIgnoreCase("content-length")) {
/*
* do not copy the transfer-encoding header (i.e., chunked encoding) or
@@ -244,25 +263,19 @@ public class NodeResponse {
*/
continue;
}
responseBuilder.header(key, value);
}
}
// head requests must not have a message-body in the response
if (!HttpMethod.HEAD.equalsIgnoreCase(httpMethod)) {
// set the entity
if (updatedEntity == null) {
responseBuilder.entity(new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
BufferedInputStream bis = null;
try {
bis = new BufferedInputStream(clientResponse.getEntityInputStream());
IOUtils.copy(bis, output);
} finally {
IOUtils.closeQuietly(bis);
}
IOUtils.copy(getInputStream(), output);
}
});
} else {

View File: TestThreadPoolRequestReplicator.java

@@ -234,7 +234,7 @@ public class TestThreadPoolRequestReplicator {
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@@ -364,7 +364,7 @@ public class TestThreadPoolRequestReplicator {
= new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@@ -574,7 +574,7 @@ public class TestThreadPoolRequestReplicator {
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
if (delayMillis > 0L) {
try {
Thread.sleep(delayMillis);

View File: Node.java

@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
@@ -62,6 +63,7 @@ import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
@@ -118,6 +120,9 @@ public class Node {
}
};
final Bundle systemBundle = ExtensionManager.createSystemBundle(properties);
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
revisionManager = Mockito.mock(RevisionManager.class);
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());

View File: META-INF services registration for NopStateProvider (new test resource)

@@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.cluster.integration.NopStateProvider

View File: StandardFlowFileRecord.java

@@ -57,7 +57,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private StandardFlowFileRecord(final Builder builder) {
this.id = builder.bId;
this.attributes = builder.bAttributes;
this.attributes = builder.bAttributes == null ? Collections.emptyMap() : builder.bAttributes;
this.entryDate = builder.bEntryDate;
this.lineageStartDate = builder.bLineageStartDate;
this.lineageStartIndex = builder.bLineageStartIndex;
@@ -176,11 +176,12 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final Set<String> bLineageIdentifiers = new HashSet<>();
private long bPenaltyExpirationMs = -1L;
private long bSize = 0L;
private final Map<String, String> bAttributes = new HashMap<>();
private ContentClaim bClaim = null;
private long bClaimOffset = 0L;
private long bLastQueueDate = System.currentTimeMillis();
private long bQueueDateIndex = 0L;
private Map<String, String> bAttributes;
private boolean bAttributesCopied = false;
public Builder id(final long id) {
bId = id;
@@ -210,14 +211,28 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this;
}
private Map<String, String> initializeAttributes() {
if (bAttributes == null) {
bAttributes = new HashMap<>();
bAttributesCopied = true;
} else if (!bAttributesCopied) {
bAttributes = new HashMap<>(bAttributes);
bAttributesCopied = true;
}
return bAttributes;
}
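// Copy-on-write: fromFlowFile() stores the source FlowFile's attribute map
// by reference (bAttributesCopied = false), so building an unmodified copy
// never pays for a HashMap copy. Every mutating builder call funnels through
// here, which copies the map first and so never alters the source FlowFile.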
public Builder addAttribute(final String key, final String value) {
if (key != null && value != null) {
bAttributes.put(FlowFile.KeyValidator.validateKey(key), value);
initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value);
}
return this;
}
public Builder addAttributes(final Map<String, String> attributes) {
final Map<String, String> initializedAttributes = initializeAttributes();
if (null != attributes) {
for (final String key : attributes.keySet()) {
FlowFile.KeyValidator.validateKey(key);
@@ -226,7 +241,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
final String key = entry.getKey();
final String value = entry.getValue();
if (key != null && value != null) {
bAttributes.put(key, value);
initializedAttributes.put(key, value);
}
}
}
@@ -240,7 +255,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
continue;
}
bAttributes.remove(key);
initializeAttributes().remove(key);
}
}
return this;
@@ -253,7 +268,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
continue;
}
bAttributes.remove(key);
initializeAttributes().remove(key);
}
}
return this;
@@ -261,7 +276,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
public Builder removeAttributes(final Pattern keyPattern) {
if (keyPattern != null) {
final Iterator<String> iterator = bAttributes.keySet().iterator();
final Iterator<String> iterator = initializeAttributes().keySet().iterator();
while (iterator.hasNext()) {
final String key = iterator.next();
@@ -304,7 +319,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
bLineageIdentifiers.clear();
bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
bSize = specFlowFile.getSize();
bAttributes.putAll(specFlowFile.getAttributes());
bAttributes = specFlowFile.getAttributes();
bAttributesCopied = false;
bClaim = specFlowFile.getContentClaim();
bClaimOffset = specFlowFile.getContentClaimOffset();
bLastQueueDate = specFlowFile.getLastQueueDate();

View File: TestStandardFlowFileRecord.java

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.junit.Test;
public class TestStandardFlowFileRecord {
@Test
public void testAttributeCopiedOnModification() {
final FlowFileRecord original = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", UUID.randomUUID().toString())
.addAttribute("abc", "xyz")
.build();
final FlowFileRecord addAttribute = new StandardFlowFileRecord.Builder()
.fromFlowFile(original)
.addAttribute("hello", "good-bye")
.build();
final Map<String, String> addAttributeMapCopy = new HashMap<>(addAttribute.getAttributes());
assertEquals("good-bye", addAttribute.getAttributes().get("hello"));
assertEquals("xyz", addAttribute.getAttributes().get("abc"));
assertEquals("xyz", original.getAttributes().get("abc"));
assertFalse(original.getAttributes().containsKey("hello"));
final FlowFileRecord removeAttribute = new StandardFlowFileRecord.Builder()
.fromFlowFile(addAttribute)
.removeAttributes("hello")
.build();
assertEquals(original.getAttributes(), removeAttribute.getAttributes());
assertEquals(addAttributeMapCopy, addAttribute.getAttributes());
}
}

View File: NiFiServiceFacadeLock.java

@@ -16,17 +16,20 @@
*/
package org.apache.nifi.web;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Aspect to limit access into the core.
*/
@Aspect
public class NiFiServiceFacadeLock {
private static final Logger logger = LoggerFactory.getLogger(NiFiServiceFacadeLock.class);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
@@ -36,165 +39,128 @@ public class NiFiServiceFacadeLock {
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* create*(..))")
public Object createLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* clear*(..))")
public Object clearLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* delete*(..))")
public Object deleteLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* remove*(..))")
public Object removeLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* update*(..))")
public Object updateLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* set*(..))")
public Object setLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* copy*(..))")
public Object copyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* import*(..))")
public Object importLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* export*(..))")
public Object exportLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* submit*(..))")
public Object submitLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* schedule*(..))")
public Object scheduleLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
writeLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
}
return proceedWithWriteLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* get*(..))")
public Object getLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
readLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
readLock.unlock();
}
return proceedWithReadLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* is*(..))")
public Object isLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
readLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
readLock.unlock();
}
return proceedWithReadLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* search*(..))")
public Object searchLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
readLock.lock();
try {
return proceedingJoinPoint.proceed();
} finally {
readLock.unlock();
}
return proceedWithReadLock(proceedingJoinPoint);
}
@Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ "execution(* verify*(..))")
public Object verifyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return proceedWithReadLock(proceedingJoinPoint);
}
private Object proceedWithReadLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
final long beforeLock = System.nanoTime();
long afterLock = 0L;
readLock.lock();
try {
afterLock = System.nanoTime();
return proceedingJoinPoint.proceed();
} finally {
readLock.unlock();
final long afterProcedure = System.nanoTime();
final String procedure = proceedingJoinPoint.getSignature().toLongString();
logger.debug("In order to perform procedure {}, it took {} nanos to obtain the Read Lock {} and {} nanos to invoke the method",
procedure, afterLock - beforeLock, readLock, afterProcedure - afterLock);
}
}
private Object proceedWithWriteLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
final long beforeLock = System.nanoTime();
long afterLock = 0L;
writeLock.lock();
try {
afterLock = System.nanoTime();
return proceedingJoinPoint.proceed();
} finally {
writeLock.unlock();
final long afterProcedure = System.nanoTime();
final String procedure = proceedingJoinPoint.getSignature().toLongString();
logger.debug("In order to perform procedure {}, it took {} nanos to obtain the Write Lock {} and {} nanos to invoke the method",
procedure, afterLock - beforeLock, writeLock, afterProcedure - afterLock);
}
}
}

View File: ApplicationResource.java

@@ -978,11 +978,22 @@ public abstract class ApplicationResource {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
// to the cluster nodes themselves.
final long replicateStart = System.nanoTime();
String action = null;
try {
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
action = "Replicate Request " + method + " " + path;
return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
} else {
action = "Forward Request " + method + " " + path + " to Coordinator";
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
}
} finally {
final long replicateNanos = System.nanoTime() - replicateStart;
final String transactionId = headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
final String requestId = transactionId == null ? "Request with no ID" : transactionId;
logger.debug("Took a total of {} millis to {} for {}", TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId);
}
}
/**