NIFI-11296 Improved code reusability of GRPC modules

This closes #7053

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Turcsanyi 2023-03-16 17:41:01 +01:00 committed by exceptionfactory
parent 2873575fce
commit 4bec56ed81
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
12 changed files with 266 additions and 162 deletions

View File

@ -0,0 +1,141 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-grpc-common</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<excludes>**/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>test</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -18,26 +18,23 @@ package org.apache.nifi.processors.grpc;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.grpc.util.BackpressureChecker;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import io.grpc.stub.StreamObserver;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
/** /**
@ -54,11 +51,10 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
public static final String SERVICE_NAME = "grpc://FlowFileIngestService"; public static final String SERVICE_NAME = "grpc://FlowFileIngestService";
public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5; public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
private final AtomicLong filesReceived = new AtomicLong(0L);
private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
private final AtomicReference<ProcessSessionFactory> sessionFactoryReference; private final AtomicReference<ProcessSessionFactory> sessionFactoryReference;
private final ProcessContext context;
private final ComponentLog logger; private final ComponentLog logger;
private final Relationship relSuccess;
private final BackpressureChecker backpressureChecker;
/** /**
* Create a FlowFileIngestService * Create a FlowFileIngestService
@ -68,10 +64,12 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
*/ */
public FlowFileIngestService(final ComponentLog logger, public FlowFileIngestService(final ComponentLog logger,
final AtomicReference<ProcessSessionFactory> sessionFactoryReference, final AtomicReference<ProcessSessionFactory> sessionFactoryReference,
final ProcessContext context) { final Relationship relSuccess,
this.context = requireNonNull(context); final BackpressureChecker backpressureChecker) {
this.sessionFactoryReference = requireNonNull(sessionFactoryReference); this.sessionFactoryReference = requireNonNull(sessionFactoryReference);
this.logger = requireNonNull(logger); this.logger = requireNonNull(logger);
this.relSuccess = requireNonNull(relSuccess);
this.backpressureChecker = requireNonNull(backpressureChecker);
} }
/** /**
@ -81,7 +79,7 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
* @param responseObserver the mechanism by which to reply to the client * @param responseObserver the mechanism by which to reply to the client
*/ */
@Override @Override
public void send(final org.apache.nifi.processors.grpc.FlowFileRequest request, final StreamObserver<FlowFileReply> responseObserver) { public void send(final FlowFileRequest request, final StreamObserver<FlowFileReply> responseObserver) {
final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder(); final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder();
final String remoteHost = FlowFileIngestServiceInterceptor.REMOTE_HOST_KEY.get(); final String remoteHost = FlowFileIngestServiceInterceptor.REMOTE_HOST_KEY.get();
@ -102,23 +100,15 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
final ProcessSession session = sessionFactory.createSession(); final ProcessSession session = sessionFactory.createSession();
// if there's no space available, reject the request. // if there's no space available, reject the request.
final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; if (backpressureChecker.isBackpressured()) {
if (n == 0 || !spaceAvailable.get()) {
if (context.getAvailableRelationships().isEmpty()) {
spaceAvailable.set(false);
final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable"; final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable";
if (logger.isDebugEnabled()) {
logger.debug(message); logger.debug(message);
}
final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR) final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
.setBody(message) .setBody(message)
.build(); .build();
responseObserver.onNext(reply); responseObserver.onNext(reply);
responseObserver.onCompleted(); responseObserver.onCompleted();
return; return;
} else {
spaceAvailable.set(true);
}
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -156,11 +146,11 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
sourceSystemFlowFileIdentifier, sourceSystemFlowFileIdentifier,
"Remote DN=" + remoteDN, "Remote DN=" + remoteDN,
transferMillis); transferMillis);
flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_HOST, remoteHost); flowFile = session.putAttribute(flowFile, GRPCAttributeNames.REMOTE_HOST, remoteHost);
flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_USER_DN, remoteDN); flowFile = session.putAttribute(flowFile, GRPCAttributeNames.REMOTE_USER_DN, remoteDN);
// register success // register success
session.transfer(flowFile, ListenGRPC.REL_SUCCESS); session.transfer(flowFile, relSuccess);
session.commit(); session.commit();
// reply to client // reply to client

