diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml
new file mode 100644
index 0000000000..dfdbda021f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/pom.xml
@@ -0,0 +1,141 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-grpc-bundle
+ 2.0.0-SNAPSHOT
+
+
+ nifi-grpc-common
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-utils
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+
+
+ org.apache.nifi
+ nifi-security-utils-api
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ commons-io
+ commons-io
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-netty
+ ${grpc.version}
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+ org.apache.nifi
+ nifi-security-utils
+ 2.0.0-SNAPSHOT
+ test
+
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+ **/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ ${basedir}/src/main/resources/proto
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.0.0
+
+
+ test
+ generate-sources
+
+ add-source
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java
similarity index 78%
rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java
rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java
index 2089981f80..d14cf0c34d 100644
--- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestService.java
@@ -18,26 +18,23 @@ package org.apache.nifi.processors.grpc;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
-
+import io.grpc.stub.StreamObserver;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.grpc.util.BackpressureChecker;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import io.grpc.stub.StreamObserver;
-
import static java.util.Objects.requireNonNull;
/**
@@ -54,11 +51,10 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
public static final String SERVICE_NAME = "grpc://FlowFileIngestService";
public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
- private final AtomicLong filesReceived = new AtomicLong(0L);
- private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
private final AtomicReference sessionFactoryReference;
- private final ProcessContext context;
private final ComponentLog logger;
+ private final Relationship relSuccess;
+ private final BackpressureChecker backpressureChecker;
/**
* Create a FlowFileIngestService
@@ -68,10 +64,12 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
*/
public FlowFileIngestService(final ComponentLog logger,
final AtomicReference sessionFactoryReference,
- final ProcessContext context) {
- this.context = requireNonNull(context);
+ final Relationship relSuccess,
+ final BackpressureChecker backpressureChecker) {
this.sessionFactoryReference = requireNonNull(sessionFactoryReference);
this.logger = requireNonNull(logger);
+ this.relSuccess = requireNonNull(relSuccess);
+ this.backpressureChecker = requireNonNull(backpressureChecker);
}
/**
@@ -81,7 +79,7 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
* @param responseObserver the mechanism by which to reply to the client
*/
@Override
- public void send(final org.apache.nifi.processors.grpc.FlowFileRequest request, final StreamObserver responseObserver) {
+ public void send(final FlowFileRequest request, final StreamObserver responseObserver) {
final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder();
final String remoteHost = FlowFileIngestServiceInterceptor.REMOTE_HOST_KEY.get();
@@ -102,23 +100,15 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
final ProcessSession session = sessionFactory.createSession();
// if there's no space available, reject the request.
- final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
- if (n == 0 || !spaceAvailable.get()) {
- if (context.getAvailableRelationships().isEmpty()) {
- spaceAvailable.set(false);
- final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable";
- if (logger.isDebugEnabled()) {
- logger.debug(message);
- }
- final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
- .setBody(message)
- .build();
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- return;
- } else {
- spaceAvailable.set(true);
- }
+ if (backpressureChecker.isBackpressured()) {
+ final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable";
+ logger.debug(message);
+ final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
+ .setBody(message)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ return;
}
if (logger.isDebugEnabled()) {
@@ -156,11 +146,11 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
sourceSystemFlowFileIdentifier,
"Remote DN=" + remoteDN,
transferMillis);
- flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_HOST, remoteHost);
- flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_USER_DN, remoteDN);
+ flowFile = session.putAttribute(flowFile, GRPCAttributeNames.REMOTE_HOST, remoteHost);
+ flowFile = session.putAttribute(flowFile, GRPCAttributeNames.REMOTE_USER_DN, remoteDN);
// register success
- session.transfer(flowFile, ListenGRPC.REL_SUCCESS);
+ session.transfer(flowFile, relSuccess);
session.commit();
// reply to client
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java
similarity index 89%
rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java
rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java
index f274f744e0..b1044f668d 100644
--- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/FlowFileIngestServiceInterceptor.java
@@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.grpc;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.regex.Pattern;
-
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import javax.security.cert.X509Certificate;
-
import io.grpc.Attributes;
import io.grpc.Context;
import io.grpc.Contexts;
@@ -35,23 +25,33 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
+import org.apache.nifi.logging.ComponentLog;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.cert.X509Certificate;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.regex.Pattern;
import static java.util.Objects.requireNonNull;
/**
- * Simple gRPC service call interceptor that enforces various controls
+ * Simple gRPC service call interceptor that enforces various controls.
+ * Despite its name, it does not contain any FlowFileIngestService specific logic.
*/
public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
+
public static final String DEFAULT_FOUND_SUBJECT = "none";
private static final String UNKNOWN_IP = "unknown-ip";
- private static final String DN_UNAUTHORIZED = "The client DN does not have permission to post FlowFiles to this NiFi. ";
+ private static final String DN_UNAUTHORIZED = "The client DN does not have permission to send gRPC requests to this NiFi. ";
private static final ServerCall.Listener IDENTITY_LISTENER = new ServerCall.Listener(){};
- public static final Context.Key REMOTE_HOST_KEY = Context.key(ListenGRPC.REMOTE_HOST);
- public static final Context.Key REMOTE_DN_KEY = Context.key(ListenGRPC.REMOTE_USER_DN);
+ public static final Context.Key REMOTE_HOST_KEY = Context.key(GRPCAttributeNames.REMOTE_HOST);
+ public static final Context.Key REMOTE_DN_KEY = Context.key(GRPCAttributeNames.REMOTE_USER_DN);
private final ComponentLog logger;
- private Pattern authorizedDNpattern;
+ private Pattern authorizedDNPattern;
/**
* Create an interceptor that applies various controls per request
@@ -70,7 +70,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
* @return this
*/
public FlowFileIngestServiceInterceptor enforceDNPattern(final Pattern authorizedDNPattern) {
- this.authorizedDNpattern = requireNonNull(authorizedDNPattern);
+ this.authorizedDNPattern = requireNonNull(authorizedDNPattern);
return this;
}
@@ -98,13 +98,13 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
// enforce that the DN on the client cert matches the configured pattern
final SSLSession sslSession = attributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
- if(this.authorizedDNpattern != null && sslSession != null) {
+ if (this.authorizedDNPattern != null && sslSession != null) {
try {
final X509Certificate[] certs = sslSession.getPeerCertificateChain();
- if(certs != null && certs.length > 0) {
+ if (certs != null && certs.length > 0) {
for (final X509Certificate cert : certs) {
foundSubject = cert.getSubjectDN().getName();
- if(authorizedDNpattern.matcher(foundSubject).matches()) {
+ if (authorizedDNPattern.matcher(foundSubject).matches()) {
break;
} else {
logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + clientIp);
@@ -114,7 +114,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
}
}
} catch (final SSLPeerUnverifiedException e) {
- logger.debug("skipping DN authorization for request from {}.", new Object[] {clientIp}, e);
+ logger.debug("Skipping DN authorization for request from {}", clientIp, e);
}
}
// contextualize the DN and IP for use in the RPC implementation
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java
new file mode 100644
index 0000000000..a2f25cf55f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/GRPCAttributeNames.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+public class GRPCAttributeNames {
+
+ public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn";
+ public static final String REMOTE_HOST = "listengrpc.remote.host";
+}
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java
similarity index 100%
rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java
rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/ssl/SslContextProvider.java
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java
new file mode 100644
index 0000000000..064b91e588
--- /dev/null
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/java/org/apache/nifi/processors/grpc/util/BackpressureChecker.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc.util;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class BackpressureChecker {
+
+ private static final int RECHECK_THRESHOLD = 5;
+
+ private final ProcessContext processContext;
+ private final Set relationships;
+ private final AtomicLong requestCount = new AtomicLong(0L);
+ private final AtomicBoolean backPressure = new AtomicBoolean(false);
+
+ public BackpressureChecker(ProcessContext processContext, Set relationships) {
+ this.processContext = processContext;
+ this.relationships = relationships;
+ }
+
+ public boolean isBackpressured() {
+ long n = requestCount.getAndIncrement() % RECHECK_THRESHOLD;
+ if (n == 0 || backPressure.get()) {
+ backPressure.set(processContext.getAvailableRelationships().size() != relationships.size());
+ }
+ return backPressure.get();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/resources/proto/flowfile_service.proto
similarity index 100%
rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto
rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/main/resources/proto/flowfile_service.proto
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java
similarity index 100%
rename from nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java
rename to nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-common/src/test/java/org/apache/nifi/processors/grpc/ssl/SslContextProviderTest.java
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml
index 2d428e1035..6584c6bd5d 100644
--- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml
@@ -24,49 +24,14 @@ language governing permissions and limitations under the License. -->
org.apache.nifi
- nifi-api
-
-
- org.apache.nifi
- nifi-utils
+ nifi-grpc-common2.0.0-SNAPSHOTorg.apache.nifinifi-ssl-context-service-api
-
- org.apache.nifi
- nifi-security-utils-api
- 2.0.0-SNAPSHOT
-
-
- org.apache.commons
- commons-lang3
-
-
- commons-io
- commons-io
-
-
- io.grpc
- grpc-netty
- ${grpc.version}
-
-
- io.grpc
- grpc-protobuf
- ${grpc.version}
-
-
- io.grpc
- grpc-stub
- ${grpc.version}
-
-
- javax.annotation
- javax.annotation-api
-
+
org.apache.nifinifi-mock
@@ -78,69 +43,5 @@ language governing permissions and limitations under the License. -->
nifi-ssl-context-servicetest
-
- org.apache.nifi
- nifi-security-utils
- 2.0.0-SNAPSHOT
- test
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.6.2
-
-
-
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
-
- **/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.6.1
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
- ${basedir}/src/main/resources/proto
-
-
-
-
- compile
- compile-custom
-
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
- 3.0.0
-
-
- test
- generate-sources
-
- add-source
-
-
-
-
-
-
-
-
-
-
-
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
index 229fae28fb..b195b11255 100644
--- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
@@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.grpc.ssl.SslContextProvider;
+import org.apache.nifi.processors.grpc.util.BackpressureChecker;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
@@ -61,12 +62,10 @@ import java.util.regex.Pattern;
" so this processor should be used only when FlowFile sizes are on the order of megabytes. The default maximum message size is 4MB.")
@Tags({"ingest", "grpc", "rpc", "listen"})
@WritesAttributes({
- @WritesAttribute(attribute = "listengrpc.remote.user.dn", description = "The DN of the user who sent the FlowFile to this NiFi"),
- @WritesAttribute(attribute = "listengrpc.remote.host", description = "The IP of the client who sent the FlowFile to this NiFi")
+ @WritesAttribute(attribute = GRPCAttributeNames.REMOTE_USER_DN, description = "The DN of the user who sent the FlowFile to this NiFi"),
+ @WritesAttribute(attribute = GRPCAttributeNames.REMOTE_HOST, description = "The IP of the client who sent the FlowFile to this NiFi")
})
public class ListenGRPC extends AbstractSessionFactoryProcessor {
- public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn";
- public static final String REMOTE_HOST = "listengrpc.remote.host";
// properties
public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder()
@@ -187,9 +186,11 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger());
callInterceptor.enforceDNPattern(authorizedDnPattern);
+ final BackpressureChecker backpressureChecker = new BackpressureChecker(context, getRelationships());
final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(),
sessionFactoryReference,
- context);
+ REL_SUCCESS,
+ backpressureChecker);
final NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor))
// default (de)compressor registries handle both plaintext and gzip compressed messages
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java
index abeacb281f..ac2530de6d 100644
--- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java
+++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java
@@ -73,8 +73,8 @@ public class ITListenGRPC {
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
- assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
- assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
+ assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_HOST), equalTo("127.0.0.1"));
+ assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally {
// stop the server
@@ -157,8 +157,8 @@ public class ITListenGRPC {
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
- assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
- assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
+ assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_HOST), equalTo("127.0.0.1"));
+ assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally {
// stop the server
diff --git a/nifi-nar-bundles/nifi-grpc-bundle/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/pom.xml
index 18ed89f7c2..7de79ce1ed 100644
--- a/nifi-nar-bundles/nifi-grpc-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-grpc-bundle/pom.xml
@@ -27,6 +27,7 @@
A bundle of processors that speak the gRPC protocol
+ nifi-grpc-commonnifi-grpc-processorsnifi-grpc-nar