From cc8d10897588dbcbab4e861b0f11ec7cee045442 Mon Sep 17 00:00:00 2001
From: Kalman Jantner
Date: Thu, 27 Oct 2022 16:25:17 +0200
Subject: [PATCH] NIFI-10710 Added Processors for AWS Polly, Textract,
Translate, Transcribe
This closes #6589
Signed-off-by: David Handermann
---
.../nifi-aws-processors/pom.xml | 16 ++
.../aws/ml/AwsMachineLearningJobStarter.java | 172 ++++++++++++++++++
.../AwsMachineLearningJobStatusProcessor.java | 140 ++++++++++++++
.../ml/AwsResponseMetadataDeserializer.java | 36 ++++
.../aws/ml/SdkHttpMetadataDeserializer.java | 36 ++++
.../aws/ml/polly/GetAwsPollyJobStatus.java | 115 ++++++++++++
.../aws/ml/polly/StartAwsPollyJob.java | 59 ++++++
.../ml/textract/GetAwsTextractJobStatus.java | 142 +++++++++++++++
.../aws/ml/textract/StartAwsTextractJob.java | 154 ++++++++++++++++
.../aws/ml/textract/TextractType.java | 51 ++++++
.../transcribe/GetAwsTranscribeJobStatus.java | 91 +++++++++
.../ml/transcribe/StartAwsTranscribeJob.java | 64 +++++++
.../translate/GetAwsTranslateJobStatus.java | 92 ++++++++++
.../ml/translate/StartAwsTranslateJob.java | 57 ++++++
.../org.apache.nifi.processor.Processor | 8 +
.../additionalDetails.html | 39 ++++
.../additionalDetails.html | 68 +++++++
.../additionalDetails.html | 37 ++++
.../additionalDetails.html | 144 +++++++++++++++
.../additionalDetails.html | 42 +++++
.../additionalDetails.html | 113 ++++++++++++
.../additionalDetails.html | 40 ++++
.../additionalDetails.html | 75 ++++++++
.../aws/ml/polly/GetAwsPollyStatusTest.java | 128 +++++++++++++
.../GetAwsTranslateJobStatusTest.java | 117 ++++++++++++
.../GetAwsTranscribeJobStatusTest.java | 131 +++++++++++++
.../GetAwsTranslateJobStatusTest.java | 129 +++++++++++++
27 files changed, 2296 insertions(+)
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java
create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 49bc3aef24..dddd131bb4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -121,6 +121,22 @@
1.20.0-SNAPSHOTprovided
+
+ com.amazonaws
+ aws-java-sdk-translate
+
+
+ com.amazonaws
+ aws-java-sdk-polly
+
+
+ com.amazonaws
+ aws-java-sdk-transcribe
+
+
+ com.amazonaws
+ aws-java-sdk-textract
+
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java
new file mode 100644
index 0000000000..8419bee547
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE;
+import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMachineLearningJobStarter
+ extends AbstractAWSCredentialsProviderProcessor {
+ public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+ .name("json-payload")
+ .displayName("JSON Payload")
+ .description("JSON request for AWS Machine Learning services. The Processor will use FlowFile content for the request when this property is not specified.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+ new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+ .displayName("Region")
+ .name("aws-region")
+ .required(true)
+ .allowableValues(getAvailableRegions())
+ .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+ .build();
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("Upon successful completion, the original FlowFile will be routed to this relationship.")
+ .autoTerminateDefault(true)
+ .build();
+ protected static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+ REGION,
+ TIMEOUT,
+ JSON_PAYLOAD,
+ SSL_CONTEXT_SERVICE,
+ ENDPOINT_OVERRIDE));
+ private final static ObjectMapper MAPPER = JsonMapper.builder()
+ .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+ .build();
+ private static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_ORIGINAL,
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null && !context.getProperty(JSON_PAYLOAD).isSet()) {
+ return;
+ }
+ final RESPONSE response;
+ FlowFile childFlowFile;
+ try {
+ response = sendRequest(buildRequest(session, context, flowFile), context, flowFile);
+ childFlowFile = writeToFlowFile(session, flowFile, response);
+ postProcessFlowFile(context, session, childFlowFile, response);
+ session.transfer(childFlowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ if (flowFile != null) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ getLogger().error("Sending AWS ML Request failed", e);
+ return;
+ }
+ if (flowFile != null) {
+ session.transfer(flowFile, REL_ORIGINAL);
+ }
+
+ }
+
+ protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, RESPONSE response) {
+ final String awsTaskId = getAwsTaskId(context, response, flowFile);
+ flowFile = session.putAttribute(flowFile, TASK_ID.getName(), awsTaskId);
+ flowFile = session.putAttribute(flowFile, MIME_TYPE.key(), "application/json");
+ getLogger().debug("AWS ML Task [{}] started", awsTaskId);
+ }
+
+ protected REQUEST buildRequest(ProcessSession session, ProcessContext context, FlowFile flowFile) throws JsonProcessingException {
+ return MAPPER.readValue(getPayload(session, context, flowFile), getAwsRequestClass(context, flowFile));
+ }
+
+ @Override
+ protected T createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+ throw new UnsupportedOperationException("createClient(ProcessContext, AWSCredentials, ClientConfiguration) is not supported");
+ }
+
+ protected FlowFile writeToFlowFile(ProcessSession session, FlowFile flowFile, RESPONSE response) {
+ FlowFile childFlowFile = flowFile == null ? session.create() : session.create(flowFile);
+ childFlowFile = session.write(childFlowFile, out -> MAPPER.writeValue(out, response));
+ return childFlowFile;
+ }
+
+ protected String readFlowFile(final ProcessSession session, final FlowFile flowFile) {
+ try (InputStream inputStream = session.read(flowFile)) {
+ return new String(IOUtils.toByteArray(inputStream));
+ } catch (final IOException e) {
+ throw new ProcessException("Read FlowFile Failed", e);
+ }
+ }
+
+ private String getPayload(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+ String payloadPropertyValue = context.getProperty(JSON_PAYLOAD).evaluateAttributeExpressions(flowFile).getValue();
+ if (payloadPropertyValue == null) {
+ payloadPropertyValue = readFlowFile(session, flowFile);
+ }
+ return payloadPropertyValue;
+ }
+
+ abstract protected RESPONSE sendRequest(REQUEST request, ProcessContext context, FlowFile flowFile) throws JsonProcessingException;
+
+ abstract protected Class extends REQUEST> getAwsRequestClass(ProcessContext context, FlowFile flowFile);
+
+ abstract protected String getAwsTaskId(ProcessContext context, RESPONSE response, FlowFile flowFile);
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java
new file mode 100644
index 0000000000..157314c9cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.amazonaws.regions.Regions;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMachineLearningJobStatusProcessor
+ extends AbstractAWSCredentialsProviderProcessor {
+ public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+ public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+ new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor TASK_ID =
+ new PropertyDescriptor.Builder()
+ .name("awsTaskId")
+ .displayName("AWS Task ID")
+ .defaultValue("${awsTaskId}")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("Upon successful completion, the original FlowFile will be routed to this relationship.")
+ .autoTerminateDefault(true)
+ .build();
+ public static final Relationship REL_RUNNING = new Relationship.Builder()
+ .name("running")
+ .description("The job is currently still being processed")
+ .build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Job successfully finished. FlowFile will be routed to this relation.")
+ .build();
+ public static final Relationship REL_THROTTLED = new Relationship.Builder()
+ .name("throttled")
+ .description("Retrieving results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " +
+ "It is generally expected to retry this relationship.")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("The job failed, the original FlowFile will be routed to this relationship.")
+ .autoTerminateDefault(true)
+ .build();
+ public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+ .displayName("Region")
+ .name("aws-region")
+ .required(true)
+ .allowableValues(getAvailableRegions())
+ .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+ .build();
+ public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+ protected static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ TASK_ID,
+ MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+ REGION,
+ TIMEOUT,
+ SSL_CONTEXT_SERVICE,
+ ENDPOINT_OVERRIDE,
+ PROXY_CONFIGURATION_SERVICE));
+ private static final ObjectMapper MAPPER = JsonMapper.builder()
+ .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+ .build();
+
+ static {
+ SimpleModule awsResponseModule = new SimpleModule();
+ awsResponseModule.addDeserializer(ResponseMetadata.class, new AwsResponseMetadataDeserializer());
+ SimpleModule sdkHttpModule = new SimpleModule();
+ awsResponseModule.addDeserializer(SdkHttpMetadata.class, new SdkHttpMetadataDeserializer());
+ MAPPER.registerModule(awsResponseModule);
+ MAPPER.registerModule(sdkHttpModule);
+ }
+
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ private static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_ORIGINAL,
+ REL_SUCCESS,
+ REL_RUNNING,
+ REL_THROTTLED,
+ REL_FAILURE
+ )));
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ protected T createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+ throw new UnsupportedOperationException("Client creation not supported");
+ }
+
+ protected void writeToFlowFile(ProcessSession session, FlowFile flowFile, Object response) {
+ session.write(flowFile, out -> MAPPER.writeValue(out, response));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java
new file mode 100644
index 0000000000..ec3ad96282
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.ResponseMetadata;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
+import java.io.IOException;
+import java.util.Map;
+
+public class AwsResponseMetadataDeserializer extends StdNodeBasedDeserializer {
+ protected AwsResponseMetadataDeserializer() {
+ super(ResponseMetadata.class);
+ }
+
+ @Override
+ public ResponseMetadata convert(JsonNode root, DeserializationContext ctxt) throws IOException {
+ return new ResponseMetadata((Map) null);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java
new file mode 100644
index 0000000000..a8d027d8d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
+import java.io.IOException;
+
+public class SdkHttpMetadataDeserializer extends StdNodeBasedDeserializer {
+
+ protected SdkHttpMetadataDeserializer() {
+ super(SdkHttpMetadata.class);
+ }
+
+ @Override
+ public SdkHttpMetadata convert(JsonNode root, DeserializationContext ctxt) throws IOException {
+ return null;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java
new file mode 100644
index 0000000000..0efcd062d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Retrieves the current status of an AWS Polly job.")
+@SeeAlso({StartAwsPollyJob.class})
+@WritesAttributes({
+ @WritesAttribute(attribute = "PollyS3OutputBucket", description = "The bucket name where polly output will be located."),
+ @WritesAttribute(attribute = "PollyS3OutputKey", description = "Object key of polly output."),
+ @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.")
+})
+public class GetAwsPollyJobStatus extends AwsMachineLearningJobStatusProcessor {
+ private static final String BUCKET = "bucket";
+ private static final String KEY = "key";
+ private static final Pattern S3_PATH = Pattern.compile("https://s3.*amazonaws.com/(?<" + BUCKET + ">[^/]+)/(?<" + KEY + ">.*)");
+ private static final String AWS_S3_BUCKET = "PollyS3OutputBucket";
+ private static final String AWS_S3_KEY = "filename";
+
+ @Override
+ protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonPollyClient) AmazonPollyClientBuilder.standard()
+ .withCredentials(credentialsProvider)
+ .withRegion(context.getProperty(REGION).getValue())
+ .build();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ GetSpeechSynthesisTaskResult speechSynthesisTask;
+ try {
+ speechSynthesisTask = getSynthesisTask(context, flowFile);
+ } catch (ThrottlingException e) {
+ getLogger().info("Request Rate Limit exceeded", e);
+ session.transfer(flowFile, REL_THROTTLED);
+ return;
+ } catch (Exception e) {
+ getLogger().warn("Failed to get Polly Job status", e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ TaskStatus taskStatus = TaskStatus.fromValue(speechSynthesisTask.getSynthesisTask().getTaskStatus());
+
+ if (taskStatus == TaskStatus.InProgress || taskStatus == TaskStatus.Scheduled) {
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_RUNNING);
+ } else if (taskStatus == TaskStatus.Completed) {
+ String outputUri = speechSynthesisTask.getSynthesisTask().getOutputUri();
+
+ Matcher matcher = S3_PATH.matcher(outputUri);
+ if (matcher.find()) {
+ session.putAttribute(flowFile, AWS_S3_BUCKET, matcher.group(BUCKET));
+ session.putAttribute(flowFile, AWS_S3_KEY, matcher.group(KEY));
+ }
+ FlowFile childFlowFile = session.create(flowFile);
+ writeToFlowFile(session, childFlowFile, speechSynthesisTask);
+ childFlowFile = session.putAttribute(childFlowFile, AWS_TASK_OUTPUT_LOCATION, outputUri);
+ session.transfer(flowFile, REL_ORIGINAL);
+ session.transfer(childFlowFile, REL_SUCCESS);
+ getLogger().info("Amazon Polly Task Completed {}", flowFile);
+ } else if (taskStatus == TaskStatus.Failed) {
+ final String failureReason = speechSynthesisTask.getSynthesisTask().getTaskStatusReason();
+ flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason);
+ session.transfer(flowFile, REL_FAILURE);
+ getLogger().error("Amazon Polly Task Failed {} Reason [{}]", flowFile, failureReason);
+ }
+ }
+
+ private GetSpeechSynthesisTaskResult getSynthesisTask(ProcessContext context, FlowFile flowFile) {
+ String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+ GetSpeechSynthesisTaskRequest request = new GetSpeechSynthesisTaskRequest().withTaskId(taskId);
+ return getClient().getSpeechSynthesisTask(request);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java
new file mode 100644
index 0000000000..1a0b00d8ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Trigger a AWS Polly job. It should be followed by GetAwsPollyJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsPollyJobStatus.class})
+public class StartAwsPollyJob extends AwsMachineLearningJobStarter {
+ @Override
+ protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonPollyClient) AmazonPollyClientBuilder.standard()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ protected StartSpeechSynthesisTaskResult sendRequest(StartSpeechSynthesisTaskRequest request, ProcessContext context, FlowFile flowFile) {
+ return getClient().startSpeechSynthesisTask(request);
+ }
+
+ @Override
+ protected Class extends StartSpeechSynthesisTaskRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+ return StartSpeechSynthesisTaskRequest.class;
+ }
+
+ @Override
+ protected String getAwsTaskId(ProcessContext context, StartSpeechSynthesisTaskResult startSpeechSynthesisTaskResult, FlowFile flowFile) {
+ return startSpeechSynthesisTaskResult.getSynthesisTask().getTaskId();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
new file mode 100644
index 0000000000..50a81c10c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Retrieves the current status of an AWS Textract job.")
+@SeeAlso({StartAwsTextractJob.class})
+public class GetAwsTextractJobStatus extends AwsMachineLearningJobStatusProcessor {
+ public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder()
+ .name("textract-type")
+ .displayName("Textract Type")
+ .required(true)
+ .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"")
+ .allowableValues(TextractType.TEXTRACT_TYPES)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue(DOCUMENT_ANALYSIS.getType())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ private static final List TEXTRACT_PROPERTIES =
+ Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList()));
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return TEXTRACT_PROPERTIES;
+ }
+
+ @Override
+ protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonTextractClient) AmazonTextractClient.builder()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ String textractType = context.getProperty(TEXTRACT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
+
+ String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+ try {
+ JobStatus jobStatus = getTaskStatus(TextractType.fromString(textractType), getClient(), awsTaskId);
+ if (JobStatus.SUCCEEDED == jobStatus) {
+ Object task = getTask(TextractType.fromString(textractType), getClient(), awsTaskId);
+ writeToFlowFile(session, flowFile, task);
+ session.transfer(flowFile, REL_SUCCESS);
+ } else if (JobStatus.IN_PROGRESS == jobStatus) {
+ session.transfer(flowFile, REL_RUNNING);
+ } else if (JobStatus.PARTIAL_SUCCESS == jobStatus) {
+ session.transfer(flowFile, REL_THROTTLED);
+ } else if (JobStatus.FAILED == jobStatus) {
+ session.transfer(flowFile, REL_FAILURE);
+ getLogger().error("Amazon Textract Task [{}] Failed", awsTaskId);
+ }
+ } catch (ThrottlingException e) {
+ getLogger().info("Request Rate Limit exceeded", e);
+ session.transfer(flowFile, REL_THROTTLED);
+ return;
+ } catch (Exception e) {
+ getLogger().warn("Failed to get Textract Job status", e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ private Object getTask(TextractType typeOfTextract, AmazonTextractClient client, String awsTaskId) {
+ Object job = null;
+ switch (typeOfTextract) {
+ case DOCUMENT_ANALYSIS:
+ job = client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awsTaskId));
+ break;
+ case DOCUMENT_TEXT_DETECTION:
+ job = client.getDocumentTextDetection(new GetDocumentTextDetectionRequest().withJobId(awsTaskId));
+ break;
+ case EXPENSE_ANALYSIS:
+ job = client.getExpenseAnalysis(new GetExpenseAnalysisRequest().withJobId(awsTaskId));
+ break;
+ }
+ return job;
+ }
+
+ private JobStatus getTaskStatus(TextractType typeOfTextract, AmazonTextractClient client, String awsTaskId) {
+ JobStatus jobStatus = JobStatus.IN_PROGRESS;
+ switch (typeOfTextract) {
+ case DOCUMENT_ANALYSIS:
+ jobStatus = JobStatus.fromValue(client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awsTaskId)).getJobStatus());
+ break;
+ case DOCUMENT_TEXT_DETECTION:
+ jobStatus = JobStatus.fromValue(client.getDocumentTextDetection(new GetDocumentTextDetectionRequest().withJobId(awsTaskId)).getJobStatus());
+ break;
+ case EXPENSE_ANALYSIS:
+ jobStatus = JobStatus.fromValue(client.getExpenseAnalysis(new GetExpenseAnalysisRequest().withJobId(awsTaskId)).getJobStatus());
+ break;
+
+ }
+ return jobStatus;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java
new file mode 100644
index 0000000000..2fa4483af0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Trigger a AWS Textract job. It should be followed by GetAwsTextractJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsTextractJobStatus.class})
+public class StartAwsTextractJob extends AwsMachineLearningJobStarter {
+ public static final Validator TEXTRACT_TYPE_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+ return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+ } else if (TextractType.TEXTRACT_TYPES.contains(value)) {
+ return new ValidationResult.Builder().subject(subject).input(value).explanation("Supported Value.").valid(true).build();
+ } else {
+ return new ValidationResult.Builder().subject(subject).input(value).explanation("Not a supported value, flow file attribute or context parameter.").valid(false).build();
+ }
+ }
+ };
+ public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder()
+ .name("textract-type")
+ .displayName("Textract Type")
+ .required(true)
+ .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue(DOCUMENT_ANALYSIS.type)
+ .addValidator(TEXTRACT_TYPE_VALIDATOR)
+ .build();
+ private static final List TEXTRACT_PROPERTIES =
+ Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList()));
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return TEXTRACT_PROPERTIES;
+ }
+
+ @Override
+ protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, AmazonWebServiceResult response) {
+ super.postProcessFlowFile(context, session, flowFile, response);
+ }
+
+ @Override
+ protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonTextractClient) AmazonTextractClient.builder()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ protected AmazonWebServiceResult sendRequest(AmazonWebServiceRequest request, ProcessContext context, FlowFile flowFile) {
+ TextractType textractType =
+ TextractType.fromString(context.getProperty(TEXTRACT_TYPE.getName()).evaluateAttributeExpressions(flowFile).getValue());
+ AmazonWebServiceResult result;
+ switch (textractType) {
+ case DOCUMENT_ANALYSIS :
+ result = getClient().startDocumentAnalysis((StartDocumentAnalysisRequest) request);
+ break;
+ case DOCUMENT_TEXT_DETECTION:
+ result = getClient().startDocumentTextDetection((StartDocumentTextDetectionRequest) request);
+ break;
+ case EXPENSE_ANALYSIS:
+ result = getClient().startExpenseAnalysis((StartExpenseAnalysisRequest) request);
+ break;
+ default: throw new UnsupportedOperationException("Unsupported textract type: " + textractType);
+ }
+ return result;
+ }
+
+ @Override
+ protected Class extends AmazonWebServiceRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+ TextractType typeOfTextract =
+ TextractType.fromString(context.getProperty(TEXTRACT_TYPE.getName()).evaluateAttributeExpressions(flowFile).getValue());
+ Class extends AmazonWebServiceRequest> result = null;
+ switch (typeOfTextract) {
+ case DOCUMENT_ANALYSIS:
+ result = StartDocumentAnalysisRequest.class;
+ break;
+ case DOCUMENT_TEXT_DETECTION:
+ result = StartDocumentTextDetectionRequest.class;
+ break;
+ case EXPENSE_ANALYSIS:
+ result = StartExpenseAnalysisRequest.class;
+ break;
+ }
+ return result;
+ }
+
+ @Override
+ protected String getAwsTaskId(ProcessContext context, AmazonWebServiceResult amazonWebServiceResult, FlowFile flowFile) {
+ TextractType textractType =
+ TextractType.fromString(context.getProperty(TEXTRACT_TYPE.getName()).evaluateAttributeExpressions(flowFile).getValue());
+ String result;
+ switch (textractType) {
+ case DOCUMENT_ANALYSIS:
+ result = ((StartDocumentAnalysisResult) amazonWebServiceResult).getJobId();
+ break;
+ case DOCUMENT_TEXT_DETECTION:
+ result = ((StartDocumentTextDetectionResult) amazonWebServiceResult).getJobId();
+ break;
+ case EXPENSE_ANALYSIS:
+ result = ((StartExpenseAnalysisResult) amazonWebServiceResult).getJobId();
+ break;
+ default: throw new UnsupportedOperationException("Unsupported textract type.");
+ }
+ return result;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java
new file mode 100644
index 0000000000..e0f4462d34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+public enum TextractType {
+ DOCUMENT_ANALYSIS("Document Analysis"),
+ DOCUMENT_TEXT_DETECTION("Document Text Detection"),
+ EXPENSE_ANALYSIS("Expense Analysis");
+
+ public static final Set TEXTRACT_TYPES = Arrays.stream(TextractType.values()).map(TextractType::getType)
+ .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
+
+ public final String type;
+
+ TextractType(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public static TextractType fromString(String value) {
+ return Arrays.stream(values())
+ .filter(type -> type.getType().equalsIgnoreCase(value))
+ .findAny()
+ .orElseThrow(() -> new UnsupportedOperationException("Unsupported textract type."));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java
new file mode 100644
index 0000000000..8a21a9bb19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.transcribe;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.transcribe.AmazonTranscribeClient;
+import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest;
+import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult;
+import com.amazonaws.services.transcribe.model.TranscriptionJobStatus;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"})
+@CapabilityDescription("Retrieves the current status of an AWS Transcribe job.")
+@SeeAlso({StartAwsTranscribeJob.class})
+@WritesAttributes({
+ @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.")
+})
+public class GetAwsTranscribeJobStatus extends AwsMachineLearningJobStatusProcessor {
+ @Override
+ protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonTranscribeClient) AmazonTranscribeClient.builder()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ try {
+ GetTranscriptionJobResult job = getJob(context, flowFile);
+ TranscriptionJobStatus jobStatus = TranscriptionJobStatus.fromValue(job.getTranscriptionJob().getTranscriptionJobStatus());
+
+ if (TranscriptionJobStatus.COMPLETED == jobStatus) {
+ writeToFlowFile(session, flowFile, job);
+ session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, job.getTranscriptionJob().getTranscript().getTranscriptFileUri());
+ session.transfer(flowFile, REL_SUCCESS);
+ } else if (TranscriptionJobStatus.IN_PROGRESS == jobStatus) {
+ session.transfer(flowFile, REL_RUNNING);
+ } else if (TranscriptionJobStatus.FAILED == jobStatus) {
+ final String failureReason = job.getTranscriptionJob().getFailureReason();
+ session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason);
+ session.transfer(flowFile, REL_FAILURE);
+ getLogger().error("Transcribe Task Failed {} Reason [{}]", flowFile, failureReason);
+ }
+ } catch (ThrottlingException e) {
+ getLogger().info("Request Rate Limit exceeded", e);
+ session.transfer(flowFile, REL_THROTTLED);
+ return;
+ } catch (Exception e) {
+ getLogger().warn("Failed to get Transcribe Job status", e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ private GetTranscriptionJobResult getJob(ProcessContext context, FlowFile flowFile) {
+ String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+ GetTranscriptionJobRequest request = new GetTranscriptionJobRequest().withTranscriptionJobName(taskId);
+ return getClient().getTranscriptionJob(request);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java
new file mode 100644
index 0000000000..f34a8b5490
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.transcribe;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.transcribe.AmazonTranscribeClient;
+import com.amazonaws.services.transcribe.model.StartTranscriptionJobRequest;
+import com.amazonaws.services.transcribe.model.StartTranscriptionJobResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"})
+@CapabilityDescription("Trigger a AWS Transcribe job. It should be followed by GetAwsTranscribeStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsTranscribeJobStatus.class})
+public class StartAwsTranscribeJob extends AwsMachineLearningJobStarter {
+ @Override
+ protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonTranscribeClient) AmazonTranscribeClient.builder()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+ return (AmazonTranscribeClient) AmazonTranscribeClient.builder().build();
+ }
+
+ @Override
+ protected StartTranscriptionJobResult sendRequest(StartTranscriptionJobRequest request, ProcessContext context, FlowFile flowFile) {
+ return getClient().startTranscriptionJob(request);
+ }
+
+ @Override
+ protected Class extends StartTranscriptionJobRequest> getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+ return StartTranscriptionJobRequest.class;
+ }
+
+ @Override
+ protected String getAwsTaskId(ProcessContext context, StartTranscriptionJobResult startTranscriptionJobResult, FlowFile flowFile) {
+ return startTranscriptionJobResult.getTranscriptionJob().getTranscriptionJobName();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java
new file mode 100644
index 0000000000..2471b15068
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.translate;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.translate.AmazonTranslateClient;
+import com.amazonaws.services.translate.model.DescribeTextTranslationJobRequest;
+import com.amazonaws.services.translate.model.DescribeTextTranslationJobResult;
+import com.amazonaws.services.translate.model.JobStatus;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Translate"})
+@CapabilityDescription("Retrieves the current status of an AWS Translate job.")
+@SeeAlso({StartAwsTranslateJob.class})
+@WritesAttributes({
+ @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.")
+})
+public class GetAwsTranslateJobStatus extends AwsMachineLearningJobStatusProcessor {
+ @Override
+ protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonTranslateClient) AmazonTranslateClient.builder()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue();
+ try {
+ DescribeTextTranslationJobResult describeTextTranslationJobResult = getStatusString(awsTaskId);
+ JobStatus status = JobStatus.fromValue(describeTextTranslationJobResult.getTextTranslationJobProperties().getJobStatus());
+
+ if (status == JobStatus.IN_PROGRESS || status == JobStatus.SUBMITTED) {
+ writeToFlowFile(session, flowFile, describeTextTranslationJobResult);
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_RUNNING);
+ } else if (status == JobStatus.COMPLETED) {
+ session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, describeTextTranslationJobResult.getTextTranslationJobProperties().getOutputDataConfig().getS3Uri());
+ writeToFlowFile(session, flowFile, describeTextTranslationJobResult);
+ session.transfer(flowFile, REL_SUCCESS);
+ } else if (status == JobStatus.FAILED || status == JobStatus.COMPLETED_WITH_ERROR) {
+ writeToFlowFile(session, flowFile, describeTextTranslationJobResult);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ } catch (ThrottlingException e) {
+ getLogger().info("Request Rate Limit exceeded", e);
+ session.transfer(flowFile, REL_THROTTLED);
+ return;
+ } catch (Exception e) {
+ getLogger().warn("Failed to get Polly Job status", e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ private DescribeTextTranslationJobResult getStatusString(String awsTaskId) {
+ DescribeTextTranslationJobRequest request = new DescribeTextTranslationJobRequest().withJobId(awsTaskId);
+ DescribeTextTranslationJobResult translationJobsResult = getClient().describeTextTranslationJob(request);
+ return translationJobsResult;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java
new file mode 100644
index 0000000000..2ae8480089
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.ml.translate;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.translate.AmazonTranslateClient;
+import com.amazonaws.services.translate.model.StartTextTranslationJobRequest;
+import com.amazonaws.services.translate.model.StartTextTranslationJobResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Translate"})
+@CapabilityDescription("Trigger a AWS Translate job. It should be followed by GetAwsTranslateJobStatus processor in order to monitor job status.")
+@SeeAlso({GetAwsTranslateJobStatus.class})
+public class StartAwsTranslateJob extends AwsMachineLearningJobStarter {
+ @Override
+ protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+ return (AmazonTranslateClient) AmazonTranslateClient.builder()
+ .withRegion(context.getProperty(REGION).getValue())
+ .withCredentials(credentialsProvider)
+ .build();
+ }
+
+ @Override
+ protected StartTextTranslationJobResult sendRequest(StartTextTranslationJobRequest request, ProcessContext context, FlowFile flowFile) {
+ return getClient().startTextTranslationJob(request);
+ }
+
+ @Override
+ protected Class getAwsRequestClass(ProcessContext context, FlowFile flowFile) {
+ return StartTextTranslationJobRequest.class;
+ }
+
+ protected String getAwsTaskId(ProcessContext context, StartTextTranslationJobResult startTextTranslationJobResult, FlowFile flowFile) {
+ return startTextTranslationJobResult.getJobId();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a02ced4908..2682d4ea5b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -31,4 +31,12 @@ org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream
org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
org.apache.nifi.processors.aws.wag.InvokeAWSGatewayApi
+org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob
+org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus
+org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob
+org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus
+org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob
+org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus
+org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob
+org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..5e842f0006
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html
@@ -0,0 +1,39 @@
+
+
+
+
+
+
+ Amazon Polly
+
+
+
+
+
GetAwsPollyJobStatus
+
+ Amazon Polly is a service that turns text into lifelike speech, allowing you to create applications that talk, and build entirely new categories of speech-enabled products.
+ Polly's Text-to-Speech (TTS) service uses advanced deep learning technologies to synthesize natural sounding human speech.
+ With dozens of lifelike voices across a broad set of languages, you can build speech-enabled applications that work in many different countries.
+
+
+
Usage
+
+ GetAwsPollyJobStatus Processor is designed to periodically check polly job status. This processor should be used in pair with StartAwsPollyJob Processor.
+ If the job successfully finished it will populate outputLocation attribute of the flow file where you can find the output of the Polly job.
+ In case of an error failure.reason attribute will be populated with the details.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html
new file mode 100644
index 0000000000..a3ed042035
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html
@@ -0,0 +1,68 @@
+
+
+
+
+
+
+ Amazon Polly
+
+
+
+
+
StartAwsPollyJob
+
+ Amazon Polly is a service that turns text into lifelike speech, allowing you to create applications that talk, and build entirely new categories of speech-enabled products.
+ Polly's Text-to-Speech (TTS) service uses advanced deep learning technologies to synthesize natural sounding human speech.
+ With dozens of lifelike voices across a broad set of languages, you can build speech-enabled applications that work in many different countries.
+
+
+
Usage
+
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official Polly API reference
+ With this processor you will trigger a startSpeechSynthesisTask async call to Polly Service.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+ After the job is triggered the serialized json response will be written to the output flow file.
+ The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor.
+
+
+
+ JSON payload template - note that it can be simplified with the optional fields, check AWS documentation for more details - example:
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..9ac7d31f09
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html
@@ -0,0 +1,37 @@
+
+
+
+
+
+
+ Amazon Textract
+
+
+
+
+
GetAwsTextractJobStatus
+
+ Amazon Textract is a machine learning (ML) service that automatically extracts text, handwriting, and data from scanned documents.
+ It goes beyond simple optical character recognition (OCR) to identify, understand, and extract data from forms and tables.
+
+
Usage
+
+ GetAwsTextractJobStatus Processor is designed to periodically check textract job status. This processor should be used in pair with StartAwsTextractJob Processor.
+ FlowFile will contain the serialized Tetract response that contains the result and additional metadata as it is documented in AWS Textract Reference.
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html
new file mode 100644
index 0000000000..fe334e55fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html
@@ -0,0 +1,144 @@
+
+
+
+
+
+
+ Amazon Textract
+
+
+
+
+
StartAwsTextractJob
+
+ Amazon Textract is a machine learning (ML) service that automatically extracts text, handwriting, and data from scanned documents.
+ It goes beyond simple optical character recognition (OCR) to identify, understand, and extract data from forms and tables.
+
+
+
Usage
+
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official Textract API reference
+ With this processor you will trigger a startDocumentAnalysis, startDocumentTextDetection or startExpenseAnalysis async call according to your type of textract settings.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+ After the job is triggered the serialized json response will be written to the output flow file.
+ The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor.
+
+
+ Three different type of textract task are supported: Documnet Analysis, Text Detection, Expense Analysis.
+
+
DocumentAnalysis
+
Starts the asynchronous analysis of an input document for relationships between detected items such as key-value pairs, tables, and selection elements.
+ API Reference
+
Starts the asynchronous detection of text in a document. Amazon Textract can detect lines of text and the words that make up a line of text.
+ API Reference
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..25e4f3513d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html
@@ -0,0 +1,42 @@
+
+
+
+
+
+
+ Amazon Transcribe
+
+
+
+
+
Amazon Transcribe
+
+ Automatically convert speech to text
+
+
Extract key business insights from customer calls, video files, clinical conversations, and more.
+
Improve business outcomes with state of the art speech recognition models that are fully managed and continuously trained.
+
Ensure customer privacy and safety by masking sensitive information.
+
+
+
+
Usage
+
+ GetAwsTranscribeJobStatus Processor is designed to periodically check Transcribe job status. This processor should be used in pair with Transcribe Processor.
+ FlowFile will contain the serialized Transcribe response and it will populate the path of the output in the outputLocation attribute.
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html
new file mode 100644
index 0000000000..8068383329
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html
@@ -0,0 +1,113 @@
+
+
+
+
+
+
+ Amazon Transcribe
+
+
+
+
+
Amazon Transcribe
+
+ Automatically convert speech to text
+
+
Extract key business insights from customer calls, video files, clinical conversations, and more.
+
Improve business outcomes with state of the art speech recognition models that are fully managed and continuously trained.
+
Ensure customer privacy and safety by masking sensitive information.
+
+
+
+
Usage
+
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official Transcribe API reference
+ With this processor you will trigger a startTranscriptionJob async call to AWS Transcribe Service.
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+ After the job is triggered the serialized json response will be written to the output flow file.
+ The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor.
+
+
+ JSON payload template - note that these can be simplified with the optional fields, check AWS documentation for more details - examples:
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html
new file mode 100644
index 0000000000..d12d376283
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html
@@ -0,0 +1,40 @@
+
+
+
+
+
+
+ Amazon Translate
+
+
+
+
+
GetAwsTranslateJobStatus
+
+ Amazon Translate is a neural machine translation service for translating text to and from English across a breadth of supported languages.
+ Powered by deep-learning technologies, Amazon Translate delivers fast, high-quality, and affordable language translation.
+ It provides a managed, continually trained solution so you can easily translate company and user-authored content or build applications that require support across multiple languages.
+ The machine translation engine has been trained on a wide variety of content across different domains to produce quality translations that serve any industry need.
+
+
+
Usage
+
+ GetAwsTranslateJobStatus Processor is designed to periodically check translate job status. This processor should be used in pair with Translate Processor.
+ If the job successfully finished it will populate outputLocation attribute of the flow file where you can find the output of the Translation job.
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html
new file mode 100644
index 0000000000..ae02e9a397
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html
@@ -0,0 +1,75 @@
+
+
+
+
+
+
+ Amazon Translate
+
+
+
+
+
StartAwsTranslateJob
+
+ Amazon Translate is a neural machine translation service for translating text to and from English across a breadth of supported languages.
+ Powered by deep-learning technologies, Amazon Translate delivers fast, high-quality, and affordable language translation.
+ It provides a managed, continually trained solution so you can easily translate company and user-authored content or build applications that require support across multiple languages.
+ The machine translation engine has been trained on a wide variety of content across different domains to produce quality translations that serve any industry need.
+
+
+
Usage
+
+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference.
+ You can find example json payload in the documentation at the Request Syntax sections.
+ For more details please check the official Translate API reference
+ With this processor you will trigger a startTextTranslationJob async call to Translate Service
+
+ You can define json payload as property or provide as a flow file content. Property has higher precedence.
+
+
+ JSON payload template - note that it can be simplified with the optional fields, check AWS documentation for more details - example:
+