mirror of https://github.com/apache/nifi.git
NIFI-13718 Switched Request Replication to Web Client Service (#9234)
- Added Request Replication Header enumeration with lowercased header names for HTTP/2
This commit is contained in:
parent
6355812a77
commit
6e5a276cb2
|
@ -38,6 +38,11 @@
|
|||
<artifactId>nifi-framework-components</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-web-client-api</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-web-client</artifactId>
|
||||
|
@ -95,11 +100,6 @@
|
|||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.jetbrains</groupId>
|
||||
<artifactId>annotations</artifactId>
|
||||
<version>24.1.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>c2-protocol-component-api</artifactId>
|
||||
|
@ -207,10 +207,6 @@
|
|||
<groupId>jakarta.xml.bind</groupId>
|
||||
<artifactId>jakarta.xml.bind-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- spring dependencies -->
|
||||
<dependency>
|
||||
|
@ -251,6 +247,18 @@
|
|||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<!-- Add Implementation-Version for User-Agent Header in replicated requests -->
|
||||
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
@ -95,7 +95,6 @@ import org.apache.nifi.cluster.coordination.http.endpoints.UserGroupEndpointMerg
|
|||
import org.apache.nifi.cluster.coordination.http.endpoints.UserGroupsEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.UsersEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.endpoints.VerifyConfigEndpointMerger;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
import org.apache.nifi.stream.io.NullOutputStream;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
|
@ -104,6 +103,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -282,7 +282,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
|
|||
responses.stream()
|
||||
.parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
|
||||
.filter(response -> response != exclude) // don't include the explicitly excluded node
|
||||
.filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue responses because they contain no content
|
||||
.filter(response -> response.getStatus() != HttpURLConnection.HTTP_ACCEPTED) // don't include any continue responses because they contain no content
|
||||
.forEach(this::drainResponse); // drain all node responses that didn't get filtered out
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.nifi.cluster.coordination.http.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
import jakarta.ws.rs.core.Response;
|
||||
|
@ -26,6 +27,6 @@ public interface HttpReplicationClient {
|
|||
|
||||
PreparedRequest prepareRequest(String method, Map<String, String> headers, Object entity);
|
||||
|
||||
Response replicate(PreparedRequest request, String uri) throws IOException;
|
||||
Response replicate(PreparedRequest request, URI uri) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.coordination.http.replication;
|
||||
|
||||
/**
|
||||
* Enumeration of HTTP headers for Request Replication with lowercasing for compatibility with HTTP/2
|
||||
*/
|
||||
public enum RequestReplicationHeader {
|
||||
/**
|
||||
* Indicator to cancel transaction processing
|
||||
*/
|
||||
CANCEL_TRANSACTION("cancel-transaction"),
|
||||
|
||||
/**
|
||||
* Seed for deterministic cluster identifier generation
|
||||
*/
|
||||
CLUSTER_ID_GENERATION_SEED("cluster-id-generation-seed"),
|
||||
|
||||
/**
|
||||
* Indicator to continue transaction processing
|
||||
*/
|
||||
EXECUTION_CONTINUE("execution-continue"),
|
||||
|
||||
/**
|
||||
* When replicating a request to the cluster coordinator, it may be useful to denote that the request should
|
||||
* be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
|
||||
* we know lives on a specific node. This request must still be replicated through the cluster coordinator.
|
||||
* This header tells the cluster coordinator the UUID's (comma-separated list, possibly with spaces between)
|
||||
* of the nodes that the request should be replicated to.
|
||||
*/
|
||||
REPLICATION_TARGET_ID("replication-target-id"),
|
||||
|
||||
/**
|
||||
* When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
|
||||
* If the request needs to be replicated by another node, it first replicates the request to the coordinator,
|
||||
* which then replicates the request on the node's behalf. This header name and value are used to denote
|
||||
* that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
|
||||
* the request. This allows us to know that the request should be serviced, rather than proxied back to the
|
||||
* cluster coordinator.
|
||||
*/
|
||||
REQUEST_REPLICATED("request-replicated"),
|
||||
|
||||
/**
|
||||
* Transaction Identifier for replicated requests
|
||||
*/
|
||||
REQUEST_TRANSACTION_ID("request-transaction-id"),
|
||||
|
||||
/**
|
||||
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request.
|
||||
* The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to
|
||||
* process the request, 417 EXPECTATION_FAILED otherwise.
|
||||
*/
|
||||
VALIDATION_EXPECTS("validation-expects");
|
||||
|
||||
private final String header;
|
||||
|
||||
RequestReplicationHeader(final String header) {
|
||||
this.header = header;
|
||||
}
|
||||
|
||||
public String getHeader() {
|
||||
return header;
|
||||
}
|
||||
}
|
|
@ -26,48 +26,6 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
public interface RequestReplicator {
|
||||
|
||||
public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId";
|
||||
public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";
|
||||
|
||||
/**
|
||||
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request.
|
||||
* The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to
|
||||
* process the request, 417 EXPECTATION_FAILED otherwise.
|
||||
*/
|
||||
public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects";
|
||||
public static final String NODE_CONTINUE = "202-Accepted";
|
||||
public static final int NODE_CONTINUE_STATUS_CODE = 202;
|
||||
|
||||
/**
|
||||
* Indicates that the request is intended to cancel a transaction that was previously created without performing the action
|
||||
*/
|
||||
public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER = "X-Cancel-Transaction";
|
||||
|
||||
/**
|
||||
* Indicates that this is the second phase of the two phase commit and the execution of the action should proceed.
|
||||
*/
|
||||
public static final String REQUEST_EXECUTION_HTTP_HEADER = "X-Execution-Continue";
|
||||
|
||||
/**
|
||||
* When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
|
||||
* If the request needs to be replicated by another node, it first replicates the request to the coordinator,
|
||||
* which then replicates the request on the node's behalf. This header name and value are used to denote
|
||||
* that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
|
||||
* the request. This allows us to know that the request should be serviced, rather than proxied back to the
|
||||
* cluster coordinator.
|
||||
*/
|
||||
public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";
|
||||
|
||||
/**
|
||||
* When replicating a request to the cluster coordinator, it may be useful to denote that the request should
|
||||
* be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
|
||||
* we know lives on a specific node. This request must still be replicated through the cluster coordinator.
|
||||
* This header tells the cluster coordinator the UUID's (comma-separated list, possibly with spaces between)
|
||||
* of the nodes that the request should be replicated to.
|
||||
*/
|
||||
public static final String REPLICATION_TARGET_NODE_UUID_HEADER = "X-Replication-Target-Id";
|
||||
|
||||
/**
|
||||
* Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.
|
||||
*/
|
||||
|
|
|
@ -153,8 +153,8 @@ public class StandardUploadRequestReplicator implements UploadRequestReplicator
|
|||
.uri(requestUri)
|
||||
.body(inputStream, OptionalLong.of(inputStream.available()))
|
||||
// Special NiFi-specific headers to indicate that the request should be performed and not replicated to the nodes
|
||||
.header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true")
|
||||
.header(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true")
|
||||
.header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), Boolean.TRUE.toString())
|
||||
.header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString())
|
||||
.header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user))
|
||||
.header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import jakarta.ws.rs.core.Response.Status;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -98,6 +99,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
|
||||
private static final String COOKIE_HEADER = "Cookie";
|
||||
private static final String HOST_HEADER = "Host";
|
||||
private static final String NODE_CONTINUE = "202-Accepted";
|
||||
|
||||
private final int maxConcurrentRequests; // maximum number of concurrent requests
|
||||
private final HttpResponseMapper responseMapper;
|
||||
|
@ -276,9 +278,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
final boolean indicateReplicated, final boolean performVerification) {
|
||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||
|
||||
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, ComponentIdGenerator.generateId().toString());
|
||||
updatedHeaders.put(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader(), ComponentIdGenerator.generateId().toString());
|
||||
if (indicateReplicated) {
|
||||
updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
|
||||
updatedHeaders.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
// include the proxied entities header
|
||||
|
@ -380,7 +382,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
// Update headers to indicate the current revision so that we can
|
||||
// prevent multiple users changing the flow at the same time
|
||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
|
||||
final String requestId = computeRequestId(updatedHeaders);
|
||||
|
||||
long verifyClusterStateNanos = -1;
|
||||
if (performVerification) {
|
||||
|
@ -458,7 +460,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
|
||||
// instruct the node to actually perform the underlying action
|
||||
if (mutableRequest && executionPhase) {
|
||||
updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
|
||||
updatedHeaders.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), "true");
|
||||
}
|
||||
|
||||
// replicate the request to all nodes
|
||||
|
@ -486,13 +488,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
}
|
||||
}
|
||||
|
||||
private String computeRequestId(final Map<String, String> headers) {
|
||||
return headers.computeIfAbsent(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader(), header -> UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
private void performVerification(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers,
|
||||
final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) {
|
||||
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
|
||||
|
||||
final Map<String, String> validationHeaders = new HashMap<>(headers);
|
||||
validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
|
||||
validationHeaders.put(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader(), NODE_CONTINUE);
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
final int numNodes = nodeIds.size();
|
||||
|
@ -523,7 +528,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);
|
||||
|
||||
// Check if we have any requests that do not have a 202-Accepted status code.
|
||||
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();
|
||||
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != HttpURLConnection.HTTP_ACCEPTED).count();
|
||||
|
||||
// If all nodes responded with 202-Accepted, then we can replicate the original request
|
||||
// to all nodes and we are finished.
|
||||
|
@ -535,7 +540,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
|
||||
try {
|
||||
final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
|
||||
cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
|
||||
cancelLockHeaders.put(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader(), "true");
|
||||
final Thread cancelLockThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -554,7 +559,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
// Add a NodeResponse for each node to the Cluster Response
|
||||
// Check that all nodes responded successfully.
|
||||
for (final NodeResponse response : nodeResponses) {
|
||||
if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
|
||||
if (response.getStatus() != HttpURLConnection.HTTP_ACCEPTED) {
|
||||
final Response clientResponse = response.getClientResponse();
|
||||
|
||||
final String message;
|
||||
|
@ -645,7 +650,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", request.getMethod(), uri, requestId, request.getHeaders());
|
||||
|
||||
// invoke the request
|
||||
response = httpClient.replicate(request, uri.toString());
|
||||
response = httpClient.replicate(request, uri);
|
||||
|
||||
final long nanos = System.nanoTime() - startNanos;
|
||||
clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos);
|
||||
|
@ -866,7 +871,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
|
|||
|
||||
try {
|
||||
// create and send the request
|
||||
final String requestId = request.getHeaders().get("x-nifi-request-id");
|
||||
final String requestId = request.getHeaders().get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
|
||||
logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId);
|
||||
|
||||
nodeResponse = replicateRequest(request, nodeId, uri, requestId, clusterResponse);
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.coordination.http.replication.client;
|
||||
|
||||
/**
|
||||
* Enumeration of HTTP headers for preparing replicated requests
|
||||
*/
|
||||
enum PreparedRequestHeader {
|
||||
ACCEPT_ENCODING("accept-encoding"),
|
||||
|
||||
CONTENT_ENCODING("content-encoding"),
|
||||
|
||||
CONTENT_LENGTH("content-length"),
|
||||
|
||||
CONTENT_TYPE("content-type"),
|
||||
|
||||
USER_AGENT("user-agent");
|
||||
|
||||
private final String header;
|
||||
|
||||
PreparedRequestHeader(final String header) {
|
||||
this.header = header;
|
||||
}
|
||||
|
||||
String getHeader() {
|
||||
return header;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,376 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.coordination.http.replication.client;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
|
||||
import jakarta.ws.rs.core.MultivaluedHashMap;
|
||||
import jakarta.ws.rs.core.MultivaluedMap;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
|
||||
import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.io.EntitySerializer;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.io.JacksonResponse;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.io.JsonEntitySerializer;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.io.XmlEntitySerializer;
|
||||
import org.apache.nifi.web.client.api.HttpEntityHeaders;
|
||||
import org.apache.nifi.web.client.api.HttpRequestBodySpec;
|
||||
import org.apache.nifi.web.client.api.HttpRequestMethod;
|
||||
import org.apache.nifi.web.client.api.HttpResponseEntity;
|
||||
import org.apache.nifi.web.client.api.HttpUriBuilder;
|
||||
import org.apache.nifi.web.client.api.WebClientService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.URI;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/**
|
||||
* Standard HTTP Replication Client based on Web Client Service
|
||||
*/
|
||||
public class StandardHttpReplicationClient implements HttpReplicationClient {
|
||||
private static final Set<String> REQUEST_BODY_METHODS = Set.of("PATCH", "POST", "PUT");
|
||||
|
||||
private static final Set<String> DISALLOWED_HEADERS = Set.of("connection", "content-length", "expect", "host", "upgrade");
|
||||
|
||||
private static final char PSEUDO_HEADER_PREFIX = ':';
|
||||
|
||||
private static final String GZIP_ENCODING = "gzip";
|
||||
|
||||
private static final String QUERY_SEPARATOR = "&";
|
||||
|
||||
private static final String QUERY_NAME_VALUE_SEPARATOR = "=";
|
||||
|
||||
private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";
|
||||
|
||||
private static final String APPLICATION_XML_CONTENT_TYPE = "application/xml";
|
||||
|
||||
private static final String USER_AGENT_PRODUCT = "Apache NiFi";
|
||||
|
||||
private static final String USER_AGENT_FORMAT = "%s/%s";
|
||||
|
||||
private static final String USER_AGENT_VERSION = "SNAPSHOT";
|
||||
|
||||
private static final String USER_AGENT;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardHttpReplicationClient.class);
|
||||
|
||||
static {
|
||||
final Package clientPackage = StandardHttpReplicationClient.class.getPackage();
|
||||
final String userAgentVersion;
|
||||
if (clientPackage == null || clientPackage.getImplementationVersion() == null) {
|
||||
userAgentVersion = USER_AGENT_VERSION;
|
||||
} else {
|
||||
// Set User Agent Version from JAR MANIFEST.MF Version when found
|
||||
userAgentVersion = clientPackage.getImplementationVersion();
|
||||
}
|
||||
USER_AGENT = USER_AGENT_FORMAT.formatted(USER_AGENT_PRODUCT, userAgentVersion);
|
||||
}
|
||||
|
||||
private final WebClientService webClientService;
|
||||
|
||||
private final Supplier<HttpUriBuilder> httpUriBuilderSupplier;
|
||||
|
||||
private final EntitySerializer jsonSerializer;
|
||||
|
||||
private final EntitySerializer xmlSerializer;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
public StandardHttpReplicationClient(final WebClientService webClientService, final Supplier<HttpUriBuilder> httpUriBuilderSupplier) {
|
||||
this.webClientService = Objects.requireNonNull(webClientService, "Web Client Service required");
|
||||
this.httpUriBuilderSupplier = Objects.requireNonNull(httpUriBuilderSupplier, "HTTP URI Builder supplier required");
|
||||
|
||||
objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS));
|
||||
objectMapper.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(objectMapper.getTypeFactory()));
|
||||
|
||||
jsonSerializer = new JsonEntitySerializer(objectMapper);
|
||||
xmlSerializer = new XmlEntitySerializer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare Request for Replication with serialized Request Entity
|
||||
*
|
||||
* @param method HTTP Method
|
||||
* @param headers HTTP Request Headers
|
||||
* @param requestEntity Request Entity to be serialized
|
||||
* @return Prepared Request for replication
|
||||
*/
|
||||
@Override
|
||||
public PreparedRequest prepareRequest(final String method, final Map<String, String> headers, final Object requestEntity) {
|
||||
final Map<String, String> preparedHeaders = getPreparedHeaders(headers, method);
|
||||
final byte[] requestBody = getRequestBody(requestEntity, preparedHeaders);
|
||||
return new StandardPreparedRequest(method, preparedHeaders, requestEntity, requestBody);
|
||||
}
|
||||
|
||||
/**
|
||||
* Replicate Prepared HTTP Request to destination URI
|
||||
*
|
||||
* @param request Prepared HTTP Request for replication
|
||||
* @param uri Destination URI for sending the request
|
||||
* @return Jakarta REST Response
|
||||
* @throws IOException Thrown on communication failures sending requests or retrieving responses
|
||||
*/
|
||||
@Override
|
||||
public Response replicate(final PreparedRequest request, final URI uri) throws IOException {
|
||||
if (request instanceof StandardPreparedRequest preparedRequest) {
|
||||
return replicate(preparedRequest, uri);
|
||||
} else {
|
||||
throw new IllegalArgumentException("HTTP Prepared Request not provided");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> getPreparedHeaders(final Map<String, String> headers, final String method) {
|
||||
final Map<String, String> preparedHeaders = new LinkedHashMap<>();
|
||||
|
||||
for (final Map.Entry<String, String> header : headers.entrySet()) {
|
||||
final String headerName = header.getKey().toLowerCase();
|
||||
if (PreparedRequestHeader.ACCEPT_ENCODING.getHeader().equals(headerName)) {
|
||||
// Remove Accept-Encoding from original client request in favor of specific value for replication
|
||||
continue;
|
||||
}
|
||||
|
||||
final String headerValue = header.getValue();
|
||||
preparedHeaders.put(headerName, headerValue);
|
||||
}
|
||||
|
||||
// Set Accept-Encoding to request gzip encoded responses
|
||||
preparedHeaders.put(PreparedRequestHeader.ACCEPT_ENCODING.getHeader(), GZIP_ENCODING);
|
||||
|
||||
processContentType(method, preparedHeaders);
|
||||
processUserAgent(preparedHeaders);
|
||||
return preparedHeaders;
|
||||
}
|
||||
|
||||
private Response replicate(final StandardPreparedRequest preparedRequest, final URI location) throws IOException {
|
||||
final HttpRequestMethod requestMethod = getRequestMethod(preparedRequest);
|
||||
final URI requestUri = getRequestUri(preparedRequest, location);
|
||||
|
||||
final HttpRequestBodySpec httpRequestBodySpec = webClientService.method(requestMethod).uri(requestUri);
|
||||
|
||||
final Map<String, String> requestHeaders = preparedRequest.headers();
|
||||
for (final Map.Entry<String, String> requestHeader : requestHeaders.entrySet()) {
|
||||
final String headerName = requestHeader.getKey();
|
||||
final String headerNameLowerCased = headerName.toLowerCase();
|
||||
if (!DISALLOWED_HEADERS.contains(headerNameLowerCased)) {
|
||||
httpRequestBodySpec.header(headerName, requestHeader.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
if (REQUEST_BODY_METHODS.contains(requestMethod.getMethod())) {
|
||||
final byte[] requestBody = preparedRequest.requestBody();
|
||||
final ByteArrayInputStream body = new ByteArrayInputStream(requestBody);
|
||||
final OptionalLong contentLength = OptionalLong.of(requestBody.length);
|
||||
httpRequestBodySpec.body(body, contentLength);
|
||||
}
|
||||
|
||||
return replicate(httpRequestBodySpec, preparedRequest.method(), location);
|
||||
}
|
||||
|
||||
private Response replicate(final HttpRequestBodySpec httpRequestBodySpec, final String method, final URI location) throws IOException {
|
||||
final long started = System.currentTimeMillis();
|
||||
|
||||
try (HttpResponseEntity responseEntity = httpRequestBodySpec.retrieve()) {
|
||||
final int statusCode = responseEntity.statusCode();
|
||||
final HttpEntityHeaders headers = responseEntity.headers();
|
||||
final MultivaluedMap<String, String> responseHeaders = getResponseHeaders(headers);
|
||||
final byte[] responseBody = getResponseBody(responseEntity.body(), headers);
|
||||
|
||||
final long elapsed = System.currentTimeMillis() - started;
|
||||
logger.debug("Replicated {} {} HTTP {} in {} ms", method, location, statusCode, elapsed);
|
||||
|
||||
return new JacksonResponse(objectMapper, responseBody, responseHeaders, location, statusCode, null);
|
||||
}
|
||||
}
|
||||
|
||||
private URI getRequestUri(final StandardPreparedRequest preparedRequest, final URI location) {
|
||||
final HttpUriBuilder httpUriBuilder = httpUriBuilderSupplier.get();
|
||||
|
||||
httpUriBuilder.scheme(location.getScheme());
|
||||
httpUriBuilder.host(location.getHost());
|
||||
httpUriBuilder.port(location.getPort());
|
||||
httpUriBuilder.encodedPath(location.getPath());
|
||||
|
||||
final String query = location.getQuery();
|
||||
if (query != null) {
|
||||
final String[] parameters = query.split(QUERY_SEPARATOR);
|
||||
for (final String parameter : parameters) {
|
||||
final String[] parameterNameValue = parameter.split(QUERY_NAME_VALUE_SEPARATOR);
|
||||
if (parameterNameValue.length == 1) {
|
||||
final String parameterName = parameterNameValue[0];
|
||||
httpUriBuilder.addQueryParameter(parameterName, null);
|
||||
} else if (parameterNameValue.length == 2) {
|
||||
final String parameterName = parameterNameValue[0];
|
||||
final String parameterValue = parameterNameValue[1];
|
||||
httpUriBuilder.addQueryParameter(parameterName, parameterValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Object requestEntity = preparedRequest.entity();
|
||||
if (requestEntity instanceof MultivaluedMap<?, ?> parameterEntity) {
|
||||
for (final Object key : parameterEntity.keySet()) {
|
||||
final String parameterName = key.toString();
|
||||
final Object parameterValues = parameterEntity.get(parameterName);
|
||||
if (parameterValues instanceof List<?> values) {
|
||||
for (final Object value : values) {
|
||||
httpUriBuilder.addQueryParameter(parameterName, value.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return httpUriBuilder.build();
|
||||
}
|
||||
|
||||
private HttpRequestMethod getRequestMethod(final PreparedRequest preparedRequest) {
|
||||
final String method = preparedRequest.getMethod();
|
||||
return new HttpRequestMethod() {
|
||||
@Override
|
||||
public String getMethod() {
|
||||
return method;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return method;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private MultivaluedMap<String, String> getResponseHeaders(final HttpEntityHeaders responseHeaders) {
|
||||
final MultivaluedMap<String, String> headers = new MultivaluedHashMap<>();
|
||||
for (final String name : responseHeaders.getHeaderNames()) {
|
||||
// Remove pseudo-headers returned from HTTP/2 responses
|
||||
if (name.charAt(0) == PSEUDO_HEADER_PREFIX) {
|
||||
continue;
|
||||
}
|
||||
// Remove Content-Encoding Response Header to align with gzip decoding of Response Body
|
||||
if (PreparedRequestHeader.CONTENT_ENCODING.getHeader().equalsIgnoreCase(name)) {
|
||||
continue;
|
||||
}
|
||||
// Remove Content-Length Response Header to align with gzip decoding of Response Body
|
||||
if (PreparedRequestHeader.CONTENT_LENGTH.getHeader().equalsIgnoreCase(name)) {
|
||||
continue;
|
||||
}
|
||||
final List<String> values = responseHeaders.getHeader(name);
|
||||
headers.addAll(name, values);
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
private byte[] getResponseBody(final InputStream inputStream, final HttpEntityHeaders responseHeaders) throws IOException {
|
||||
final boolean gzipEncoded = isGzipEncoded(responseHeaders);
|
||||
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
try (InputStream responseBodyStream = gzipEncoded ? new GZIPInputStream(inputStream) : inputStream) {
|
||||
responseBodyStream.transferTo(outputStream);
|
||||
}
|
||||
return outputStream.toByteArray();
|
||||
}
|
||||
|
||||
private byte[] getRequestBody(final Object requestEntity, final Map<String, String> headers) {
|
||||
final Optional<String> contentTypeFound = getContentType(headers);
|
||||
final String contentType = contentTypeFound.orElse(APPLICATION_JSON_CONTENT_TYPE);
|
||||
final EntitySerializer serializer = getSerializer(contentType);
|
||||
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
try {
|
||||
serializer.serialize(requestEntity, outputStream);
|
||||
} catch (final IOException e) {
|
||||
throw new UncheckedIOException("Request Entity serialization failed", e);
|
||||
}
|
||||
|
||||
return outputStream.toByteArray();
|
||||
}
|
||||
|
||||
private void processContentType(final String method, final Map<String, String> headers) {
|
||||
if (REQUEST_BODY_METHODS.contains(method)) {
|
||||
final Optional<String> contentTypeHeaderFound = getHeaderName(headers, PreparedRequestHeader.CONTENT_TYPE);
|
||||
if (contentTypeHeaderFound.isEmpty()) {
|
||||
// Set default Content-Type to JSON
|
||||
headers.put(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON_CONTENT_TYPE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processUserAgent(final Map<String, String> headers) {
|
||||
final Optional<String> userAgentHeaderFound = getHeaderName(headers, PreparedRequestHeader.USER_AGENT);
|
||||
final String userAgentHeader = userAgentHeaderFound.orElseGet(PreparedRequestHeader.USER_AGENT::getHeader);
|
||||
headers.put(userAgentHeader, USER_AGENT);
|
||||
}
|
||||
|
||||
private EntitySerializer getSerializer(final String contentType) {
|
||||
final EntitySerializer serializer;
|
||||
|
||||
if (APPLICATION_XML_CONTENT_TYPE.equalsIgnoreCase(contentType)) {
|
||||
serializer = xmlSerializer;
|
||||
} else {
|
||||
serializer = jsonSerializer;
|
||||
}
|
||||
|
||||
return serializer;
|
||||
}
|
||||
|
||||
private boolean isGzipEncoded(final HttpEntityHeaders headers) {
|
||||
final Optional<String> contentEncodingFound = headers.getHeaderNames()
|
||||
.stream()
|
||||
.filter(PreparedRequestHeader.CONTENT_ENCODING.getHeader()::equalsIgnoreCase)
|
||||
.map(headers::getFirstHeader)
|
||||
.filter(Optional::isPresent)
|
||||
.map(Optional::get)
|
||||
.findFirst();
|
||||
|
||||
return contentEncodingFound.map(GZIP_ENCODING::equalsIgnoreCase).orElse(false);
|
||||
}
|
||||
|
||||
private Optional<String> getContentType(final Map<String, String> headers) {
|
||||
final Optional<String> headerNameFound = getHeaderName(headers, PreparedRequestHeader.CONTENT_TYPE);
|
||||
|
||||
final String header;
|
||||
if (headerNameFound.isPresent()) {
|
||||
final String name = headerNameFound.get();
|
||||
header = headers.get(name);
|
||||
} else {
|
||||
header = null;
|
||||
}
|
||||
|
||||
return Optional.ofNullable(header);
|
||||
}
|
||||
|
||||
private Optional<String> getHeaderName(final Map<String, String> headers, final PreparedRequestHeader httpHeader) {
|
||||
return headers.keySet()
|
||||
.stream()
|
||||
.filter(httpHeader.getHeader()::equalsIgnoreCase)
|
||||
.findFirst();
|
||||
}
|
||||
}
|
|
@ -15,26 +15,21 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
|
||||
import java.util.Map;
|
||||
package org.apache.nifi.cluster.coordination.http.replication.client;
|
||||
|
||||
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
|
||||
|
||||
import okhttp3.RequestBody;
|
||||
import java.util.Map;
|
||||
|
||||
public class OkHttpPreparedRequest implements PreparedRequest {
|
||||
private final String method;
|
||||
private final Map<String, String> headers;
|
||||
private final Object entity;
|
||||
private final RequestBody requestBody;
|
||||
|
||||
public OkHttpPreparedRequest(final String method, final Map<String, String> headers, final Object entity, final RequestBody requestBody) {
|
||||
this.method = method;
|
||||
this.headers = headers;
|
||||
this.entity = entity;
|
||||
this.requestBody = requestBody;
|
||||
}
|
||||
/**
|
||||
* Standard record implementation of Request prepared for Replication
|
||||
*
|
||||
* @param method HTTP Method
|
||||
* @param headers Map of HTTP Request Headers
|
||||
* @param entity HTTP Request Entity
|
||||
* @param requestBody Serialized Request Body
|
||||
*/
|
||||
record StandardPreparedRequest(String method, Map<String, String> headers, Object entity, byte[] requestBody) implements PreparedRequest {
|
||||
|
||||
@Override
|
||||
public String getMethod() {
|
||||
|
@ -50,13 +45,4 @@ public class OkHttpPreparedRequest implements PreparedRequest {
|
|||
public Object getEntity() {
|
||||
return entity;
|
||||
}
|
||||
|
||||
public RequestBody getRequestBody() {
|
||||
return requestBody;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OkHttpPreparedRequest[method=" + method + ", headers=" + headers + "]";
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
package org.apache.nifi.cluster.coordination.http.replication.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
package org.apache.nifi.cluster.coordination.http.replication.io;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
package org.apache.nifi.cluster.coordination.http.replication.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
package org.apache.nifi.cluster.coordination.http.replication.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
|
@ -1,161 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
|
||||
import okhttp3.Call;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.text.NumberFormat;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class CallEventListener {
|
||||
private final Call call;
|
||||
private final Map<String, Timing> dnsTimings = new HashMap<>();
|
||||
private final Map<String, Timing> establishConnectionTiming = new HashMap<>();
|
||||
private long callStart;
|
||||
private long callEnd;
|
||||
private long responseBodyStart;
|
||||
private long responseBodyEnd;
|
||||
private long responseHeaderStart;
|
||||
private long responseHeaderEnd;
|
||||
private long requestHeaderStart;
|
||||
private long requestHeaderEnd;
|
||||
private long requestBodyStart;
|
||||
private long requestBodyEnd;
|
||||
private long secureConnectStart;
|
||||
private long secureConnectEnd;
|
||||
|
||||
|
||||
public CallEventListener(final Call call) {
|
||||
this.call = call;
|
||||
}
|
||||
|
||||
public void callStart() {
|
||||
callStart = System.nanoTime();
|
||||
}
|
||||
|
||||
public void callEnd() {
|
||||
callEnd = System.nanoTime();
|
||||
}
|
||||
|
||||
public void dnsStart(final String domainName) {
|
||||
dnsTimings.computeIfAbsent(domainName, k -> new Timing(domainName)).start();
|
||||
}
|
||||
|
||||
public void dnsEnd(final String domainName) {
|
||||
dnsTimings.computeIfAbsent(domainName, k -> new Timing(domainName)).end();
|
||||
}
|
||||
|
||||
public void responseBodyStart() {
|
||||
responseBodyStart = System.nanoTime();
|
||||
}
|
||||
|
||||
public void responseBodyEnd() {
|
||||
responseBodyEnd = System.nanoTime();
|
||||
}
|
||||
|
||||
public void responseHeaderStart() {
|
||||
responseHeaderStart = System.nanoTime();
|
||||
}
|
||||
|
||||
public void responseHeaderEnd() {
|
||||
responseHeaderEnd = System.nanoTime();
|
||||
}
|
||||
|
||||
public void requestHeaderStart() {
|
||||
requestHeaderStart = System.nanoTime();
|
||||
}
|
||||
|
||||
public void requestHeaderEnd() {
|
||||
requestHeaderEnd = System.nanoTime();
|
||||
}
|
||||
|
||||
public void requestBodyStart() {
|
||||
requestBodyStart = System.nanoTime();
|
||||
}
|
||||
|
||||
public void requestBodyEnd() {
|
||||
requestBodyEnd = System.nanoTime();
|
||||
}
|
||||
|
||||
public void connectStart(final SocketAddress address) {
|
||||
establishConnectionTiming.computeIfAbsent(address.toString(), Timing::new).start();
|
||||
}
|
||||
|
||||
public void connectionAcquired(final SocketAddress address) {
|
||||
establishConnectionTiming.computeIfAbsent(address.toString(), Timing::new).end();
|
||||
}
|
||||
|
||||
public void secureConnectStart() {
|
||||
secureConnectStart = System.nanoTime();
|
||||
}
|
||||
|
||||
public void secureConnectEnd() {
|
||||
secureConnectEnd = System.nanoTime();
|
||||
}
|
||||
|
||||
public Call getCall() {
|
||||
return call;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final NumberFormat numberFormat = NumberFormat.getInstance();
|
||||
|
||||
return "CallEventListener{" +
|
||||
"url=" + call.request().url() +
|
||||
", dnsTimings=" + dnsTimings.values() +
|
||||
", establishConnectionTiming=" + establishConnectionTiming.values() +
|
||||
", tlsInitialization=" + numberFormat.format(secureConnectEnd - secureConnectStart) + " nanos" +
|
||||
", writeRequestHeaders=" + numberFormat.format(requestHeaderEnd - requestHeaderStart) + " nanos" +
|
||||
", writeRequestBody=" + numberFormat.format(requestBodyEnd - requestBodyStart) + " nanos" +
|
||||
", readResponseHeaders=" + numberFormat.format(responseHeaderEnd - responseHeaderStart) + " nanos" +
|
||||
", readResponseBody=" + numberFormat.format(responseBodyEnd - responseBodyStart) + " nanos" +
|
||||
", callTime=" + numberFormat.format(callEnd - callStart) + " nanos" +
|
||||
'}';
|
||||
}
|
||||
|
||||
private static class Timing {
|
||||
private final String address;
|
||||
private long start;
|
||||
private long nanos;
|
||||
|
||||
public Timing(final String address) {
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
public String getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
start = System.nanoTime();
|
||||
}
|
||||
|
||||
public void end() {
|
||||
if (start > 0) {
|
||||
nanos += (System.nanoTime() - start);
|
||||
}
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "{address=" + address + ", nanos=" + NumberFormat.getInstance().format(nanos) + "}";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,327 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Value;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
import jakarta.ws.rs.HttpMethod;
|
||||
import jakarta.ws.rs.core.MultivaluedHashMap;
|
||||
import jakarta.ws.rs.core.MultivaluedMap;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import okhttp3.Call;
|
||||
import okhttp3.ConnectionPool;
|
||||
import okhttp3.Headers;
|
||||
import okhttp3.HttpUrl;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.RequestBody;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
|
||||
import org.apache.nifi.remote.protocol.http.HttpHeaders;
|
||||
import org.apache.nifi.stream.io.GZIPOutputStream;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.util.StreamUtils;
|
||||
|
||||
public class OkHttpReplicationClient implements HttpReplicationClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(OkHttpReplicationClient.class);
|
||||
private static final Set<String> gzipEncodings = Stream.of("gzip", "x-gzip").collect(Collectors.toSet());
|
||||
|
||||
private final EntitySerializer jsonSerializer;
|
||||
private final EntitySerializer xmlSerializer;
|
||||
|
||||
private final ObjectMapper jsonCodec = new ObjectMapper();
|
||||
private final OkHttpClient okHttpClient;
|
||||
private final SSLContext sslContext;
|
||||
private final X509TrustManager trustManager;
|
||||
|
||||
public OkHttpReplicationClient(
|
||||
final NiFiProperties properties,
|
||||
final SSLContext sslContext,
|
||||
final X509TrustManager trustManager
|
||||
) {
|
||||
jsonCodec.setDefaultPropertyInclusion(Value.construct(Include.NON_NULL, Include.ALWAYS));
|
||||
jsonCodec.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(jsonCodec.getTypeFactory()));
|
||||
|
||||
jsonSerializer = new JsonEntitySerializer(jsonCodec);
|
||||
xmlSerializer = new XmlEntitySerializer();
|
||||
|
||||
this.sslContext = sslContext;
|
||||
this.trustManager = trustManager;
|
||||
okHttpClient = createOkHttpClient(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PreparedRequest prepareRequest(final String method, final Map<String, String> headers, final Object entity) {
|
||||
final boolean gzip = isUseGzip(headers);
|
||||
checkContentLengthHeader(method, headers);
|
||||
final RequestBody requestBody = createRequestBody(headers, entity, gzip);
|
||||
|
||||
final Map<String, String> updatedHeaders = gzip ? updateHeadersForGzip(headers) : headers;
|
||||
return new OkHttpPreparedRequest(method, updatedHeaders, entity, requestBody);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks the content length header on DELETE requests to ensure it is set to '0', avoiding request timeouts on replicated requests.
|
||||
*
|
||||
* @param method the HTTP method of the request
|
||||
* @param headers the header keys and values
|
||||
*/
|
||||
private void checkContentLengthHeader(String method, Map<String, String> headers) {
|
||||
// Only applies to DELETE requests
|
||||
if (HttpMethod.DELETE.equalsIgnoreCase(method)) {
|
||||
// Find the Content-Length header if present
|
||||
final String CONTENT_LENGTH_HEADER_KEY = "Content-Length";
|
||||
Map.Entry<String, String> contentLengthEntry = headers.entrySet().stream().filter(entry -> entry.getKey().equalsIgnoreCase(CONTENT_LENGTH_HEADER_KEY)).findFirst().orElse(null);
|
||||
// If no CL header, do nothing
|
||||
if (contentLengthEntry != null) {
|
||||
// If the provided CL value is non-zero, override it
|
||||
if (contentLengthEntry.getValue() != null && !contentLengthEntry.getValue().equalsIgnoreCase("0")) {
|
||||
logger.warn("This is a DELETE request; the provided Content-Length was {}; setting Content-Length to 0", contentLengthEntry.getValue());
|
||||
headers.put(CONTENT_LENGTH_HEADER_KEY, "0");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response replicate(final PreparedRequest request, final String uri) throws IOException {
|
||||
if (!(Objects.requireNonNull(request) instanceof OkHttpPreparedRequest)) {
|
||||
throw new IllegalArgumentException("Replication Client is only able to replicate requests that the client itself has prepared");
|
||||
}
|
||||
|
||||
return replicate((OkHttpPreparedRequest) request, uri);
|
||||
}
|
||||
|
||||
private Response replicate(final OkHttpPreparedRequest request, final String uri) throws IOException {
|
||||
logger.debug("Replicating request {} to {}", request, uri);
|
||||
final Call call = createCall(request, uri);
|
||||
final okhttp3.Response callResponse = call.execute();
|
||||
|
||||
final byte[] responseBytes = getResponseBytes(callResponse);
|
||||
final MultivaluedMap<String, String> responseHeaders = getHeaders(callResponse);
|
||||
logger.debug("Received response code {} with headers {} for request {} to {}", callResponse.code(), responseHeaders, request, uri);
|
||||
|
||||
final Response response = new JacksonResponse(jsonCodec, responseBytes, responseHeaders, URI.create(uri), callResponse.code(), callResponse::close);
|
||||
return response;
|
||||
}
|
||||
|
||||
private MultivaluedMap<String, String> getHeaders(final okhttp3.Response callResponse) {
|
||||
final Headers headers = callResponse.headers();
|
||||
final MultivaluedMap<String, String> headerMap = new MultivaluedHashMap<>();
|
||||
for (final String name : headers.names()) {
|
||||
final List<String> values = headers.values(name);
|
||||
headerMap.addAll(name, values);
|
||||
}
|
||||
|
||||
return headerMap;
|
||||
}
|
||||
|
||||
private byte[] getResponseBytes(final okhttp3.Response callResponse) throws IOException {
|
||||
final byte[] rawBytes = callResponse.body().bytes();
|
||||
|
||||
final String contentEncoding = callResponse.header("Content-Encoding");
|
||||
if (gzipEncodings.contains(contentEncoding)) {
|
||||
try (final InputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(rawBytes));
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||
|
||||
StreamUtils.copy(gzipIn, baos);
|
||||
return baos.toByteArray();
|
||||
}
|
||||
} else {
|
||||
return rawBytes;
|
||||
}
|
||||
}
|
||||
|
||||
private Call createCall(final OkHttpPreparedRequest request, final String uri) {
|
||||
Request.Builder requestBuilder = new Request.Builder();
|
||||
|
||||
final HttpUrl url = buildUrl(request, uri);
|
||||
requestBuilder = requestBuilder.url(url);
|
||||
|
||||
// build the request body
|
||||
final String method = request.getMethod().toUpperCase();
|
||||
switch (method) {
|
||||
case "POST":
|
||||
case "PUT":
|
||||
case "PATCH":
|
||||
requestBuilder = requestBuilder.method(method, request.getRequestBody());
|
||||
break;
|
||||
default:
|
||||
requestBuilder = requestBuilder.method(method, null);
|
||||
break;
|
||||
}
|
||||
|
||||
// Add appropriate headers
|
||||
for (final Map.Entry<String, String> header : request.getHeaders().entrySet()) {
|
||||
requestBuilder = requestBuilder.addHeader(header.getKey(), header.getValue());
|
||||
}
|
||||
|
||||
// Build the request
|
||||
final Request okHttpRequest = requestBuilder.build();
|
||||
final Call call = okHttpClient.newCall(okHttpRequest);
|
||||
return call;
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private HttpUrl buildUrl(final OkHttpPreparedRequest request, final String uri) {
|
||||
HttpUrl.Builder urlBuilder = HttpUrl.parse(uri).newBuilder();
|
||||
switch (request.getMethod().toUpperCase()) {
|
||||
case HttpMethod.DELETE:
|
||||
case HttpMethod.HEAD:
|
||||
case HttpMethod.GET:
|
||||
case HttpMethod.OPTIONS:
|
||||
if (request.getEntity() instanceof MultivaluedMap) {
|
||||
final MultivaluedMap<String, String> entityMap = (MultivaluedMap<String, String>) request.getEntity();
|
||||
|
||||
for (final Entry<String, List<String>> queryEntry : entityMap.entrySet()) {
|
||||
final String queryName = queryEntry.getKey();
|
||||
for (final String queryValue : queryEntry.getValue()) {
|
||||
urlBuilder = urlBuilder.addQueryParameter(queryName, queryValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return urlBuilder.build();
|
||||
}
|
||||
|
||||
private RequestBody createRequestBody(final Map<String, String> headers, final Object entity, final boolean gzip) {
|
||||
final String contentType = getContentType(headers, "application/json");
|
||||
final byte[] serialized = serializeEntity(entity, contentType, gzip);
|
||||
|
||||
final MediaType mediaType = MediaType.parse(contentType);
|
||||
return RequestBody.create(serialized, mediaType);
|
||||
}
|
||||
|
||||
private String getContentType(final Map<String, String> headers, final String defaultValue) {
|
||||
for (final Map.Entry<String, String> entry : headers.entrySet()) {
|
||||
if (entry.getKey().equalsIgnoreCase("content-type")) {
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
private byte[] serializeEntity(final Object entity, final String contentType, final boolean gzip) {
|
||||
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
final OutputStream out = gzip ? new GZIPOutputStream(baos, 1) : baos) {
|
||||
|
||||
getSerializer(contentType).serialize(entity, out);
|
||||
out.close();
|
||||
|
||||
return baos.toByteArray();
|
||||
} catch (final IOException e) {
|
||||
// This should never happen with a ByteArrayOutputStream
|
||||
throw new RuntimeException("Failed to serialize entity for cluster replication", e);
|
||||
}
|
||||
}
|
||||
|
||||
private EntitySerializer getSerializer(final String contentType) {
|
||||
switch (contentType.toLowerCase()) {
|
||||
case "application/xml":
|
||||
return xmlSerializer;
|
||||
case "application/json":
|
||||
default:
|
||||
return jsonSerializer;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Map<String, String> updateHeadersForGzip(final Map<String, String> headers) {
|
||||
final String encodingHeader = headers.get("Content-Encoding");
|
||||
if (gzipEncodings.contains(encodingHeader)) {
|
||||
return headers;
|
||||
}
|
||||
|
||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||
updatedHeaders.put("Content-Encoding", "gzip");
|
||||
return updatedHeaders;
|
||||
}
|
||||
|
||||
|
||||
private boolean isUseGzip(final Map<String, String> headers) {
|
||||
String rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING);
|
||||
|
||||
if (rawAcceptEncoding == null) {
|
||||
rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING.toLowerCase());
|
||||
}
|
||||
|
||||
if (rawAcceptEncoding == null) {
|
||||
return false;
|
||||
} else {
|
||||
final String[] acceptEncodingTokens = rawAcceptEncoding.split(",");
|
||||
return Stream.of(acceptEncodingTokens)
|
||||
.map(String::trim)
|
||||
.filter(StringUtils::isNotEmpty)
|
||||
.map(String::toLowerCase)
|
||||
.anyMatch(gzipEncodings::contains);
|
||||
}
|
||||
}
|
||||
|
||||
private OkHttpClient createOkHttpClient(final NiFiProperties properties) {
|
||||
final String connectionTimeout = properties.getClusterNodeConnectionTimeout();
|
||||
final long connectionTimeoutMs = (long) FormatUtils.getPreciseTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
|
||||
final String readTimeout = properties.getClusterNodeReadTimeout();
|
||||
final long readTimeoutMs = (long) FormatUtils.getPreciseTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
|
||||
|
||||
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
|
||||
okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
okHttpClientBuilder.followRedirects(true);
|
||||
final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests();
|
||||
okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
|
||||
okHttpClientBuilder.eventListener(new RequestReplicationEventListener());
|
||||
|
||||
if (sslContext != null) {
|
||||
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
|
||||
okHttpClientBuilder.sslSocketFactory(sslSocketFactory, trustManager);
|
||||
}
|
||||
|
||||
return okHttpClientBuilder.build();
|
||||
}
|
||||
}
|
|
@ -1,174 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
|
||||
import okhttp3.Call;
|
||||
import okhttp3.Connection;
|
||||
import okhttp3.EventListener;
|
||||
import okhttp3.Handshake;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Proxy;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class RequestReplicationEventListener extends EventListener {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RequestReplicationEventListener.class);
|
||||
|
||||
private final ConcurrentMap<Call, CallEventListener> eventListeners = new ConcurrentHashMap<>();
|
||||
|
||||
private CallEventListener getListener(final Call call) {
|
||||
return eventListeners.computeIfAbsent(call, CallEventListener::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dnsStart(@NotNull final Call call, @NotNull final String domainName) {
|
||||
super.dnsStart(call, domainName);
|
||||
getListener(call).dnsStart(domainName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dnsEnd(@NotNull final Call call, @NotNull final String domainName, @NotNull final List<InetAddress> inetAddressList) {
|
||||
super.dnsEnd(call, domainName, inetAddressList);
|
||||
getListener(call).dnsEnd(domainName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callStart(@NotNull final Call call) {
|
||||
super.callStart(call);
|
||||
getListener(call).callStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callEnd(@NotNull final Call call) {
|
||||
super.callEnd(call);
|
||||
final CallEventListener callListener = getListener(call);
|
||||
callListener.callEnd();
|
||||
|
||||
logTimingInfo(callListener);
|
||||
eventListeners.remove(call);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callFailed(@NotNull final Call call, @NotNull final IOException ioe) {
|
||||
super.callFailed(call, ioe);
|
||||
|
||||
final CallEventListener callListener = getListener(call);
|
||||
callListener.callEnd();
|
||||
|
||||
logTimingInfo(callListener);
|
||||
eventListeners.remove(call);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void responseBodyStart(@NotNull final Call call) {
|
||||
super.responseBodyStart(call);
|
||||
getListener(call).responseBodyStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void responseBodyEnd(@NotNull final Call call, final long byteCount) {
|
||||
super.responseBodyEnd(call, byteCount);
|
||||
getListener(call).responseBodyEnd();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void responseFailed(@NotNull final Call call, @NotNull final IOException ioe) {
|
||||
super.responseFailed(call, ioe);
|
||||
getListener(call).responseBodyEnd();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void responseHeadersStart(@NotNull final Call call) {
|
||||
super.responseHeadersStart(call);
|
||||
getListener(call).responseHeaderStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void responseHeadersEnd(@NotNull final Call call, @NotNull final Response response) {
|
||||
super.responseHeadersEnd(call, response);
|
||||
getListener(call).responseHeaderEnd();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestHeadersStart(@NotNull final Call call) {
|
||||
super.requestHeadersStart(call);
|
||||
getListener(call).requestHeaderStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestHeadersEnd(@NotNull final Call call, @NotNull final Request request) {
|
||||
super.requestHeadersEnd(call, request);
|
||||
getListener(call).requestHeaderEnd();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestBodyStart(@NotNull final Call call) {
|
||||
super.requestBodyStart(call);
|
||||
getListener(call).requestBodyStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestBodyEnd(@NotNull final Call call, final long byteCount) {
|
||||
super.requestBodyEnd(call, byteCount);
|
||||
getListener(call).requestBodyEnd();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestFailed(@NotNull final Call call, @NotNull final IOException ioe) {
|
||||
super.requestFailed(call, ioe);
|
||||
getListener(call).requestBodyEnd();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectStart(@NotNull final Call call, @NotNull final InetSocketAddress inetSocketAddress, @NotNull final Proxy proxy) {
|
||||
super.connectStart(call, inetSocketAddress, proxy);
|
||||
getListener(call).connectStart(inetSocketAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionAcquired(@NotNull final Call call, @NotNull final Connection connection) {
|
||||
super.connectionAcquired(call, connection);
|
||||
getListener(call).connectionAcquired(connection.socket().getRemoteSocketAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void secureConnectStart(@NotNull final Call call) {
|
||||
super.secureConnectStart(call);
|
||||
getListener(call).secureConnectStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void secureConnectEnd(@NotNull final Call call, @Nullable final Handshake handshake) {
|
||||
super.secureConnectEnd(call, handshake);
|
||||
getListener(call).secureConnectEnd();
|
||||
}
|
||||
|
||||
private void logTimingInfo(final CallEventListener eventListener) {
|
||||
logger.debug("Timing information {}", eventListener);
|
||||
}
|
||||
}
|
|
@ -245,13 +245,6 @@ public class NodeResponse {
|
|||
* the content-length. Let the outgoing response builder determine it.
|
||||
*/
|
||||
continue;
|
||||
} else if (key.equals("X-ClusterContext")) {
|
||||
/*
|
||||
* do not copy the cluster context to the response because
|
||||
* this information is private and should not be sent to
|
||||
* the client
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
responseBuilder.header(key, value);
|
||||
|
|
|
@ -19,15 +19,17 @@ package org.apache.nifi.framework.configuration;
|
|||
import org.apache.nifi.cluster.ClusterDetailsFactory;
|
||||
import org.apache.nifi.cluster.StandardClusterDetailsFactory;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.StandardUploadRequestReplicator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.UploadRequestReplicator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.okhttp.OkHttpReplicationClient;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.client.StandardHttpReplicationClient;
|
||||
import org.apache.nifi.cluster.lifecycle.ClusterDecommissionTask;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.web.client.StandardHttpUriBuilder;
|
||||
import org.apache.nifi.web.client.api.WebClientService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
@ -87,7 +89,7 @@ public class FrameworkClusterConfiguration {
|
|||
if (clusterCoordinator == null) {
|
||||
replicator = null;
|
||||
} else {
|
||||
final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(properties, sslContext, trustManager);
|
||||
final HttpReplicationClient replicationClient = new StandardHttpReplicationClient(webClientService, StandardHttpUriBuilder::new);
|
||||
|
||||
replicator = new ThreadPoolRequestReplicator(
|
||||
properties.getClusterNodeProtocolMaxPoolSize(),
|
||||
|
|
|
@ -72,6 +72,8 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
public class TestThreadPoolRequestReplicator {
|
||||
|
||||
private static final String NODE_CONTINUE = "202-Accepted";
|
||||
|
||||
@BeforeAll
|
||||
public static void setupClass() {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
|
||||
|
@ -278,11 +280,11 @@ public class TestThreadPoolRequestReplicator {
|
|||
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
|
||||
final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
|
||||
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
|
||||
final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
|
||||
final Object expectsHeader = request.getHeaders().get(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
|
||||
|
||||
final int statusCode;
|
||||
if (requestCount.incrementAndGet() == 1) {
|
||||
assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
|
||||
assertEquals(NODE_CONTINUE, expectsHeader);
|
||||
statusCode = Status.ACCEPTED.getStatusCode();
|
||||
} else {
|
||||
assertNull(expectsHeader);
|
||||
|
@ -343,10 +345,10 @@ public class TestThreadPoolRequestReplicator {
|
|||
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
|
||||
final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
|
||||
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
|
||||
final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
|
||||
final Object expectsHeader = request.getHeaders().get(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
|
||||
|
||||
final int requestIndex = requestCount.incrementAndGet();
|
||||
assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
|
||||
assertEquals(NODE_CONTINUE, expectsHeader);
|
||||
|
||||
if (requestIndex == 1) {
|
||||
final Response clientResponse = mock(Response.class);
|
||||
|
|
|
@ -0,0 +1,258 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cluster.coordination.http.replication.client;
|
||||
|
||||
import jakarta.ws.rs.core.MultivaluedHashMap;
|
||||
import jakarta.ws.rs.core.MultivaluedMap;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
|
||||
import org.apache.nifi.web.client.StandardHttpUriBuilder;
|
||||
import org.apache.nifi.web.client.api.HttpEntityHeaders;
|
||||
import org.apache.nifi.web.client.api.HttpRequestBodySpec;
|
||||
import org.apache.nifi.web.client.api.HttpRequestUriSpec;
|
||||
import org.apache.nifi.web.client.api.HttpResponseEntity;
|
||||
import org.apache.nifi.web.client.api.WebClientService;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class TestStandardHttpReplicationClient {
|
||||
|
||||
private static final String GET_METHOD = "GET";
|
||||
|
||||
private static final String POST_METHOD = "POST";
|
||||
|
||||
private static final byte[] EMPTY_MAP_SERIALIZED = new byte[]{123, 125};
|
||||
|
||||
private static final String CONTENT_TYPE_LOWERCASED = "content-type";
|
||||
|
||||
private static final String APPLICATION_JSON = "application/json";
|
||||
|
||||
private static final URI REPLICATE_URI = URI.create("http://localhost/nifi-api/flow/current-user");
|
||||
|
||||
private static final String URI_QUERY = "recursive=false";
|
||||
|
||||
private static final URI REPLICATE_URI_QUERY = URI.create("http://localhost/nifi-api/flow/process-groups/root/status?%s".formatted(URI_QUERY));
|
||||
|
||||
private static final String QUERY_PARAMETER_NAME = "revision";
|
||||
|
||||
private static final String QUERY_PARAMETER_VALUE = "1";
|
||||
|
||||
private static final String QUERY_EXPECTED = "%s=%s".formatted(QUERY_PARAMETER_NAME, QUERY_PARAMETER_VALUE);
|
||||
|
||||
private static final String STATUS_PSEUDO_HEADER = ":status";
|
||||
|
||||
@Mock
|
||||
private WebClientService webClientService;
|
||||
|
||||
@Mock
|
||||
private HttpRequestUriSpec httpRequestUriSpec;
|
||||
|
||||
@Mock
|
||||
private HttpRequestBodySpec httpRequestBodySpec;
|
||||
|
||||
@Mock
|
||||
private HttpResponseEntity httpResponseEntity;
|
||||
|
||||
@Mock
|
||||
private HttpEntityHeaders httpResponseHeaders;
|
||||
|
||||
@Captor
|
||||
private ArgumentCaptor<URI> uriCaptor;
|
||||
|
||||
private StandardHttpReplicationClient client;
|
||||
|
||||
@BeforeEach
|
||||
void setClient() {
|
||||
client = new StandardHttpReplicationClient(webClientService, StandardHttpUriBuilder::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPrepareRequest() {
|
||||
final Map<String, String> headers = Collections.emptyMap();
|
||||
final Map<String, String> requestEntity = Collections.emptyMap();
|
||||
final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, requestEntity);
|
||||
|
||||
assertNotNull(preparedRequest);
|
||||
assertInstanceOf(StandardPreparedRequest.class, preparedRequest);
|
||||
|
||||
assertEquals(GET_METHOD, preparedRequest.getMethod());
|
||||
assertNotEquals(headers, preparedRequest.getHeaders());
|
||||
assertEquals(requestEntity, preparedRequest.getEntity());
|
||||
|
||||
final StandardPreparedRequest standardPreparedRequest = (StandardPreparedRequest) preparedRequest;
|
||||
assertArrayEquals(EMPTY_MAP_SERIALIZED, standardPreparedRequest.requestBody());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReplicateIllegalArgumentException() {
|
||||
assertThrows(IllegalArgumentException.class, () -> client.replicate(null, REPLICATE_URI));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReplicate() throws IOException {
|
||||
final Map<String, String> headers = Map.of(CONTENT_TYPE_LOWERCASED, APPLICATION_JSON);
|
||||
final Map<String, String> requestEntity = Collections.emptyMap();
|
||||
final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, requestEntity);
|
||||
|
||||
when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
|
||||
when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
|
||||
|
||||
when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
|
||||
when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
|
||||
|
||||
final Set<String> responseHeaderNames = Set.of(
|
||||
PreparedRequestHeader.CONTENT_TYPE.getHeader(),
|
||||
PreparedRequestHeader.CONTENT_ENCODING.getHeader(),
|
||||
PreparedRequestHeader.CONTENT_LENGTH.getHeader(),
|
||||
STATUS_PSEUDO_HEADER
|
||||
);
|
||||
when(httpResponseHeaders.getHeaderNames()).thenReturn(responseHeaderNames);
|
||||
when(httpResponseHeaders.getHeader(eq(PreparedRequestHeader.CONTENT_TYPE.getHeader()))).thenReturn(List.of(APPLICATION_JSON));
|
||||
|
||||
final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
|
||||
when(httpResponseEntity.body()).thenReturn(responseBody);
|
||||
|
||||
final Response response = client.replicate(preparedRequest, REPLICATE_URI);
|
||||
|
||||
assertResponseFound(response);
|
||||
|
||||
final String responseContentType = response.getHeaderString(PreparedRequestHeader.CONTENT_TYPE.getHeader());
|
||||
assertEquals(APPLICATION_JSON, responseContentType);
|
||||
|
||||
final String responseStatusHeader = response.getHeaderString(STATUS_PSEUDO_HEADER);
|
||||
assertNull(responseStatusHeader);
|
||||
|
||||
final String contentEncodingHeader = response.getHeaderString(PreparedRequestHeader.CONTENT_ENCODING.getHeader());
|
||||
assertNull(contentEncodingHeader);
|
||||
|
||||
final String contentLengthHeader = response.getHeaderString(PreparedRequestHeader.CONTENT_LENGTH.getHeader());
|
||||
assertNull(contentLengthHeader);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReplicatePostBody() throws IOException {
|
||||
final Map<String, String> headers = Map.of(CONTENT_TYPE_LOWERCASED, APPLICATION_JSON);
|
||||
final Map<String, String> requestEntity = Collections.emptyMap();
|
||||
final PreparedRequest preparedRequest = client.prepareRequest(POST_METHOD, headers, requestEntity);
|
||||
|
||||
when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
|
||||
when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
|
||||
|
||||
when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
|
||||
when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
|
||||
|
||||
final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
|
||||
when(httpResponseEntity.body()).thenReturn(responseBody);
|
||||
|
||||
final Response response = client.replicate(preparedRequest, REPLICATE_URI);
|
||||
|
||||
assertResponseFound(response);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReplicateGetMultivaluedMap() throws IOException {
|
||||
final Map<String, String> headers = Map.of(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON);
|
||||
|
||||
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
|
||||
requestEntity.add(QUERY_PARAMETER_NAME, QUERY_PARAMETER_VALUE);
|
||||
final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, requestEntity);
|
||||
|
||||
when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
|
||||
when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
|
||||
|
||||
when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
|
||||
when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
|
||||
|
||||
final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
|
||||
when(httpResponseEntity.body()).thenReturn(responseBody);
|
||||
|
||||
final Response response = client.replicate(preparedRequest, REPLICATE_URI);
|
||||
|
||||
assertResponseFound(response);
|
||||
|
||||
verify(httpRequestUriSpec).uri(uriCaptor.capture());
|
||||
|
||||
final URI requestUri = uriCaptor.getValue();
|
||||
assertEquals(QUERY_EXPECTED, requestUri.getQuery());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReplicateGetUriQuery() throws IOException {
|
||||
final Map<String, String> headers = Map.of(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON);
|
||||
|
||||
final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, Collections.emptyMap());
|
||||
|
||||
when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
|
||||
when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
|
||||
when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
|
||||
|
||||
when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
|
||||
when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
|
||||
|
||||
final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
|
||||
when(httpResponseEntity.body()).thenReturn(responseBody);
|
||||
|
||||
final Response response = client.replicate(preparedRequest, REPLICATE_URI_QUERY);
|
||||
|
||||
assertResponseFound(response);
|
||||
|
||||
verify(httpRequestUriSpec).uri(uriCaptor.capture());
|
||||
|
||||
final URI requestUri = uriCaptor.getValue();
|
||||
assertEquals(URI_QUERY, requestUri.getQuery());
|
||||
}
|
||||
|
||||
private void assertResponseFound(final Response response) {
|
||||
assertNotNull(response);
|
||||
assertEquals(HTTP_OK, response.getStatus());
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||
package org.apache.nifi.cluster.coordination.http.replication.io;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.nifi.cluster.coordination.http.replication.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
|
@ -71,7 +70,7 @@ public class MockReplicationClient implements HttpReplicationClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Response replicate(PreparedRequest request, String uri) throws IOException {
|
||||
public Response replicate(PreparedRequest request, URI uri) {
|
||||
return new Response() {
|
||||
|
||||
@Override
|
||||
|
@ -173,7 +172,7 @@ public class MockReplicationClient implements HttpReplicationClient {
|
|||
|
||||
@Override
|
||||
public URI getLocation() {
|
||||
return URI.create(uri);
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
|
|||
import org.apache.nifi.web.api.dto.FlowAnalysisRuleViolationDTO;
|
||||
import org.apache.nifi.web.api.dto.PermissionsDTO;
|
||||
import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
|
@ -187,13 +186,11 @@ public class FlowAnalysisResultEntityMergerTest {
|
|||
));
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static NodeIdentifier nodeIdOf(String nodeId) {
|
||||
NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "unimportant", 1, "unimportant", 1, "unimportant", 1, 1, false);
|
||||
return nodeIdentifier;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static FlowAnalysisRuleDTO ruleOf(String ruleId) {
|
||||
FlowAnalysisRuleDTO rule = new FlowAnalysisRuleDTO();
|
||||
|
||||
|
@ -202,7 +199,6 @@ public class FlowAnalysisResultEntityMergerTest {
|
|||
return rule;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static FlowAnalysisRuleViolationDTO ruleViolationOf(
|
||||
String ruleId,
|
||||
boolean canRead,
|
||||
|
@ -216,7 +212,6 @@ public class FlowAnalysisResultEntityMergerTest {
|
|||
return ruleViolation;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static PermissionsDTO permissionOf(boolean canRead, boolean canWrite) {
|
||||
PermissionsDTO subjectPermissionDto = new PermissionsDTO();
|
||||
|
||||
|
@ -226,7 +221,6 @@ public class FlowAnalysisResultEntityMergerTest {
|
|||
return subjectPermissionDto;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static FlowAnalysisResultEntity resultEntityOf(List<FlowAnalysisRuleDTO> rules, List<FlowAnalysisRuleViolationDTO> ruleViolations) {
|
||||
FlowAnalysisResultEntity clientEntity = new FlowAnalysisResultEntity();
|
||||
|
||||
|
@ -236,7 +230,6 @@ public class FlowAnalysisResultEntityMergerTest {
|
|||
return clientEntity;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static Map<NodeIdentifier, FlowAnalysisResultEntity> resultEntityMapOf(FlowAnalysisResultEntity clientEntity1, FlowAnalysisResultEntity clientEntity2) {
|
||||
Map<NodeIdentifier, FlowAnalysisResultEntity> entityMap = new HashMap<>();
|
||||
|
||||
|
@ -246,7 +239,6 @@ public class FlowAnalysisResultEntityMergerTest {
|
|||
return entityMap;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static <T> List<T> listOf(T... items) {
|
||||
List<T> itemSet = new ArrayList<>();
|
||||
for (T item : items) {
|
||||
|
|
|
@ -63,7 +63,7 @@ class AssetsRestApiClient extends NiFiRestApiClient {
|
|||
final HttpRequestBodySpec requestBodySpec = webClientService.get()
|
||||
.uri(requestUri)
|
||||
.header(ACCEPT_HEADER, APPLICATION_JSON)
|
||||
.header(X_REQUEST_REPLICATED_HEADER, "true");
|
||||
.header(REQUEST_REPLICATED_HEADER, Boolean.TRUE.toString());
|
||||
|
||||
return executeEntityRequest(requestUri, requestBodySpec, AssetsEntity.class);
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ public abstract class NiFiRestApiClient {
|
|||
private static final String HTTPS_SCHEME = "https";
|
||||
|
||||
protected static final String ACCEPT_HEADER = "Accept";
|
||||
protected static final String X_REQUEST_REPLICATED_HEADER = "X-Request-Replicated";
|
||||
protected static final String REQUEST_REPLICATED_HEADER = "request-replicated";
|
||||
protected static final String APPLICATION_JSON = "application/json";
|
||||
protected static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
|
||||
|
||||
|
|
|
@ -65,7 +65,7 @@ public class NarRestApiClient extends NiFiRestApiClient {
|
|||
final HttpRequestBodySpec requestBodySpec = webClientService.get()
|
||||
.uri(requestUri)
|
||||
.header(ACCEPT_HEADER, APPLICATION_JSON)
|
||||
.header(X_REQUEST_REPLICATED_HEADER, "true");
|
||||
.header(REQUEST_REPLICATED_HEADER, Boolean.TRUE.toString());
|
||||
|
||||
return executeEntityRequest(requestUri, requestBodySpec, NarSummariesEntity.class);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.web;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.AccessDeniedException;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
|
||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||
|
@ -91,7 +92,7 @@ public class StandardNiFiContentAccess implements ContentAccess {
|
|||
// replicate the request to the cluster coordinator, indicating the target node
|
||||
NodeResponse nodeResponse;
|
||||
try {
|
||||
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
|
||||
headers.put(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(), nodeId.getId());
|
||||
final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
|
||||
if (coordinatorNode == null) {
|
||||
throw new NoClusterCoordinatorException();
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
|
|||
import org.apache.nifi.authorization.user.NiFiUser;
|
||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
||||
|
@ -87,6 +88,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.security.core.context.SecurityContextHolder;
|
||||
|
||||
import javax.net.ssl.SSLPeerUnverifiedException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.cert.X509Certificate;
|
||||
|
@ -222,7 +224,7 @@ public abstract class ApplicationResource {
|
|||
}
|
||||
|
||||
protected Optional<String> getIdGenerationSeed() {
|
||||
final String idGenerationSeed = httpServletRequest.getHeader(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER);
|
||||
final String idGenerationSeed = httpServletRequest.getHeader(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader());
|
||||
if (StringUtils.isBlank(idGenerationSeed)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
@ -268,7 +270,7 @@ public abstract class ApplicationResource {
|
|||
* @return a 202 Accepted (Node Continue) response to be used within the cluster request handshake
|
||||
*/
|
||||
protected ResponseBuilder generateContinueResponse() {
|
||||
return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);
|
||||
return Response.status(HttpURLConnection.HTTP_ACCEPTED);
|
||||
}
|
||||
|
||||
protected URI getAbsolutePath() {
|
||||
|
@ -317,50 +319,15 @@ public abstract class ApplicationResource {
|
|||
}
|
||||
}
|
||||
|
||||
// if the scheme is not set by the client, include the details from this request but don't override
|
||||
final String proxyScheme = getFirstHeaderValue(ProxyHeader.PROXY_SCHEME.getHeader(), ProxyHeader.FORWARDED_PROTO.getHeader());
|
||||
if (proxyScheme == null) {
|
||||
result.put(ProxyHeader.PROXY_SCHEME.getHeader(), httpServletRequest.getScheme());
|
||||
}
|
||||
|
||||
// if the host is not set by the client, include the details from this request but don't override
|
||||
final String proxyHost = getFirstHeaderValue(ProxyHeader.PROXY_HOST.getHeader(), ProxyHeader.FORWARDED_HOST.getHeader());
|
||||
if (proxyHost == null) {
|
||||
result.put(ProxyHeader.PROXY_HOST.getHeader(), httpServletRequest.getServerName());
|
||||
}
|
||||
|
||||
// if the port is not set by the client, include the details from this request but don't override
|
||||
final String proxyPort = getFirstHeaderValue(ProxyHeader.PROXY_PORT.getHeader(), ProxyHeader.FORWARDED_PORT.getHeader());
|
||||
if (proxyPort == null) {
|
||||
result.put(ProxyHeader.PROXY_PORT.getHeader(), String.valueOf(httpServletRequest.getServerPort()));
|
||||
}
|
||||
final URI requestUri = RequestUriBuilder.fromHttpServletRequest(httpServletRequest).build();
|
||||
// Set Proxy Headers based on resolved URI from supported values
|
||||
result.put(ProxyHeader.PROXY_SCHEME.getHeader(), requestUri.getScheme());
|
||||
result.put(ProxyHeader.PROXY_HOST.getHeader(), requestUri.getHost());
|
||||
result.put(ProxyHeader.PROXY_PORT.getHeader(), Integer.toString(requestUri.getPort()));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value for the first key discovered when inspecting the current request. Will
|
||||
* return null if there are no keys specified or if none of the specified keys are found.
|
||||
*
|
||||
* @param keys http header keys
|
||||
* @return the value for the first key found
|
||||
*/
|
||||
private String getFirstHeaderValue(final String... keys) {
|
||||
if (keys == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (final String key : keys) {
|
||||
final String value = httpServletRequest.getHeader(key);
|
||||
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the request is part of a two-phase commit style request (either phase 1 or phase 2)
|
||||
*
|
||||
|
@ -368,7 +335,7 @@ public abstract class ApplicationResource {
|
|||
* @return <code>true</code> if the request represents a two-phase commit style request
|
||||
*/
|
||||
protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) {
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
|
||||
return transactionId != null && isClustered();
|
||||
}
|
||||
|
||||
|
@ -383,15 +350,15 @@ public abstract class ApplicationResource {
|
|||
* first of the two phases.
|
||||
*/
|
||||
protected boolean isValidationPhase(final HttpServletRequest httpServletRequest) {
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader()) != null;
|
||||
}
|
||||
|
||||
protected boolean isExecutionPhase(final HttpServletRequest httpServletRequest) {
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER) != null;
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader()) != null;
|
||||
}
|
||||
|
||||
protected boolean isCancellationPhase(final HttpServletRequest httpServletRequest) {
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER) != null;
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader()) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -412,9 +379,9 @@ public abstract class ApplicationResource {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Check if the X-Request-Replicated header is set. If so, the request has already been replicated,
|
||||
// Check if the replicated header is set. If so, the request has already been replicated,
|
||||
// so we need to service the request locally. If not, then replicate the request to the entire cluster.
|
||||
final String header = httpServletRequest.getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER);
|
||||
final String header = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_REPLICATED.getHeader());
|
||||
return header == null;
|
||||
}
|
||||
|
||||
|
@ -719,7 +686,7 @@ public abstract class ApplicationResource {
|
|||
}
|
||||
|
||||
// get the transaction id
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
|
||||
if (StringUtils.isBlank(transactionId)) {
|
||||
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
|
||||
}
|
||||
|
@ -739,7 +706,7 @@ public abstract class ApplicationResource {
|
|||
|
||||
private <T extends Entity> Request<T> phaseTwoVerifyTransaction() {
|
||||
// get the transaction id
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
|
||||
if (StringUtils.isBlank(transactionId)) {
|
||||
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
|
||||
}
|
||||
|
@ -775,7 +742,7 @@ public abstract class ApplicationResource {
|
|||
|
||||
private void cancelTransaction() {
|
||||
// get the transaction id
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||
final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
|
||||
if (StringUtils.isBlank(transactionId)) {
|
||||
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
|
||||
}
|
||||
|
@ -891,7 +858,7 @@ public abstract class ApplicationResource {
|
|||
final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
|
||||
return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse();
|
||||
} else {
|
||||
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
|
||||
headers.put(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(), nodeId.getId());
|
||||
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse().getResponse();
|
||||
}
|
||||
} catch (final InterruptedException ie) {
|
||||
|
@ -936,7 +903,7 @@ public abstract class ApplicationResource {
|
|||
final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
|
||||
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse();
|
||||
} else {
|
||||
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
|
||||
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(), targetNode.getId()));
|
||||
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, getAbsolutePath(), entity, headers).awaitMergedResponse().getResponse();
|
||||
}
|
||||
} catch (final InterruptedException ie) {
|
||||
|
@ -1049,7 +1016,7 @@ public abstract class ApplicationResource {
|
|||
}
|
||||
} finally {
|
||||
final long replicateNanos = System.nanoTime() - replicateStart;
|
||||
final String transactionId = headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||
final String transactionId = headers.get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
|
||||
final String requestId = transactionId == null ? "Request with no ID" : transactionId;
|
||||
logger.debug("Took a total of {} millis to {} for {}", TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId);
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ import org.apache.nifi.authorization.RequestAction;
|
|||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
|
||||
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.repository.claim.ContentDirection;
|
||||
import org.apache.nifi.web.DownloadableContent;
|
||||
|
@ -451,7 +451,7 @@ public class ProvenanceEventResource extends ApplicationResource {
|
|||
}
|
||||
|
||||
// handle expects request (usually from the cluster manager)
|
||||
final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
|
||||
final String expects = httpServletRequest.getHeader(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
|
||||
if (expects != null) {
|
||||
return generateContinueResponse().build();
|
||||
}
|
||||
|
|
|
@ -33,8 +33,8 @@ import java.io.IOException;
|
|||
* Skip Replicated Cross-Site Request Forgery Filter disables subsequent filtering for matched requests
|
||||
*/
|
||||
public class SkipReplicatedCsrfFilter extends OncePerRequestFilter {
|
||||
/** RequestReplicator.REQUEST_TRANSACTION_ID_HEADER applied to replicated cluster requests */
|
||||
protected static final String REPLICATED_REQUEST_HEADER = "X-RequestTransactionId";
|
||||
/** Replication HTTP Header applied to replicated cluster requests */
|
||||
protected static final String REPLICATED_REQUEST_HEADER = "request-transaction-id";
|
||||
|
||||
/** Requests containing replicated header and not containing authorization cookies will be skipped */
|
||||
private static final RequestMatcher REQUEST_MATCHER = new AndRequestMatcher(
|
||||
|
|
|
@ -77,7 +77,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
// Group ID | Source Name | Dest Name | Conn Name | Queue Size |
|
||||
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s | %2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
|
||||
|
||||
public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("X-Request-Replicated", "value");
|
||||
public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("request-replicated", Boolean.TRUE.toString());
|
||||
|
||||
public static final int CLUSTERED_CLIENT_API_BASE_PORT = 5671;
|
||||
public static final int STANDALONE_CLIENT_API_BASE_PORT = 5670;
|
||||
|
|
Loading…
Reference in New Issue