From 4bec56ed81c5ff760be07d609ecab7eba41f00c4 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Thu, 16 Mar 2023 17:41:01 +0100 Subject: [PATCH] NIFI-11296 Improved code reusability of GRPC modules This closes #7053 Signed-off-by: David Handermann --- .../nifi-grpc-bundle/nifi-grpc-common/pom.xml | 141 ++++++++++++++++++ .../grpc/FlowFileIngestService.java | 54 +++---- .../FlowFileIngestServiceInterceptor.java | 40 ++--- .../processors/grpc/GRPCAttributeNames.java | 23 +++ .../grpc/ssl/SslContextProvider.java | 0 .../grpc/util/BackpressureChecker.java | 47 ++++++ .../resources/proto/flowfile_service.proto | 0 .../grpc/ssl/SslContextProviderTest.java | 0 .../nifi-grpc-processors/pom.xml | 103 +------------ .../nifi/processors/grpc/ListenGRPC.java | 11 +- .../nifi/processors/grpc/ITListenGRPC.java | 8 +- nifi-nar-bundles/nifi-grpc-bundle/pom.xml | 1 + 12 files changed, 266 insertions(+), 162 deletions(-) create mode 100644 nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml rename nifi-nar-bundles/nifi-grpc-bundle/{nifi-grpc-processors => nifi-grpc-common}/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java (78%) rename nifi-nar-bundles/nifi-grpc-bundle/{nifi-grpc-processors => nifi-grpc-common}/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java (89%) create mode 100644 nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java rename nifi-nar-bundles/nifi-grpc-bundle/{nifi-grpc-processors => nifi-grpc-common}/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java (100%) create mode 100644 nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java rename nifi-nar-bundles/nifi-grpc-bundle/{nifi-grpc-processors => nifi-grpc-common}/src/main/resources/proto/flowfile_service.proto (100%) rename nifi-nar-bundles/nifi-grpc-bundle/{nifi-grpc-processors => nifi-grpc-common}/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java (100%) diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml new file mode 100644 index 0000000000..dfdbda021f --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml @@ -0,0 +1,141 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-grpc-bundle + 2.0.0-SNAPSHOT + + + nifi-grpc-common + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 2.0.0-SNAPSHOT + + + org.apache.nifi + nifi-ssl-context-service-api + + + org.apache.nifi + nifi-security-utils-api + 2.0.0-SNAPSHOT + + + org.apache.commons + commons-lang3 + + + commons-io + commons-io + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-netty + ${grpc.version} + + + javax.annotation + javax.annotation-api + + + + org.apache.nifi + nifi-security-utils + 2.0.0-SNAPSHOT + test + + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + **/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + ${basedir}/src/main/resources/proto + + + + + compile + compile-custom + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.0.0 + + + test + generate-sources + + add-source + + + + ${basedir}/target/generated-sources + + + + + + + + diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java similarity index 78% rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java index 2089981f80..d14cf0c34d 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java @@ -18,26 +18,23 @@ package org.apache.nifi.processors.grpc; import com.google.common.collect.Maps; import com.google.protobuf.ByteString; - +import io.grpc.stub.StreamObserver; import org.apache.commons.io.IOUtils; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.grpc.util.BackpressureChecker; import java.io.BufferedOutputStream; import java.io.InputStream; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import io.grpc.stub.StreamObserver; - import static java.util.Objects.requireNonNull; /** @@ -54,11 +51,10 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm public static final String SERVICE_NAME = "grpc://FlowFileIngestService"; public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5; - private final AtomicLong filesReceived = new AtomicLong(0L); - private final AtomicBoolean spaceAvailable = new AtomicBoolean(true); private final AtomicReference sessionFactoryReference; - private final ProcessContext context; private final ComponentLog logger; + private final Relationship relSuccess; + private final BackpressureChecker backpressureChecker; /** * Create a FlowFileIngestService @@ -68,10 +64,12 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm */ public FlowFileIngestService(final ComponentLog logger, final AtomicReference sessionFactoryReference, - final ProcessContext context) { - this.context = requireNonNull(context); + final Relationship relSuccess, + final BackpressureChecker backpressureChecker) { this.sessionFactoryReference = requireNonNull(sessionFactoryReference); this.logger = requireNonNull(logger); + this.relSuccess = requireNonNull(relSuccess); + this.backpressureChecker = requireNonNull(backpressureChecker); } /** @@ -81,7 +79,7 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm * @param responseObserver the mechanism by which to reply to the client */ @Override - public void send(final org.apache.nifi.processors.grpc.FlowFileRequest request, final StreamObserver responseObserver) { + public void send(final FlowFileRequest request, final StreamObserver responseObserver) { final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder(); final String remoteHost = FlowFileIngestServiceInterceptor.REMOTE_HOST_KEY.get(); @@ -102,23 +100,15 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm final ProcessSession session = sessionFactory.createSession(); // if there's no space available, reject the request. - final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; - if (n == 0 || !spaceAvailable.get()) { - if (context.getAvailableRelationships().isEmpty()) { - spaceAvailable.set(false); - final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable"; - if (logger.isDebugEnabled()) { - logger.debug(message); - } - final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR) - .setBody(message) - .build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - return; - } else { - spaceAvailable.set(true); - } + if (backpressureChecker.isBackpressured()) { + final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable"; + logger.debug(message); + final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR) + .setBody(message) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + return; } if (logger.isDebugEnabled()) { @@ -156,11 +146,11 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm sourceSystemFlowFileIdentifier, "Remote DN=" + remoteDN, transferMillis); - flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_HOST, remoteHost); - flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_USER_DN, remoteDN); + flowFile = session.putAttribute(flowFile, GRPCAttributeNames.REMOTE_HOST, remoteHost); + flowFile = session.putAttribute(flowFile, GRPCAttributeNames.REMOTE_USER_DN, remoteDN); // register success - session.transfer(flowFile, ListenGRPC.REL_SUCCESS); + session.transfer(flowFile, relSuccess); session.commit(); // reply to client diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java similarity index 89% rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java index f274f744e0..b1044f668d 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.processors.grpc; -import org.apache.nifi.logging.ComponentLog; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.regex.Pattern; - -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; -import javax.security.cert.X509Certificate; - import io.grpc.Attributes; import io.grpc.Context; import io.grpc.Contexts; @@ -35,23 +25,33 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; +import org.apache.nifi.logging.ComponentLog; + +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.security.cert.X509Certificate; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.regex.Pattern; import static java.util.Objects.requireNonNull; /** - * Simple gRPC service call interceptor that enforces various controls + * Simple gRPC service call interceptor that enforces various controls. + * Despite its name, it does not contain any FlowFileIngestService specific logic. */ public class FlowFileIngestServiceInterceptor implements ServerInterceptor { + public static final String DEFAULT_FOUND_SUBJECT = "none"; private static final String UNKNOWN_IP = "unknown-ip"; - private static final String DN_UNAUTHORIZED = "The client DN does not have permission to post FlowFiles to this NiFi. "; + private static final String DN_UNAUTHORIZED = "The client DN does not have permission to send gRPC requests to this NiFi. "; private static final ServerCall.Listener IDENTITY_LISTENER = new ServerCall.Listener(){}; - public static final Context.Key REMOTE_HOST_KEY = Context.key(ListenGRPC.REMOTE_HOST); - public static final Context.Key REMOTE_DN_KEY = Context.key(ListenGRPC.REMOTE_USER_DN); + public static final Context.Key REMOTE_HOST_KEY = Context.key(GRPCAttributeNames.REMOTE_HOST); + public static final Context.Key REMOTE_DN_KEY = Context.key(GRPCAttributeNames.REMOTE_USER_DN); private final ComponentLog logger; - private Pattern authorizedDNpattern; + private Pattern authorizedDNPattern; /** * Create an interceptor that applies various controls per request @@ -70,7 +70,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor { * @return this */ public FlowFileIngestServiceInterceptor enforceDNPattern(final Pattern authorizedDNPattern) { - this.authorizedDNpattern = requireNonNull(authorizedDNPattern); + this.authorizedDNPattern = requireNonNull(authorizedDNPattern); return this; } @@ -98,13 +98,13 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor { // enforce that the DN on the client cert matches the configured pattern final SSLSession sslSession = attributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION); - if(this.authorizedDNpattern != null && sslSession != null) { + if (this.authorizedDNPattern != null && sslSession != null) { try { final X509Certificate[] certs = sslSession.getPeerCertificateChain(); - if(certs != null && certs.length > 0) { + if (certs != null && certs.length > 0) { for (final X509Certificate cert : certs) { foundSubject = cert.getSubjectDN().getName(); - if(authorizedDNpattern.matcher(foundSubject).matches()) { + if (authorizedDNPattern.matcher(foundSubject).matches()) { break; } else { logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + clientIp); @@ -114,7 +114,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor { } } } catch (final SSLPeerUnverifiedException e) { - logger.debug("skipping DN authorization for request from {}.", new Object[] {clientIp}, e); + logger.debug("Skipping DN authorization for request from {}", clientIp, e); } } // contextualize the DN and IP for use in the RPC implementation diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java new file mode 100644 index 0000000000..a2f25cf55f --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.grpc; + +public class GRPCAttributeNames { + + public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn"; + public static final String REMOTE_HOST = "listengrpc.remote.host"; +} diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java similarity index 100% rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java new file mode 100644 index 0000000000..064b91e588 --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.grpc.util; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class BackpressureChecker { + + private static final int RECHECK_THRESHOLD = 5; + + private final ProcessContext processContext; + private final Set relationships; + private final AtomicLong requestCount = new AtomicLong(0L); + private final AtomicBoolean backPressure = new AtomicBoolean(false); + + public BackpressureChecker(ProcessContext processContext, Set relationships) { + this.processContext = processContext; + this.relationships = relationships; + } + + public boolean isBackpressured() { + long n = requestCount.getAndIncrement() % RECHECK_THRESHOLD; + if (n == 0 || backPressure.get()) { + backPressure.set(processContext.getAvailableRelationships().size() != relationships.size()); + } + return backPressure.get(); + } +} diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/resources/proto/flowfile_service.proto similarity index 100% rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/resources/proto/flowfile_service.proto diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java similarity index 100% rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml index 2d428e1035..6584c6bd5d 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml @@ -24,49 +24,14 @@ language governing permissions and limitations under the License. --> org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-utils + nifi-grpc-common 2.0.0-SNAPSHOT org.apache.nifi nifi-ssl-context-service-api - - org.apache.nifi - nifi-security-utils-api - 2.0.0-SNAPSHOT - - - org.apache.commons - commons-lang3 - - - commons-io - commons-io - - - io.grpc - grpc-netty - ${grpc.version} - - - io.grpc - grpc-protobuf - ${grpc.version} - - - io.grpc - grpc-stub - ${grpc.version} - - - javax.annotation - javax.annotation-api - + org.apache.nifi nifi-mock @@ -78,69 +43,5 @@ language governing permissions and limitations under the License. --> nifi-ssl-context-service test - - org.apache.nifi - nifi-security-utils - 2.0.0-SNAPSHOT - test - - - - - kr.motd.maven - os-maven-plugin - 1.6.2 - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - **/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java - - - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 - - com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} - grpc-java - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} - ${basedir}/src/main/resources/proto - - - - - compile - compile-custom - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.0.0 - - - test - generate-sources - - add-source - - - - ${basedir}/target/generated-sources - - - - - - - diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java index 229fae28fb..b195b11255 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java @@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.grpc.ssl.SslContextProvider; +import org.apache.nifi.processors.grpc.util.BackpressureChecker; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; @@ -61,12 +62,10 @@ import java.util.regex.Pattern; " so this processor should be used only when FlowFile sizes are on the order of megabytes. The default maximum message size is 4MB.") @Tags({"ingest", "grpc", "rpc", "listen"}) @WritesAttributes({ - @WritesAttribute(attribute = "listengrpc.remote.user.dn", description = "The DN of the user who sent the FlowFile to this NiFi"), - @WritesAttribute(attribute = "listengrpc.remote.host", description = "The IP of the client who sent the FlowFile to this NiFi") + @WritesAttribute(attribute = GRPCAttributeNames.REMOTE_USER_DN, description = "The DN of the user who sent the FlowFile to this NiFi"), + @WritesAttribute(attribute = GRPCAttributeNames.REMOTE_HOST, description = "The IP of the client who sent the FlowFile to this NiFi") }) public class ListenGRPC extends AbstractSessionFactoryProcessor { - public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn"; - public static final String REMOTE_HOST = "listengrpc.remote.host"; // properties public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder() @@ -187,9 +186,11 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor { final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger()); callInterceptor.enforceDNPattern(authorizedDnPattern); + final BackpressureChecker backpressureChecker = new BackpressureChecker(context, getRelationships()); final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(), sessionFactoryReference, - context); + REL_SUCCESS, + backpressureChecker); final NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) .addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor)) // default (de)compressor registries handle both plaintext and gzip compressed messages diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java index abeacb281f..ac2530de6d 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java @@ -73,8 +73,8 @@ public class ITListenGRPC { assertThat(successFiles.size(), equalTo(1)); final MockFlowFile mockFlowFile = successFiles.get(0); assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR")); - assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1")); - assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT)); + assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_HOST), equalTo("127.0.0.1")); + assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT)); } finally { // stop the server @@ -157,8 +157,8 @@ public class ITListenGRPC { assertThat(successFiles.size(), equalTo(1)); final MockFlowFile mockFlowFile = successFiles.get(0); assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR")); - assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1")); - assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT)); + assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_HOST), equalTo("127.0.0.1")); + assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT)); } finally { // stop the server diff --git a/nifi-nar-bundles/nifi-grpc-bundle/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/pom.xml index 18ed89f7c2..7de79ce1ed 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-grpc-bundle/pom.xml @@ -27,6 +27,7 @@ A bundle of processors that speak the gRPC protocol + nifi-grpc-common nifi-grpc-processors nifi-grpc-nar