NIFI-10953: Implement GCP Vision AI processors

This closes #6762.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Kalman Jantner 2022-12-06 16:56:29 +01:00 committed by Tamas Palfy
parent 464e0a96ee
commit 67925b171a
21 changed files with 1341 additions and 1 deletions

View File

@ -187,6 +187,28 @@
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-vision</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.vision.v1.ImageAnnotatorClient;
import com.google.cloud.vision.v1.ImageAnnotatorSettings;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
public abstract class AbstractGcpVisionProcessor extends AbstractProcessor {
public static final String GCP_OPERATION_KEY = "operationKey";
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to success relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles are routed to failure relationship").build();
protected static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
protected static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
GCP_CREDENTIALS_PROVIDER_SERVICE)
);
private ImageAnnotatorClient vision;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
final GCPCredentialsService gcpCredentialsService =
context.getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class);
try {
GoogleCredentials credentials = gcpCredentialsService.getGoogleCredentials();
FixedCredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials);
ImageAnnotatorSettings.Builder builder = ImageAnnotatorSettings.newBuilder().setCredentialsProvider(credentialsProvider);
vision = ImageAnnotatorClient.create(builder.build());
} catch (Exception e) {
getLogger().error("Failed to create vision client.", e);
throw new ProcessException("Failed to create vision client.", e);
}
}
protected ImageAnnotatorClient getVisionClient() {
return this.vision;
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.rpc.Status;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
abstract public class AbstractGetGcpVisionAnnotateOperationStatus extends AbstractGcpVisionProcessor {
public static final PropertyDescriptor OPERATION_KEY = new PropertyDescriptor.Builder()
.name("operationKey")
.displayName("GCP Operation Key")
.description("The unique identifier of the Vision operation.")
.defaultValue("${operationKey}")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_RUNNING = new Relationship.Builder()
.name("running")
.description("The job is currently still being processed")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("Upon successful completion, the original FlowFile will be routed to this relationship.")
.autoTerminateDefault(true)
.build();
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Stream.concat(properties.stream(), Stream.of(OPERATION_KEY)).collect(Collectors.toList()));
private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_ORIGINAL,
REL_SUCCESS,
REL_FAILURE,
REL_RUNNING
)));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
String operationKey = context.getProperty(OPERATION_KEY).evaluateAttributeExpressions(flowFile).getValue();;
Operation operation = getVisionClient().getOperationsClient().getOperation(operationKey);
getLogger().info(operation.toString());
if (operation.getDone() && !operation.hasError()) {
GeneratedMessageV3 response = deserializeResponse(operation.getResponse().getValue());
FlowFile childFlowFile = session.create(flowFile);
session.write(childFlowFile, out -> out.write(JsonFormat.printer().print(response).getBytes(StandardCharsets.UTF_8)));
session.putAttribute(childFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(childFlowFile, REL_SUCCESS);
} else if (!operation.getDone()) {
session.transfer(flowFile, REL_RUNNING);
} else {
Status error = operation.getError();
getLogger().error("Failed to execute vision operation. Error code: {}, Error message: {}", error.getCode(), error.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
} catch (Exception e) {
getLogger().error("Fail to get GCP Vision operation's status", e);
session.transfer(flowFile, REL_FAILURE);
}
}
abstract protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException;
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.protobuf.util.JsonFormat;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
public abstract class AbstractStartGcpVisionOperation<B extends com.google.protobuf.GeneratedMessageV3.Builder<B>> extends AbstractGcpVisionProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null && !context.getProperty(getJsonPayloadPropertyDescriptor()).isSet()) {
return;
} else if (flowFile == null) {
flowFile = session.create();
}
try {
OperationFuture<?, ?> asyncResponse = startOperation(session, context, flowFile);
String operationName = asyncResponse.getName();
session.putAttribute(flowFile, GCP_OPERATION_KEY, operationName);
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("Fail to start GCP Vision operation", e);
session.transfer(flowFile, REL_FAILURE);
}
}
@OnStopped
public void onStopped() throws IOException {
getVisionClient().close();
}
protected OperationFuture<?, ?> startOperation(ProcessSession session, ProcessContext context, FlowFile flowFile) {
B builder = newBuilder();
InputStream inStream = context.getProperty(getJsonPayloadPropertyDescriptor()).isSet()
? getInputStreamFromProperty(context, flowFile) : session.read(flowFile);
try (InputStream inputStream = inStream) {
JsonFormat.parser().ignoringUnknownFields().merge(new InputStreamReader(inputStream), builder);
} catch (final IOException e) {
throw new ProcessException("Read FlowFile Failed", e);
}
return startOperation(builder);
}
private InputStream getInputStreamFromProperty(ProcessContext context, FlowFile flowFile) {
return new ByteArrayInputStream(context.getProperty(getJsonPayloadPropertyDescriptor()).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8));
}
abstract B newBuilder();
abstract OperationFuture<?, ?> startOperation(B builder);
abstract PropertyDescriptor getJsonPayloadPropertyDescriptor();
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import com.google.cloud.vision.v1p2beta1.AsyncBatchAnnotateFilesResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@Tags({"Google", "Cloud", "Vision", "Machine Learning"})
@CapabilityDescription("Retrieves the current status of an Google Vision operation.")
@SeeAlso({StartGcpVisionAnnotateFilesOperation.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "operationKey", description = "A unique identifier of the operation designated by the Vision server.")
})
public class GetGcpVisionAnnotateFilesOperationStatus extends AbstractGetGcpVisionAnnotateOperationStatus {
@Override
protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
return AsyncBatchAnnotateFilesResponse.parseFrom(responseValue);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@Tags({"Google", "Cloud", "Vision", "Machine Learning"})
@CapabilityDescription("Retrieves the current status of an Google Vision operation.")
@SeeAlso({StartGcpVisionAnnotateImagesOperation.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "operationKey", description = "A unique identifier of the operation designated by the Vision server.")
})
public class GetGcpVisionAnnotateImagesOperationStatus extends AbstractGetGcpVisionAnnotateOperationStatus {
@Override
protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
return AsyncBatchAnnotateImagesResponse.parseFrom(responseValue);
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesRequest;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({"Google", "Cloud", "Machine Learning", "Vision"})
@CapabilityDescription("Trigger a Vision operation on file input. It should be followed by GetGcpVisionAnnotateFilesOperationStatus processor in order to monitor operation status.")
@SeeAlso({GetGcpVisionAnnotateFilesOperationStatus.class})
@WritesAttributes({
@WritesAttribute(attribute = "operationKey", description = "A unique identifier of the operation returned by the Vision server.")
})
public class StartGcpVisionAnnotateFilesOperation extends AbstractStartGcpVisionOperation<AsyncBatchAnnotateFilesRequest.Builder> {
public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
.name("json-payload")
.displayName("JSON Payload")
.description("JSON request for AWS Machine Learning services. The Processor will use FlowFile content for the request when this property is not specified.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.defaultValue("{\n" +
" \"requests\": [\n" +
" {\n" +
" \"inputConfig\": {\n" +
" \"gcsSource\": {\n" +
" \"uri\": \"gs://${gcs.bucket}/${filename}\"\n" +
" },\n" +
" \"mimeType\": \"application/pdf\"\n" +
" },\n" +
" \"features\": [{\n" +
" \"type\": \"DOCUMENT_TEXT_DETECTION\",\n" +
" \"maxResults\": 4\n" +
" }],\n" +
" \"outputConfig\": {\n" +
" \"gcsDestination\": {\n" +
" \"uri\": \"gs://${gcs.bucket}/${filename}/\"\n" +
" },\n" +
" \"batchSize\": 2\n" +
" }\n" +
" }]\n" +
"}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
JSON_PAYLOAD, GCP_CREDENTIALS_PROVIDER_SERVICE));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
AsyncBatchAnnotateFilesRequest.Builder newBuilder() {
return AsyncBatchAnnotateFilesRequest.newBuilder();
}
@Override
OperationFuture<?, ?> startOperation(AsyncBatchAnnotateFilesRequest.Builder builder) {
return getVisionClient().asyncBatchAnnotateFilesAsync(builder.build());
}
@Override
PropertyDescriptor getJsonPayloadPropertyDescriptor() {
return JSON_PAYLOAD;
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesRequest;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({"Google", "Cloud", "Machine Learning", "Vision"})
@CapabilityDescription("Trigger a Vision operation on image input. It should be followed by GetGcpVisionAnnotateImagesOperationStatus processor in order to monitor operation status.")
@SeeAlso({GetGcpVisionAnnotateImagesOperationStatus.class})
@WritesAttributes({
@WritesAttribute(attribute = "operationKey", description = "A unique identifier of the operation returned by the Vision server.")
})
public class StartGcpVisionAnnotateImagesOperation extends AbstractStartGcpVisionOperation<AsyncBatchAnnotateImagesRequest.Builder> {
static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
.name("json-payload")
.displayName("JSON Payload")
.description("JSON request for AWS Machine Learning services. The Processor will use FlowFile content for the request when this property is not specified.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("{\n" +
" \"requests\": [{\n" +
" \"image\": {\n" +
" \"source\": {\n" +
" \"imageUri\": \"gs://${gcs.bucket}/${filename}\"\n" +
" }\n" +
" },\n" +
" \"features\": [{\n" +
" \"type\": \"FACE_DETECTION\",\n" +
" \"maxResults\": 4\n" +
" }]\n" +
" }],\n" +
" \"outputConfig\": {\n" +
" \"gcsDestination\": {\n" +
" \"uri\": \"gs://${gcs.bucket}/${filename}/\"\n" +
" },\n" +
" \"batchSize\": 2\n" +
" }\n" +
"}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
JSON_PAYLOAD, GCP_CREDENTIALS_PROVIDER_SERVICE));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
AsyncBatchAnnotateImagesRequest.Builder newBuilder() {
return AsyncBatchAnnotateImagesRequest.newBuilder();
}
@Override
OperationFuture<?, ?> startOperation(AsyncBatchAnnotateImagesRequest.Builder builder) {
return getVisionClient().asyncBatchAnnotateImagesAsync(builder.build());
}
@Override
PropertyDescriptor getJsonPayloadPropertyDescriptor() {
return JSON_PAYLOAD;
}
}

View File

@ -24,4 +24,8 @@ org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation
org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation
org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus
org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>Google Vision</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Google Cloud Vision - Get Annotate Files Status</h1>
<h3>Usage</h3>
<p>
GetGcpVisionAnnotateFilesOperationStatus is designed to periodically check the statuses of file annotation operations. This processor should be used in pair with StartGcpVisionAnnotateFilesOperation Processor.
An outgoing FlowFile contains the raw response returned by the Vision server. This response is in JSON json format and contains a google storage reference where the result is located, as well as additional metadata, as written in the <a href="https://cloud.google.com/vision/docs/reference/rest/v1/locations.operations#Operation" target="_blank">Google Vision API Reference document</a>.
</p>
</body>
</html>

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>Google Vision</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Google Cloud Vision - Get Annotate Images Status</h1>
<h3>Usage</h3>
<p>
GetGcpVisionAnnotateImagesOperationStatus is designed to periodically check the statuses of image annotation operations. This processor should be used in pair with StartGcpVisionAnnotateImagesOperation Processor.
An outgoing FlowFile contains the raw response returned by the Vision server. This response is in JSON json format and contains a google storage reference where the result is located, as well as additional metadata, as written in the <a href="https://cloud.google.com/vision/docs/reference/rest/v1/locations.operations#Operation" target="_blank">Google Vision API Reference document</a>.
</p>
</body>
</html>

View File

@ -0,0 +1,104 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>Google Vision</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Google Cloud Vision - Start Annotate Files Operation</h1>
<p>
Prerequisites
<ul>
<li>Make sure Vision API is enabled and the account you are using has the right to use it</li>
<li>Make sure the input file(s) are available in a GCS bucket</li>
</ul>
</p>
<h3>Usage</h3>
<p>
StartGcpVisionAnnotateFilesOperation is designed to trigger file annotation operations. This processor should be used in pair with the GetGcpVisionAnnotateFilesOperationStatus Processor.
Outgoing FlowFiles contain the raw response to the request returned by the Vision server. The response is in JSON format and contains the result and additional metadata as written in the Google Vision API Reference documents.
</p>
<h3>Payload</h3>
<p>
The JSON Payload is a request in JSON format as documented in the <a href="https://cloud.google.com/vision/docs/reference/rest/v1/files/asyncBatchAnnotate" target="_blank">Google Vision REST API reference document</a>.
Payload can be fed to the processor via the <code>JSON Payload</code> property or as a FlowFile content. The property has higher precedence over FlowFile content.
Please make sure to delete the default value of the property if you want to use FlowFile content payload.
A JSON payload template example:
</p>
<code>
<pre>
{
"requests": [
{
"inputConfig": {
"gcsSource": {
"uri": "gs://${gcs.bucket}/${filename}"
},
"mimeType": "application/pdf"
},
"features": [{
"type": "DOCUMENT_TEXT_DETECTION",
"maxResults": 4
}],
"outputConfig": {
"gcsDestination": {
"uri": "gs://${gcs.bucket}/${filename}/"
},
"batchSize": 2
}
}]
}
</pre>
</code>
<h3>Features types</h3>
<ul>
<li>TEXT_DETECTION: Optical character recognition (OCR) for an image; text recognition and conversion to machine-coded text. Identifies and extracts UTF-8 text in an image.</li>
<li>DOCUMENT_TEXT_DETECTION: Optical character recognition (OCR) for a file (PDF/TIFF) or dense text image; dense text recognition and conversion to machine-coded text.</li>
</ul>
You can find more details at <a href="https://cloud.google.com/vision/docs/features-list" target="_blank">Google Vision Feature List</a>
<h3>Example: How to setup a simple Annotate Image Flow</h3>
<p>
Prerequisites
</p>
<p>
<ul>
<li>Input files should be available in a GCS bucket</li>
<li>This bucket must not contain anything else but the input files</li>
</ul>
</p>
<p>Create the following flow</p>
<img src="vision-annotate-files.png" style="height: 50%; width: 50%"/>
<p>
Keep the default value of JSON PAYLOAD property in StartGcpVisionAnnotateImagesOperation
</p>
<p>
Execution steps:
<ul>
<li>ListGCSBucket processor will return a list of files in the bucket at the first run.</li>
<li>ListGCSBucket will return only new items at subsequent runs.</li>
<li>StartGcpVisionAnnotateFilesOperation processor will trigger GCP Vision file annotation jobs based on the JSON payload.</li>
<li>StartGcpVisionAnnotateFilesOperation processor will populate the <code>operationKey</code> flow file attribute.</li>
<li>GetGcpVisionAnnotateFilesOperationStatus processor will periodically query status of the job.</li>
</ul>
</p>
</body>
</html>

View File

@ -0,0 +1,105 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>Google Vision</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Google Cloud Vision - Start Annotate Images Operation</h1>
<p>
Prerequisites
<ul>
<li>Make sure Vision API is enabled and the account you are using has the right to use it</li>
<li>Make sure thne input image(s) are available in a GCS bucket</li>
</ul>
</p>
<h3>Usage</h3>
<p>
StartGcpVisionAnnotateImagesOperation is designed to trigger image annotation operations. This processor should be used in pair with the GetGcpVisionAnnotateImagesOperationStatus Processor.
Outgoing FlowFiles contain the raw response to the request returned by the Vision server. The response is in JSON format and contains the result and additional metadata as written in the Google Vision API Reference documents.
</p>
<h3>Payload</h3>
<p>
The JSON Payload is a request in JSON format as documented in the <a href="https://cloud.google.com/vision/docs/reference/rest/v1/images/asyncBatchAnnotate" target="_blank">Google Vision REST API reference document</a>.
Payload can be fed to the processor via the <code>JSON Payload</code> property or as a FlowFile content. The property has higher precedence over FlowFile content.
Please make sure to delete the default value of the property if you want to use FlowFile content payload.
A JSON payload template example:
</p>
<code>
<pre>
{
"requests": [{
"image": {
"source": {
"imageUri": "gs://${gcs.bucket}/${filename}"
}
},
"features": [{
"type": "DOCUMENT_TEXT_DETECTION",
"maxResults": 4
}]
}],
"outputConfig": {
"gcsDestination": {
"uri": "gs://${gcs.bucket}/${filename}/"
},
"batchSize": 2
}
}
</pre>
</code>
<h3>Features types</h3>
<ul>
<li>TEXT_DETECTION: Optical character recognition (OCR) for an image; text recognition and conversion to machine-coded text. Identifies and extracts UTF-8 text in an image.</li>
<li>DOCUMENT_TEXT_DETECTION: Optical character recognition (OCR) for a file (PDF/TIFF) or dense text image; dense text recognition and conversion to machine-coded text.</li>
<li>LANDMARK_DETECTION: Provides the name of the landmark, a confidence score and a bounding box in the image for the landmark.</li>
<li>LOGO_DETECTION: Provides a textual description of the entity identified, a confidence score, and a bounding polygon for the logo in the file.</li>
<li>LABEL_DETECTION: Provides generalized labels for an image.</li>
<li>etc.</li>
</ul>
You can find more details at <a href="https://cloud.google.com/vision/docs/features-list" target="_blank">Google Vision Feature List</a>
<h3>Example: How to setup a simple Annotate Image Flow</h3>
<p>
Prerequisites
</p>
<p>
<ul>
<li>Input image files should be available in a GCS bucket</li>
<li>This bucket must not contain anything else but the input image files</li>
</ul>
</p>
<p>Create the following flow</p>
<img src="vision-annotate-images.png" style="height: 50%; width: 50%"/>
<p>
Keep the default value of JSON PAYLOAD property in StartGcpVisionAnnotateImagesOperation
</p>
<p>
Execution steps:
<ul>
<li>ListGCSBucket processor will return a list of files in the bucket at the first run.</li>
<li>ListGCSBucket will return only new items at subsequent runs.</li>
<li>StartGcpVisionAnnotateImagesOperation processor will trigger GCP Vision image annotation jobs based on the JSON payload.</li>
<li>StartGcpVisionAnnotateImagesOperation processor will populate the <code>operationKey</code> flow file attribute.</li>
<li>GetGcpVisionAnnotateImagesOperationStatus processor will periodically query status of the job.</li>
</ul>
</p>
</body>
</html>

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.gcp.vision.AbstractGetGcpVisionAnnotateOperationStatus.REL_ORIGINAL;
import static org.mockito.Mockito.when;
import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesResponse;
import com.google.cloud.vision.v1.ImageAnnotatorClient;
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Status;
import java.util.Collections;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class GetGcpVisionAnnotateFilesOperationStatusTest {
private static final String PLACEHOLDER_CONTENT = "content";
private static final String OPERATION_KEY = "operationKey";
private TestRunner runner = null;
private GetGcpVisionAnnotateFilesOperationStatus processor;
@Mock
private ImageAnnotatorClient mockVisionClient;
private GCPCredentialsService gcpCredentialsService;
@Mock
private OperationsClient operationClient;
@Mock
private Operation operation;
@BeforeEach
public void setUp() throws InitializationException {
gcpCredentialsService = new GCPCredentialsControllerService();
processor = new GetGcpVisionAnnotateFilesOperationStatus() {
@Override
protected ImageAnnotatorClient getVisionClient() {
return mockVisionClient;
}
@Override
protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) {
return AsyncBatchAnnotateFilesResponse.newBuilder().build();
}
};
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("gcp-credentials-provider-service-id", gcpCredentialsService);
runner.enableControllerService(gcpCredentialsService);
runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials-provider-service-id");
runner.assertValid(gcpCredentialsService);
}
@Test
public void testGetAnnotateFilesJobStatusSuccess() {
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
when(operation.getResponse()).thenReturn(Any.newBuilder().build());
when(operation.getDone()).thenReturn(true);
when(operation.hasError()).thenReturn(false);
runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
runner.run();
runner.assertTransferCount(REL_SUCCESS, 1);
runner.assertTransferCount(REL_ORIGINAL, 1);
}
@Test
public void testGetAnnotateFilesJobStatusInProgress() {
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
when(operation.getDone()).thenReturn(true);
when(operation.hasError()).thenReturn(true);
runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
}
@Test
public void testGetAnnotateImagesJobStatusFailed() {
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
when(operation.getDone()).thenReturn(true);
when(operation.hasError()).thenReturn(true);
when(operation.getError()).thenReturn(Status.newBuilder().build());
runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.gcp.vision.AbstractGetGcpVisionAnnotateOperationStatus.REL_ORIGINAL;
import static org.mockito.Mockito.when;
import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesResponse;
import com.google.cloud.vision.v1.ImageAnnotatorClient;
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;
import java.util.Collections;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class GetGcpVisionAnnotateImagesOperationStatusTest {
private static final String PLACEHOLDER_CONTENT = "content";
private static final String OPERATION_KEY = "operationKey";
private TestRunner runner = null;
private GetGcpVisionAnnotateImagesOperationStatus processor;
@Mock
private ImageAnnotatorClient mockVisionClient;
private GCPCredentialsService gcpCredentialsService;
@Mock
private OperationsClient operationClient;
@Mock
private Operation operation;
@BeforeEach
public void setUp() throws InitializationException {
gcpCredentialsService = new GCPCredentialsControllerService();
processor = new GetGcpVisionAnnotateImagesOperationStatus() {
@Override
protected ImageAnnotatorClient getVisionClient() {
return mockVisionClient;
}
@Override
protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
return AsyncBatchAnnotateImagesResponse.newBuilder().build();
}
};
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("gcp-credentials-provider-service-id", gcpCredentialsService);
runner.enableControllerService(gcpCredentialsService);
runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials-provider-service-id");
runner.assertValid(gcpCredentialsService);
}
@Test
public void testGetAnnotateImagesJobStatusSuccess() {
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
when(operation.getResponse()).thenReturn(Any.newBuilder().build());
when(operation.getDone()).thenReturn(true);
when(operation.hasError()).thenReturn(false);
runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
runner.run();
runner.assertTransferCount(REL_SUCCESS, 1);
runner.assertTransferCount(REL_ORIGINAL, 1);
}
@Test
public void testGetAnnotateImagesJobStatusInProgress() {
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
when(operation.getDone()).thenReturn(true);
when(operation.hasError()).thenReturn(true);
runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
}
@Test
public void testGetAnnotateImagesJobStatusFailed() {
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
when(operation.getDone()).thenReturn(true);
when(operation.hasError()).thenReturn(true);
when(operation.getError()).thenReturn(Status.newBuilder().build());
runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesRequest;
import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesResponse;
import com.google.cloud.vision.v1.ImageAnnotatorClient;
import com.google.cloud.vision.v1.OperationMetadata;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class StartGcpVisionAnnotateFilesOperationTest {
private TestRunner runner = null;
private StartGcpVisionAnnotateFilesOperation processor;
private static final Path FlowFileContent = Paths.get("src/test/resources/vision/annotate-image.json");
@Mock
private ImageAnnotatorClient vision;
@Captor
private ArgumentCaptor<AsyncBatchAnnotateFilesRequest> requestCaptor;
private String operationName = "operationName";
@Mock
private OperationFuture<AsyncBatchAnnotateFilesResponse, OperationMetadata> operationFuture;
@Mock
private ApiFuture<OperationSnapshot> apiFuture;
@Mock
private ImageAnnotatorClient mockVisionClient;
private GCPCredentialsService gcpCredentialsService;
@Mock
private OperationSnapshot operationSnapshot;
@BeforeEach
public void setUp() throws InitializationException {
gcpCredentialsService = new GCPCredentialsControllerService();
processor = new StartGcpVisionAnnotateFilesOperation() {
@Override
protected ImageAnnotatorClient getVisionClient() {
return mockVisionClient;
}
};
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("gcp-credentials-provider-service-id", gcpCredentialsService);
runner.enableControllerService(gcpCredentialsService);
runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials-provider-service-id");
runner.assertValid(gcpCredentialsService);
}
@Test
public void testAnnotateFilesJob() throws ExecutionException, InterruptedException, IOException {
when(mockVisionClient.asyncBatchAnnotateFilesAsync((AsyncBatchAnnotateFilesRequest) any())).thenReturn(operationFuture);
when(operationFuture.getName()).thenReturn(operationName);
runner.enqueue(FlowFileContent, Collections.emptyMap());
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS);
runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, GCP_OPERATION_KEY);
}
@Test
public void testAnnotateFilesJobFail() throws IOException {
when(mockVisionClient.asyncBatchAnnotateFilesAsync((AsyncBatchAnnotateFilesRequest)any())).thenThrow(new RuntimeException("ServiceError"));
runner.enqueue(FlowFileContent, Collections.emptyMap());
runner.run();
runner.assertAllFlowFilesTransferred(REL_FAILURE);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.vision;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
import static org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation.JSON_PAYLOAD;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.cloud.vision.v1.ImageAnnotatorClient;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class StartGcpVisionAnnotateImagesOperationTest {
private TestRunner runner = null;
private StartGcpVisionAnnotateImagesOperation processor;
private String operationName = "operationName";
@Mock
private OperationFuture operationFuture;
@Mock
private ApiFuture<OperationSnapshot> apiFuture;
@Mock
private ImageAnnotatorClient mockVisionClient;
private GCPCredentialsService gcpCredentialsService;
@Mock
private OperationSnapshot operationSnapshot;
private String jsonPayloadValue;
@BeforeEach
public void setUp() throws InitializationException, IOException {
jsonPayloadValue = FileUtils.readFileToString(new File("src/test/resources/vision/annotate-image.json"), "UTF-8");
gcpCredentialsService = new GCPCredentialsControllerService();
processor = new StartGcpVisionAnnotateImagesOperation() {
@Override
protected ImageAnnotatorClient getVisionClient() {
return mockVisionClient;
}
};
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("gcp-credentials-provider-service-id", gcpCredentialsService);
runner.enableControllerService(gcpCredentialsService);
runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials-provider-service-id");
runner.assertValid(gcpCredentialsService);
runner.setProperty(JSON_PAYLOAD, jsonPayloadValue);
}
@Test
public void testAnnotateImageJob() throws ExecutionException, InterruptedException, IOException {
when(mockVisionClient.asyncBatchAnnotateImagesAsync(any())).thenReturn(operationFuture);
when(operationFuture.getName()).thenReturn(operationName);
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS);
runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, GCP_OPERATION_KEY);
}
@Test
public void testAnnotateFilesJob() throws ExecutionException, InterruptedException, IOException {
when(mockVisionClient.asyncBatchAnnotateImagesAsync(any())).thenReturn(operationFuture);
when(operationFuture.getName()).thenReturn(operationName);
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS);
runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, GCP_OPERATION_KEY);
}
}

View File

@ -0,0 +1,21 @@
{
"requests": [
{
"inputConfig": {
"gcsSource": {
"uri": "gs://qe-dim-external/kjantner-vision-test/TestDoc.pdf"
},
"mimeType": "application/pdf"
},
"features": [{
"type": "DOCUMENT_TEXT_DETECTION",
"maxResults": 4
}],
"outputConfig": {
"gcsDestination": {
"uri": "gs://qe-dim-external/kjantner-vision-test/results-files/"
},
"batchSize": 2
}
}]
}

View File

@ -0,0 +1,19 @@
{
"requests": [{
"image": {
"source": {
"imageUri": "gs://qe-dim-external/kjantner-vision-test/vision-test.png"
}
},
"features": [{
"type": "FACE_DETECTION",
"maxResults": 4
}]
}],
"outputConfig": {
"gcsDestination": {
"uri": "gs://qe-dim-external/kjantner-vision-test/results/"
},
"batchSize": 2
}
}