View File

@ -16,16 +16,6 @@
*/ */
package org.apache.nifi.processors.grpc; package org.apache.nifi.processors.grpc;
import org.apache.nifi.logging.ComponentLog;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.regex.Pattern;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Contexts; import io.grpc.Contexts;
@ -35,23 +25,33 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler; import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
import io.grpc.Status; import io.grpc.Status;
import org.apache.nifi.logging.ComponentLog;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.regex.Pattern;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
/** /**
* Simple gRPC service call interceptor that enforces various controls * Simple gRPC service call interceptor that enforces various controls.
* Despite its name, it does not contain any FlowFileIngestService specific logic.
*/ */
public class FlowFileIngestServiceInterceptor implements ServerInterceptor { public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
public static final String DEFAULT_FOUND_SUBJECT = "none"; public static final String DEFAULT_FOUND_SUBJECT = "none";
private static final String UNKNOWN_IP = "unknown-ip"; private static final String UNKNOWN_IP = "unknown-ip";
private static final String DN_UNAUTHORIZED = "The client DN does not have permission to post FlowFiles to this NiFi. "; private static final String DN_UNAUTHORIZED = "The client DN does not have permission to send gRPC requests to this NiFi. ";
private static final ServerCall.Listener IDENTITY_LISTENER = new ServerCall.Listener(){}; private static final ServerCall.Listener IDENTITY_LISTENER = new ServerCall.Listener(){};
public static final Context.Key<String> REMOTE_HOST_KEY = Context.key(ListenGRPC.REMOTE_HOST); public static final Context.Key<String> REMOTE_HOST_KEY = Context.key(GRPCAttributeNames.REMOTE_HOST);
public static final Context.Key<String> REMOTE_DN_KEY = Context.key(ListenGRPC.REMOTE_USER_DN); public static final Context.Key<String> REMOTE_DN_KEY = Context.key(GRPCAttributeNames.REMOTE_USER_DN);
private final ComponentLog logger; private final ComponentLog logger;
private Pattern authorizedDNpattern; private Pattern authorizedDNPattern;
/** /**
* Create an interceptor that applies various controls per request * Create an interceptor that applies various controls per request
@ -70,7 +70,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
* @return this * @return this
*/ */
public FlowFileIngestServiceInterceptor enforceDNPattern(final Pattern authorizedDNPattern) { public FlowFileIngestServiceInterceptor enforceDNPattern(final Pattern authorizedDNPattern) {
this.authorizedDNpattern = requireNonNull(authorizedDNPattern); this.authorizedDNPattern = requireNonNull(authorizedDNPattern);
return this; return this;
} }
@ -98,13 +98,13 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
// enforce that the DN on the client cert matches the configured pattern // enforce that the DN on the client cert matches the configured pattern
final SSLSession sslSession = attributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION); final SSLSession sslSession = attributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
if(this.authorizedDNpattern != null && sslSession != null) { if (this.authorizedDNPattern != null && sslSession != null) {
try { try {
final X509Certificate[] certs = sslSession.getPeerCertificateChain(); final X509Certificate[] certs = sslSession.getPeerCertificateChain();
if(certs != null && certs.length > 0) { if (certs != null && certs.length > 0) {
for (final X509Certificate cert : certs) { for (final X509Certificate cert : certs) {
foundSubject = cert.getSubjectDN().getName(); foundSubject = cert.getSubjectDN().getName();
if(authorizedDNpattern.matcher(foundSubject).matches()) { if (authorizedDNPattern.matcher(foundSubject).matches()) {
break; break;
} else { } else {
logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + clientIp); logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + clientIp);
@ -114,7 +114,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
} }
} }
} catch (final SSLPeerUnverifiedException e) { } catch (final SSLPeerUnverifiedException e) {
logger.debug("skipping DN authorization for request from {}.", new Object[] {clientIp}, e); logger.debug("Skipping DN authorization for request from {}", clientIp, e);
} }
} }
// contextualize the DN and IP for use in the RPC implementation // contextualize the DN and IP for use in the RPC implementation

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
public class GRPCAttributeNames {
public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn";
public static final String REMOTE_HOST = "listengrpc.remote.host";
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc.util;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class BackpressureChecker {
private static final int RECHECK_THRESHOLD = 5;
private final ProcessContext processContext;
private final Set<Relationship> relationships;
private final AtomicLong requestCount = new AtomicLong(0L);
private final AtomicBoolean backPressure = new AtomicBoolean(false);
public BackpressureChecker(ProcessContext processContext, Set<Relationship> relationships) {
this.processContext = processContext;
this.relationships = relationships;
}
public boolean isBackpressured() {
long n = requestCount.getAndIncrement() % RECHECK_THRESHOLD;
if (n == 0 || backPressure.get()) {
backPressure.set(processContext.getAvailableRelationships().size() != relationships.size());
}
return backPressure.get();
}
}

View File

@ -24,49 +24,14 @@ language governing permissions and limitations under the License. -->
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-grpc-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>
@ -78,69 +43,5 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-ssl-context-service</artifactId> <artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<excludes>**/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>test</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.grpc.ssl.SslContextProvider; import org.apache.nifi.processors.grpc.ssl.SslContextProvider;
import org.apache.nifi.processors.grpc.util.BackpressureChecker;
import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
@ -61,12 +62,10 @@ import java.util.regex.Pattern;
" so this processor should be used only when FlowFile sizes are on the order of megabytes. The default maximum message size is 4MB.") " so this processor should be used only when FlowFile sizes are on the order of megabytes. The default maximum message size is 4MB.")
@Tags({"ingest", "grpc", "rpc", "listen"}) @Tags({"ingest", "grpc", "rpc", "listen"})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "listengrpc.remote.user.dn", description = "The DN of the user who sent the FlowFile to this NiFi"), @WritesAttribute(attribute = GRPCAttributeNames.REMOTE_USER_DN, description = "The DN of the user who sent the FlowFile to this NiFi"),
@WritesAttribute(attribute = "listengrpc.remote.host", description = "The IP of the client who sent the FlowFile to this NiFi") @WritesAttribute(attribute = GRPCAttributeNames.REMOTE_HOST, description = "The IP of the client who sent the FlowFile to this NiFi")
}) })
public class ListenGRPC extends AbstractSessionFactoryProcessor { public class ListenGRPC extends AbstractSessionFactoryProcessor {
public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn";
public static final String REMOTE_HOST = "listengrpc.remote.host";
// properties // properties
public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder()
@ -187,9 +186,11 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger()); final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger());
callInterceptor.enforceDNPattern(authorizedDnPattern); callInterceptor.enforceDNPattern(authorizedDnPattern);
final BackpressureChecker backpressureChecker = new BackpressureChecker(context, getRelationships());
final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(), final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(),
sessionFactoryReference, sessionFactoryReference,
context); REL_SUCCESS,
backpressureChecker);
final NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) final NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor)) .addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor))
// default (de)compressor registries handle both plaintext and gzip compressed messages // default (de)compressor registries handle both plaintext and gzip compressed messages

View File

@ -73,8 +73,8 @@ public class ITListenGRPC {
assertThat(successFiles.size(), equalTo(1)); assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0); final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR")); assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1")); assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT)); assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally { } finally {
// stop the server // stop the server
@ -157,8 +157,8 @@ public class ITListenGRPC {
assertThat(successFiles.size(), equalTo(1)); assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0); final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR")); assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1")); assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT)); assertThat(mockFlowFile.getAttribute(GRPCAttributeNames.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally { } finally {
// stop the server // stop the server

View File

@ -27,6 +27,7 @@
<description>A bundle of processors that speak the gRPC protocol</description> <description>A bundle of processors that speak the gRPC protocol</description>
<modules> <modules>
<module>nifi-grpc-common</module>
<module>nifi-grpc-processors</module> <module>nifi-grpc-processors</module>
<module>nifi-grpc-nar</module> <module>nifi-grpc-nar</module>
</modules> </modules>