diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index 49bc3aef24..dddd131bb4 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -121,6 +121,22 @@ 1.20.0-SNAPSHOT provided + + com.amazonaws + aws-java-sdk-translate + + + com.amazonaws + aws-java-sdk-polly + + + com.amazonaws + aws-java-sdk-transcribe + + + com.amazonaws + aws-java-sdk-textract + diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java new file mode 100644 index 0000000000..8419bee547 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml; + +import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.AmazonWebServiceResult; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.regions.Regions; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AwsMachineLearningJobStarter + extends AbstractAWSCredentialsProviderProcessor { + public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder() + .name("json-payload") + .displayName("JSON Payload") + .description("JSON request for AWS Machine Learning services. The Processor will use FlowFile content for the request when this property is not specified.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .displayName("Region") + .name("aws-region") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successful completion, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + protected static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + REGION, + TIMEOUT, + JSON_PAYLOAD, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE)); + private final static ObjectMapper MAPPER = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + private static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_ORIGINAL, + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null && !context.getProperty(JSON_PAYLOAD).isSet()) { + return; + } + final RESPONSE response; + FlowFile childFlowFile; + try { + response = sendRequest(buildRequest(session, context, flowFile), context, flowFile); + childFlowFile = writeToFlowFile(session, flowFile, response); + postProcessFlowFile(context, session, childFlowFile, response); + session.transfer(childFlowFile, REL_SUCCESS); + } catch (Exception e) { + if (flowFile != null) { + session.transfer(flowFile, REL_FAILURE); + } + getLogger().error("Sending AWS ML Request failed", e); + return; + } + if (flowFile != null) { + session.transfer(flowFile, REL_ORIGINAL); + } + + } + + protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, RESPONSE response) { + final String awsTaskId = getAwsTaskId(context, response, flowFile); + flowFile = session.putAttribute(flowFile, TASK_ID.getName(), awsTaskId); + flowFile = session.putAttribute(flowFile, MIME_TYPE.key(), "application/json"); + getLogger().debug("AWS ML Task [{}] started", awsTaskId); + } + + protected REQUEST buildRequest(ProcessSession session, ProcessContext context, FlowFile flowFile) throws JsonProcessingException { + return MAPPER.readValue(getPayload(session, context, flowFile), getAwsRequestClass(context, flowFile)); + } + + @Override + protected T createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + throw new UnsupportedOperationException("createClient(ProcessContext, AWSCredentials, ClientConfiguration) is not supported"); + } + + protected FlowFile writeToFlowFile(ProcessSession session, FlowFile flowFile, RESPONSE response) { + FlowFile childFlowFile = flowFile == null ? session.create() : session.create(flowFile); + childFlowFile = session.write(childFlowFile, out -> MAPPER.writeValue(out, response)); + return childFlowFile; + } + + protected String readFlowFile(final ProcessSession session, final FlowFile flowFile) { + try (InputStream inputStream = session.read(flowFile)) { + return new String(IOUtils.toByteArray(inputStream)); + } catch (final IOException e) { + throw new ProcessException("Read FlowFile Failed", e); + } + } + + private String getPayload(ProcessSession session, ProcessContext context, FlowFile flowFile) { + String payloadPropertyValue = context.getProperty(JSON_PAYLOAD).evaluateAttributeExpressions(flowFile).getValue(); + if (payloadPropertyValue == null) { + payloadPropertyValue = readFlowFile(session, flowFile); + } + return payloadPropertyValue; + } + + abstract protected RESPONSE sendRequest(REQUEST request, ProcessContext context, FlowFile flowFile) throws JsonProcessingException; + + abstract protected Class getAwsRequestClass(ProcessContext context, FlowFile flowFile); + + abstract protected String getAwsTaskId(ProcessContext context, RESPONSE response, FlowFile flowFile); +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java new file mode 100644 index 0000000000..157314c9cf --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStatusProcessor.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ResponseMetadata; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.http.SdkHttpMetadata; +import com.amazonaws.regions.Regions; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AwsMachineLearningJobStatusProcessor + extends AbstractAWSCredentialsProviderProcessor { + public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation"; + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + public static final PropertyDescriptor TASK_ID = + new PropertyDescriptor.Builder() + .name("awsTaskId") + .displayName("AWS Task ID") + .defaultValue("${awsTaskId}") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successful completion, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + public static final Relationship REL_RUNNING = new Relationship.Builder() + .name("running") + .description("The job is currently still being processed") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Job successfully finished. FlowFile will be routed to this relation.") + .build(); + public static final Relationship REL_THROTTLED = new Relationship.Builder() + .name("throttled") + .description("Retrieving results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " + + "It is generally expected to retry this relationship.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("The job failed, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .displayName("Region") + .name("aws-region") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason"; + protected static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + TASK_ID, + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + REGION, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + PROXY_CONFIGURATION_SERVICE)); + private static final ObjectMapper MAPPER = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + + static { + SimpleModule awsResponseModule = new SimpleModule(); + awsResponseModule.addDeserializer(ResponseMetadata.class, new AwsResponseMetadataDeserializer()); + SimpleModule sdkHttpModule = new SimpleModule(); + awsResponseModule.addDeserializer(SdkHttpMetadata.class, new SdkHttpMetadataDeserializer()); + MAPPER.registerModule(awsResponseModule); + MAPPER.registerModule(sdkHttpModule); + } + + + @Override + public Set getRelationships() { + return relationships; + } + + private static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_ORIGINAL, + REL_SUCCESS, + REL_RUNNING, + REL_THROTTLED, + REL_FAILURE + ))); + + @Override + public List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected T createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + throw new UnsupportedOperationException("Client creation not supported"); + } + + protected void writeToFlowFile(ProcessSession session, FlowFile flowFile, Object response) { + session.write(flowFile, out -> MAPPER.writeValue(out, response)); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java new file mode 100644 index 0000000000..ec3ad96282 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.ResponseMetadata; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer; +import java.io.IOException; +import java.util.Map; + +public class AwsResponseMetadataDeserializer extends StdNodeBasedDeserializer { + protected AwsResponseMetadataDeserializer() { + super(ResponseMetadata.class); + } + + @Override + public ResponseMetadata convert(JsonNode root, DeserializationContext ctxt) throws IOException { + return new ResponseMetadata((Map) null); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java new file mode 100644 index 0000000000..a8d027d8d3 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/SdkHttpMetadataDeserializer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.http.SdkHttpMetadata; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer; +import java.io.IOException; + +public class SdkHttpMetadataDeserializer extends StdNodeBasedDeserializer { + + protected SdkHttpMetadataDeserializer() { + super(SdkHttpMetadata.class); + } + + @Override + public SdkHttpMetadata convert(JsonNode root, DeserializationContext ctxt) throws IOException { + return null; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java new file mode 100644 index 0000000000..0efcd062d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.TaskStatus; +import com.amazonaws.services.textract.model.ThrottlingException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Retrieves the current status of an AWS Polly job.") +@SeeAlso({StartAwsPollyJob.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "PollyS3OutputBucket", description = "The bucket name where polly output will be located."), + @WritesAttribute(attribute = "PollyS3OutputKey", description = "Object key of polly output."), + @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.") +}) +public class GetAwsPollyJobStatus extends AwsMachineLearningJobStatusProcessor { + private static final String BUCKET = "bucket"; + private static final String KEY = "key"; + private static final Pattern S3_PATH = Pattern.compile("https://s3.*amazonaws.com/(?<" + BUCKET + ">[^/]+)/(?<" + KEY + ">.*)"); + private static final String AWS_S3_BUCKET = "PollyS3OutputBucket"; + private static final String AWS_S3_KEY = "filename"; + + @Override + protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonPollyClient) AmazonPollyClientBuilder.standard() + .withCredentials(credentialsProvider) + .withRegion(context.getProperty(REGION).getValue()) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + GetSpeechSynthesisTaskResult speechSynthesisTask; + try { + speechSynthesisTask = getSynthesisTask(context, flowFile); + } catch (ThrottlingException e) { + getLogger().info("Request Rate Limit exceeded", e); + session.transfer(flowFile, REL_THROTTLED); + return; + } catch (Exception e) { + getLogger().warn("Failed to get Polly Job status", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + + TaskStatus taskStatus = TaskStatus.fromValue(speechSynthesisTask.getSynthesisTask().getTaskStatus()); + + if (taskStatus == TaskStatus.InProgress || taskStatus == TaskStatus.Scheduled) { + session.penalize(flowFile); + session.transfer(flowFile, REL_RUNNING); + } else if (taskStatus == TaskStatus.Completed) { + String outputUri = speechSynthesisTask.getSynthesisTask().getOutputUri(); + + Matcher matcher = S3_PATH.matcher(outputUri); + if (matcher.find()) { + session.putAttribute(flowFile, AWS_S3_BUCKET, matcher.group(BUCKET)); + session.putAttribute(flowFile, AWS_S3_KEY, matcher.group(KEY)); + } + FlowFile childFlowFile = session.create(flowFile); + writeToFlowFile(session, childFlowFile, speechSynthesisTask); + childFlowFile = session.putAttribute(childFlowFile, AWS_TASK_OUTPUT_LOCATION, outputUri); + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(childFlowFile, REL_SUCCESS); + getLogger().info("Amazon Polly Task Completed {}", flowFile); + } else if (taskStatus == TaskStatus.Failed) { + final String failureReason = speechSynthesisTask.getSynthesisTask().getTaskStatusReason(); + flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason); + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Amazon Polly Task Failed {} Reason [{}]", flowFile, failureReason); + } + } + + private GetSpeechSynthesisTaskResult getSynthesisTask(ProcessContext context, FlowFile flowFile) { + String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue(); + GetSpeechSynthesisTaskRequest request = new GetSpeechSynthesisTaskRequest().withTaskId(taskId); + return getClient().getSpeechSynthesisTask(request); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java new file mode 100644 index 0000000000..1a0b00d8ce --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/StartAwsPollyJob.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Trigger a AWS Polly job. It should be followed by GetAwsPollyJobStatus processor in order to monitor job status.") +@SeeAlso({GetAwsPollyJobStatus.class}) +public class StartAwsPollyJob extends AwsMachineLearningJobStarter { + @Override + protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonPollyClient) AmazonPollyClientBuilder.standard() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + protected StartSpeechSynthesisTaskResult sendRequest(StartSpeechSynthesisTaskRequest request, ProcessContext context, FlowFile flowFile) { + return getClient().startSpeechSynthesisTask(request); + } + + @Override + protected Class getAwsRequestClass(ProcessContext context, FlowFile flowFile) { + return StartSpeechSynthesisTaskRequest.class; + } + + @Override + protected String getAwsTaskId(ProcessContext context, StartSpeechSynthesisTaskResult startSpeechSynthesisTaskResult, FlowFile flowFile) { + return startSpeechSynthesisTaskResult.getSynthesisTask().getTaskId(); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java new file mode 100644 index 0000000000..50a81c10c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest; +import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest; +import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest; +import com.amazonaws.services.textract.model.JobStatus; +import com.amazonaws.services.textract.model.ThrottlingException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Retrieves the current status of an AWS Textract job.") +@SeeAlso({StartAwsTextractJob.class}) +public class GetAwsTextractJobStatus extends AwsMachineLearningJobStatusProcessor { + public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder() + .name("textract-type") + .displayName("Textract Type") + .required(true) + .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"") + .allowableValues(TextractType.TEXTRACT_TYPES) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue(DOCUMENT_ANALYSIS.getType()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + private static final List TEXTRACT_PROPERTIES = + Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList())); + + @Override + public List getSupportedPropertyDescriptors() { + return TEXTRACT_PROPERTIES; + } + + @Override + protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTextractClient) AmazonTextractClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + String textractType = context.getProperty(TEXTRACT_TYPE).evaluateAttributeExpressions(flowFile).getValue(); + + String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue(); + try { + JobStatus jobStatus = getTaskStatus(TextractType.fromString(textractType), getClient(), awsTaskId); + if (JobStatus.SUCCEEDED == jobStatus) { + Object task = getTask(TextractType.fromString(textractType), getClient(), awsTaskId); + writeToFlowFile(session, flowFile, task); + session.transfer(flowFile, REL_SUCCESS); + } else if (JobStatus.IN_PROGRESS == jobStatus) { + session.transfer(flowFile, REL_RUNNING); + } else if (JobStatus.PARTIAL_SUCCESS == jobStatus) { + session.transfer(flowFile, REL_THROTTLED); + } else if (JobStatus.FAILED == jobStatus) { + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Amazon Textract Task [{}] Failed", awsTaskId); + } + } catch (ThrottlingException e) { + getLogger().info("Request Rate Limit exceeded", e); + session.transfer(flowFile, REL_THROTTLED); + return; + } catch (Exception e) { + getLogger().warn("Failed to get Textract Job status", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + private Object getTask(TextractType typeOfTextract, AmazonTextractClient client, String awsTaskId) { + Object job = null; + switch (typeOfTextract) { + case DOCUMENT_ANALYSIS: + job = client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awsTaskId)); + break; + case DOCUMENT_TEXT_DETECTION: + job = client.getDocumentTextDetection(new GetDocumentTextDetectionRequest().withJobId(awsTaskId)); + break; + case EXPENSE_ANALYSIS: + job = client.getExpenseAnalysis(new GetExpenseAnalysisRequest().withJobId(awsTaskId)); + break; + } + return job; + } + + private JobStatus getTaskStatus(TextractType typeOfTextract, AmazonTextractClient client, String awsTaskId) { + JobStatus jobStatus = JobStatus.IN_PROGRESS; + switch (typeOfTextract) { + case DOCUMENT_ANALYSIS: + jobStatus = JobStatus.fromValue(client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awsTaskId)).getJobStatus()); + break; + case DOCUMENT_TEXT_DETECTION: + jobStatus = JobStatus.fromValue(client.getDocumentTextDetection(new GetDocumentTextDetectionRequest().withJobId(awsTaskId)).getJobStatus()); + break; + case EXPENSE_ANALYSIS: + jobStatus = JobStatus.fromValue(client.getExpenseAnalysis(new GetExpenseAnalysisRequest().withJobId(awsTaskId)).getJobStatus()); + break; + + } + return jobStatus; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java new file mode 100644 index 0000000000..2fa4483af0 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/StartAwsTextractJob.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS; + +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.AmazonWebServiceResult; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest; +import com.amazonaws.services.textract.model.StartDocumentAnalysisResult; +import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest; +import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult; +import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest; +import com.amazonaws.services.textract.model.StartExpenseAnalysisResult; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Trigger a AWS Textract job. It should be followed by GetAwsTextractJobStatus processor in order to monitor job status.") +@SeeAlso({GetAwsTextractJobStatus.class}) +public class StartAwsTextractJob extends AwsMachineLearningJobStarter { + public static final Validator TEXTRACT_TYPE_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); + } else if (TextractType.TEXTRACT_TYPES.contains(value)) { + return new ValidationResult.Builder().subject(subject).input(value).explanation("Supported Value.").valid(true).build(); + } else { + return new ValidationResult.Builder().subject(subject).input(value).explanation("Not a supported value, flow file attribute or context parameter.").valid(false).build(); + } + } + }; + public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder() + .name("textract-type") + .displayName("Textract Type") + .required(true) + .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue(DOCUMENT_ANALYSIS.type) + .addValidator(TEXTRACT_TYPE_VALIDATOR) + .build(); + private static final List TEXTRACT_PROPERTIES = + Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList())); + + @Override + public List getSupportedPropertyDescriptors() { + return TEXTRACT_PROPERTIES; + } + + @Override + protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, AmazonWebServiceResult response) { + super.postProcessFlowFile(context, session, flowFile, response); + } + + @Override + protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTextractClient) AmazonTextractClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + protected AmazonWebServiceResult sendRequest(AmazonWebServiceRequest request, ProcessContext context, FlowFile flowFile) { + TextractType textractType = + TextractType.fromString(context.getProperty(TEXTRACT_TYPE.getName()).evaluateAttributeExpressions(flowFile).getValue()); + AmazonWebServiceResult result; + switch (textractType) { + case DOCUMENT_ANALYSIS : + result = getClient().startDocumentAnalysis((StartDocumentAnalysisRequest) request); + break; + case DOCUMENT_TEXT_DETECTION: + result = getClient().startDocumentTextDetection((StartDocumentTextDetectionRequest) request); + break; + case EXPENSE_ANALYSIS: + result = getClient().startExpenseAnalysis((StartExpenseAnalysisRequest) request); + break; + default: throw new UnsupportedOperationException("Unsupported textract type: " + textractType); + } + return result; + } + + @Override + protected Class getAwsRequestClass(ProcessContext context, FlowFile flowFile) { + TextractType typeOfTextract = + TextractType.fromString(context.getProperty(TEXTRACT_TYPE.getName()).evaluateAttributeExpressions(flowFile).getValue()); + Class result = null; + switch (typeOfTextract) { + case DOCUMENT_ANALYSIS: + result = StartDocumentAnalysisRequest.class; + break; + case DOCUMENT_TEXT_DETECTION: + result = StartDocumentTextDetectionRequest.class; + break; + case EXPENSE_ANALYSIS: + result = StartExpenseAnalysisRequest.class; + break; + } + return result; + } + + @Override + protected String getAwsTaskId(ProcessContext context, AmazonWebServiceResult amazonWebServiceResult, FlowFile flowFile) { + TextractType textractType = + TextractType.fromString(context.getProperty(TEXTRACT_TYPE.getName()).evaluateAttributeExpressions(flowFile).getValue()); + String result; + switch (textractType) { + case DOCUMENT_ANALYSIS: + result = ((StartDocumentAnalysisResult) amazonWebServiceResult).getJobId(); + break; + case DOCUMENT_TEXT_DETECTION: + result = ((StartDocumentTextDetectionResult) amazonWebServiceResult).getJobId(); + break; + case EXPENSE_ANALYSIS: + result = ((StartExpenseAnalysisResult) amazonWebServiceResult).getJobId(); + break; + default: throw new UnsupportedOperationException("Unsupported textract type."); + } + return result; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java new file mode 100644 index 0000000000..e0f4462d34 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.textract; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toSet; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; + +public enum TextractType { + DOCUMENT_ANALYSIS("Document Analysis"), + DOCUMENT_TEXT_DETECTION("Document Text Detection"), + EXPENSE_ANALYSIS("Expense Analysis"); + + public static final Set TEXTRACT_TYPES = Arrays.stream(TextractType.values()).map(TextractType::getType) + .collect(collectingAndThen(toSet(), Collections::unmodifiableSet)); + + public final String type; + + TextractType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public static TextractType fromString(String value) { + return Arrays.stream(values()) + .filter(type -> type.getType().equalsIgnoreCase(value)) + .findAny() + .orElseThrow(() -> new UnsupportedOperationException("Unsupported textract type.")); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java new file mode 100644 index 0000000000..8a21a9bb19 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.transcribe; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.model.ThrottlingException; +import com.amazonaws.services.transcribe.AmazonTranscribeClient; +import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest; +import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult; +import com.amazonaws.services.transcribe.model.TranscriptionJobStatus; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"}) +@CapabilityDescription("Retrieves the current status of an AWS Transcribe job.") +@SeeAlso({StartAwsTranscribeJob.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.") +}) +public class GetAwsTranscribeJobStatus extends AwsMachineLearningJobStatusProcessor { + @Override + protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTranscribeClient) AmazonTranscribeClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + try { + GetTranscriptionJobResult job = getJob(context, flowFile); + TranscriptionJobStatus jobStatus = TranscriptionJobStatus.fromValue(job.getTranscriptionJob().getTranscriptionJobStatus()); + + if (TranscriptionJobStatus.COMPLETED == jobStatus) { + writeToFlowFile(session, flowFile, job); + session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, job.getTranscriptionJob().getTranscript().getTranscriptFileUri()); + session.transfer(flowFile, REL_SUCCESS); + } else if (TranscriptionJobStatus.IN_PROGRESS == jobStatus) { + session.transfer(flowFile, REL_RUNNING); + } else if (TranscriptionJobStatus.FAILED == jobStatus) { + final String failureReason = job.getTranscriptionJob().getFailureReason(); + session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason); + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Transcribe Task Failed {} Reason [{}]", flowFile, failureReason); + } + } catch (ThrottlingException e) { + getLogger().info("Request Rate Limit exceeded", e); + session.transfer(flowFile, REL_THROTTLED); + return; + } catch (Exception e) { + getLogger().warn("Failed to get Transcribe Job status", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + private GetTranscriptionJobResult getJob(ProcessContext context, FlowFile flowFile) { + String taskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue(); + GetTranscriptionJobRequest request = new GetTranscriptionJobRequest().withTranscriptionJobName(taskId); + return getClient().getTranscriptionJob(request); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java new file mode 100644 index 0000000000..f34a8b5490 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/StartAwsTranscribeJob.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.transcribe; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.transcribe.AmazonTranscribeClient; +import com.amazonaws.services.transcribe.model.StartTranscriptionJobRequest; +import com.amazonaws.services.transcribe.model.StartTranscriptionJobResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"}) +@CapabilityDescription("Trigger a AWS Transcribe job. It should be followed by GetAwsTranscribeStatus processor in order to monitor job status.") +@SeeAlso({GetAwsTranscribeJobStatus.class}) +public class StartAwsTranscribeJob extends AwsMachineLearningJobStarter { + @Override + protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTranscribeClient) AmazonTranscribeClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + return (AmazonTranscribeClient) AmazonTranscribeClient.builder().build(); + } + + @Override + protected StartTranscriptionJobResult sendRequest(StartTranscriptionJobRequest request, ProcessContext context, FlowFile flowFile) { + return getClient().startTranscriptionJob(request); + } + + @Override + protected Class getAwsRequestClass(ProcessContext context, FlowFile flowFile) { + return StartTranscriptionJobRequest.class; + } + + @Override + protected String getAwsTaskId(ProcessContext context, StartTranscriptionJobResult startTranscriptionJobResult, FlowFile flowFile) { + return startTranscriptionJobResult.getTranscriptionJob().getTranscriptionJobName(); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java new file mode 100644 index 0000000000..2471b15068 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatus.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.translate; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.model.ThrottlingException; +import com.amazonaws.services.translate.AmazonTranslateClient; +import com.amazonaws.services.translate.model.DescribeTextTranslationJobRequest; +import com.amazonaws.services.translate.model.DescribeTextTranslationJobResult; +import com.amazonaws.services.translate.model.JobStatus; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Translate"}) +@CapabilityDescription("Retrieves the current status of an AWS Translate job.") +@SeeAlso({StartAwsTranslateJob.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.") +}) +public class GetAwsTranslateJobStatus extends AwsMachineLearningJobStatusProcessor { + @Override + protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTranslateClient) AmazonTranslateClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue(); + try { + DescribeTextTranslationJobResult describeTextTranslationJobResult = getStatusString(awsTaskId); + JobStatus status = JobStatus.fromValue(describeTextTranslationJobResult.getTextTranslationJobProperties().getJobStatus()); + + if (status == JobStatus.IN_PROGRESS || status == JobStatus.SUBMITTED) { + writeToFlowFile(session, flowFile, describeTextTranslationJobResult); + session.penalize(flowFile); + session.transfer(flowFile, REL_RUNNING); + } else if (status == JobStatus.COMPLETED) { + session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, describeTextTranslationJobResult.getTextTranslationJobProperties().getOutputDataConfig().getS3Uri()); + writeToFlowFile(session, flowFile, describeTextTranslationJobResult); + session.transfer(flowFile, REL_SUCCESS); + } else if (status == JobStatus.FAILED || status == JobStatus.COMPLETED_WITH_ERROR) { + writeToFlowFile(session, flowFile, describeTextTranslationJobResult); + session.transfer(flowFile, REL_FAILURE); + } + } catch (ThrottlingException e) { + getLogger().info("Request Rate Limit exceeded", e); + session.transfer(flowFile, REL_THROTTLED); + return; + } catch (Exception e) { + getLogger().warn("Failed to get Polly Job status", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + private DescribeTextTranslationJobResult getStatusString(String awsTaskId) { + DescribeTextTranslationJobRequest request = new DescribeTextTranslationJobRequest().withJobId(awsTaskId); + DescribeTextTranslationJobResult translationJobsResult = getClient().describeTextTranslationJob(request); + return translationJobsResult; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java new file mode 100644 index 0000000000..2ae8480089 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/translate/StartAwsTranslateJob.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.translate; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.translate.AmazonTranslateClient; +import com.amazonaws.services.translate.model.StartTextTranslationJobRequest; +import com.amazonaws.services.translate.model.StartTextTranslationJobResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStarter; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Translate"}) +@CapabilityDescription("Trigger a AWS Translate job. It should be followed by GetAwsTranslateJobStatus processor in order to monitor job status.") +@SeeAlso({GetAwsTranslateJobStatus.class}) +public class StartAwsTranslateJob extends AwsMachineLearningJobStarter { + @Override + protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTranslateClient) AmazonTranslateClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + protected StartTextTranslationJobResult sendRequest(StartTextTranslationJobRequest request, ProcessContext context, FlowFile flowFile) { + return getClient().startTextTranslationJob(request); + } + + @Override + protected Class getAwsRequestClass(ProcessContext context, FlowFile flowFile) { + return StartTextTranslationJobRequest.class; + } + + protected String getAwsTaskId(ProcessContext context, StartTextTranslationJobResult startTextTranslationJobResult, FlowFile flowFile) { + return startTextTranslationJobResult.getJobId(); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a02ced4908..2682d4ea5b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -31,4 +31,12 @@ org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric org.apache.nifi.processors.aws.wag.InvokeAWSGatewayApi +org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob +org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus +org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob +org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus +org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob +org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus +org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob +org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html new file mode 100644 index 0000000000..5e842f0006 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.GetAwsPollyJobStatus/additionalDetails.html @@ -0,0 +1,39 @@ + + + + + + + Amazon Polly + + + + +

GetAwsPollyJobStatus

+

+ Amazon Polly is a service that turns text into lifelike speech, allowing you to create applications that talk, and build entirely new categories of speech-enabled products. + Polly's Text-to-Speech (TTS) service uses advanced deep learning technologies to synthesize natural sounding human speech. + With dozens of lifelike voices across a broad set of languages, you can build speech-enabled applications that work in many different countries. +

+ +

Usage

+

+ GetAwsPollyJobStatus Processor is designed to periodically check polly job status. This processor should be used in pair with StartAwsPollyJob Processor. + If the job successfully finished it will populate outputLocation attribute of the flow file where you can find the output of the Polly job. + In case of an error failure.reason attribute will be populated with the details. +

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html new file mode 100644 index 0000000000..a3ed042035 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob/additionalDetails.html @@ -0,0 +1,68 @@ + + + + + + + Amazon Polly + + + + +

StartAwsPollyJob

+

+ Amazon Polly is a service that turns text into lifelike speech, allowing you to create applications that talk, and build entirely new categories of speech-enabled products. + Polly's Text-to-Speech (TTS) service uses advanced deep learning technologies to synthesize natural sounding human speech. + With dozens of lifelike voices across a broad set of languages, you can build speech-enabled applications that work in many different countries. +

+ +

Usage

+

+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference. + You can find example json payload in the documentation at the Request Syntax sections. + For more details please check the official Polly API reference + With this processor you will trigger a startSpeechSynthesisTask async call to Polly Service. + + You can define json payload as property or provide as a flow file content. Property has higher precedence. + + After the job is triggered the serialized json response will be written to the output flow file. + The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor. +

+ +

+ JSON payload template - note that it can be simplified with the optional fields, check AWS documentation for more details - example: +

+ + +
+{
+   "Engine": "string",
+   "LanguageCode": "string",
+   "LexiconNames": [ "string" ],
+   "OutputFormat": "string",
+   "OutputS3BucketName": "string",
+   "OutputS3KeyPrefix": "string",
+   "SampleRate": "string",
+   "SnsTopicArn": "string",
+   "SpeechMarkTypes": [ "string" ],
+   "Text": "string",
+   "TextType": "string",
+   "VoiceId": "string"
+}
+    
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html new file mode 100644 index 0000000000..9ac7d31f09 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.GetAwsTextractJobStatus/additionalDetails.html @@ -0,0 +1,37 @@ + + + + + + + Amazon Textract + + + + +

GetAwsTextractJobStatus

+

+ Amazon Textract is a machine learning (ML) service that automatically extracts text, handwriting, and data from scanned documents. + It goes beyond simple optical character recognition (OCR) to identify, understand, and extract data from forms and tables.

+ +

Usage

+

+ GetAwsTextractJobStatus Processor is designed to periodically check textract job status. This processor should be used in pair with StartAwsTextractJob Processor. + FlowFile will contain the serialized Tetract response that contains the result and additional metadata as it is documented in AWS Textract Reference. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html new file mode 100644 index 0000000000..fe334e55fd --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob/additionalDetails.html @@ -0,0 +1,144 @@ + + + + + + + Amazon Textract + + + + +

StartAwsTextractJob

+

+ Amazon Textract is a machine learning (ML) service that automatically extracts text, handwriting, and data from scanned documents. + It goes beyond simple optical character recognition (OCR) to identify, understand, and extract data from forms and tables. +

+ +

Usage

+

+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference. + You can find example json payload in the documentation at the Request Syntax sections. + For more details please check the official Textract API reference + With this processor you will trigger a startDocumentAnalysis, startDocumentTextDetection or startExpenseAnalysis async call according to your type of textract settings. + + You can define json payload as property or provide as a flow file content. Property has higher precedence. + + After the job is triggered the serialized json response will be written to the output flow file. + The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor. +

+

+ Three different type of textract task are supported: Documnet Analysis, Text Detection, Expense Analysis. +

+

DocumentAnalysis

+

Starts the asynchronous analysis of an input document for relationships between detected items such as key-value pairs, tables, and selection elements. + API Reference +

+Example payload: + +
+{
+   "ClientRequestToken": "string",
+   "DocumentLocation": {
+      "S3Object": {
+         "Bucket": "string",
+         "Name": "string",
+         "Version": "string"
+      }
+   },
+   "FeatureTypes": [ "string" ],
+   "JobTag": "string",
+   "KMSKeyId": "string",
+   "NotificationChannel": {
+      "RoleArn": "string",
+      "SNSTopicArn": "string"
+   },
+   "OutputConfig": {
+      "S3Bucket": "string",
+      "S3Prefix": "string"
+   },
+   "QueriesConfig": {
+      "Queries": [
+         {
+            "Alias": "string",
+            "Pages": [ "string" ],
+            "Text": "string"
+         }
+      ]
+   }
+}
+    
+
+

ExpenseAnalysis

+

Starts the asynchronous analysis of invoices or receipts for data like contact information, items purchased, and vendor names. + API Reference +

+Example payload: + +
+{
+   "ClientRequestToken": "string",
+   "DocumentLocation": {
+      "S3Object": {
+         "Bucket": "string",
+         "Name": "string",
+         "Version": "string"
+      }
+   },
+   "JobTag": "string",
+   "KMSKeyId": "string",
+   "NotificationChannel": {
+      "RoleArn": "string",
+      "SNSTopicArn": "string"
+   },
+   "OutputConfig": {
+      "S3Bucket": "string",
+      "S3Prefix": "string"
+   }
+}
+    
+
+

StartDocumentTextDetection

+

Starts the asynchronous detection of text in a document. Amazon Textract can detect lines of text and the words that make up a line of text. + API Reference +

+Example payload: + +
+{
+   "ClientRequestToken": "string",
+   "DocumentLocation": {
+      "S3Object": {
+         "Bucket": "string",
+         "Name": "string",
+         "Version": "string"
+      }
+   },
+   "JobTag": "string",
+   "KMSKeyId": "string",
+   "NotificationChannel": {
+      "RoleArn": "string",
+      "SNSTopicArn": "string"
+   },
+   "OutputConfig": {
+      "S3Bucket": "string",
+      "S3Prefix": "string"
+   }
+}
+    
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html new file mode 100644 index 0000000000..25e4f3513d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.GetAwsTranscribeJobStatus/additionalDetails.html @@ -0,0 +1,42 @@ + + + + + + + Amazon Transcribe + + + + +

Amazon Transcribe

+

+ Automatically convert speech to text +

    +
  • Extract key business insights from customer calls, video files, clinical conversations, and more.
  • +
  • Improve business outcomes with state of the art speech recognition models that are fully managed and continuously trained.
  • +
  • Ensure customer privacy and safety by masking sensitive information.
  • +
+

+ +

Usage

+

+ GetAwsTranscribeJobStatus Processor is designed to periodically check Transcribe job status. This processor should be used in pair with Transcribe Processor. + FlowFile will contain the serialized Transcribe response and it will populate the path of the output in the outputLocation attribute. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html new file mode 100644 index 0000000000..8068383329 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.transcribe.StartAwsTranscribeJob/additionalDetails.html @@ -0,0 +1,113 @@ + + + + + + + Amazon Transcribe + + + + +

Amazon Transcribe

+

+ Automatically convert speech to text +

    +
  • Extract key business insights from customer calls, video files, clinical conversations, and more.
  • +
  • Improve business outcomes with state of the art speech recognition models that are fully managed and continuously trained.
  • +
  • Ensure customer privacy and safety by masking sensitive information.
  • +
+

+ +

Usage

+

+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference. + You can find example json payload in the documentation at the Request Syntax sections. + For more details please check the official Transcribe API reference + With this processor you will trigger a startTranscriptionJob async call to AWS Transcribe Service. + + You can define json payload as property or provide as a flow file content. Property has higher precedence. + + After the job is triggered the serialized json response will be written to the output flow file. + The awsTaskId attribute will be populated, so it makes it easier to query job status by the corresponding get job status processor. +

+

+ JSON payload template - note that these can be simplified with the optional fields, check AWS documentation for more details - examples: +

+ + +
+{
+   "ContentRedaction": {
+      "PiiEntityTypes": [ "string" ],
+      "RedactionOutput": "string",
+      "RedactionType": "string"
+   },
+   "IdentifyLanguage": boolean,
+   "IdentifyMultipleLanguages": boolean,
+   "JobExecutionSettings": {
+      "AllowDeferredExecution": boolean,
+      "DataAccessRoleArn": "string"
+   },
+   "KMSEncryptionContext": {
+      "string" : "string"
+   },
+   "LanguageCode": "string",
+   "LanguageIdSettings": {
+      "string" : {
+         "LanguageModelName": "string",
+         "VocabularyFilterName": "string",
+         "VocabularyName": "string"
+      }
+   },
+   "LanguageOptions": [ "string" ],
+   "Media": {
+      "MediaFileUri": "string",
+      "RedactedMediaFileUri": "string"
+   },
+   "MediaFormat": "string",
+   "MediaSampleRateHertz": number,
+   "ModelSettings": {
+      "LanguageModelName": "string"
+   },
+   "OutputBucketName": "string",
+   "OutputEncryptionKMSKeyId": "string",
+   "OutputKey": "string",
+   "Settings": {
+      "ChannelIdentification": boolean,
+      "MaxAlternatives": number,
+      "MaxSpeakerLabels": number,
+      "ShowAlternatives": boolean,
+      "ShowSpeakerLabels": boolean,
+      "VocabularyFilterMethod": "string",
+      "VocabularyFilterName": "string",
+      "VocabularyName": "string"
+   },
+   "Subtitles": {
+      "Formats": [ "string" ],
+      "OutputStartIndex": number
+   },
+   "Tags": [
+      {
+         "Key": "string",
+         "Value": "string"
+      }
+   ],
+   "TranscriptionJobName": "string"
+}
+    
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html new file mode 100644 index 0000000000..d12d376283 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus/additionalDetails.html @@ -0,0 +1,40 @@ + + + + + + + Amazon Translate + + + + +

GetAwsTranslateJobStatus

+

+ Amazon Translate is a neural machine translation service for translating text to and from English across a breadth of supported languages. + Powered by deep-learning technologies, Amazon Translate delivers fast, high-quality, and affordable language translation. + It provides a managed, continually trained solution so you can easily translate company and user-authored content or build applications that require support across multiple languages. + The machine translation engine has been trained on a wide variety of content across different domains to produce quality translations that serve any industry need. +

+ +

Usage

+

+ GetAwsTranslateJobStatus Processor is designed to periodically check translate job status. This processor should be used in pair with Translate Processor. + If the job successfully finished it will populate outputLocation attribute of the flow file where you can find the output of the Translation job. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html new file mode 100644 index 0000000000..ae02e9a397 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob/additionalDetails.html @@ -0,0 +1,75 @@ + + + + + + + Amazon Translate + + + + +

StartAwsTranslateJob

+

+ Amazon Translate is a neural machine translation service for translating text to and from English across a breadth of supported languages. + Powered by deep-learning technologies, Amazon Translate delivers fast, high-quality, and affordable language translation. + It provides a managed, continually trained solution so you can easily translate company and user-authored content or build applications that require support across multiple languages. + The machine translation engine has been trained on a wide variety of content across different domains to produce quality translations that serve any industry need. +

+ +

Usage

+

+ Amazon ML Processors are implemented to utilize ML services based on the official AWS API Reference. + You can find example json payload in the documentation at the Request Syntax sections. + For more details please check the official Translate API reference + With this processor you will trigger a startTextTranslationJob async call to Translate Service + + You can define json payload as property or provide as a flow file content. Property has higher precedence. +

+

+ JSON payload template - note that it can be simplified with the optional fields, check AWS documentation for more details - example: +

+ + +
+{
+   "ClientToken": "string",
+   "DataAccessRoleArn": "string",
+   "InputDataConfig": {
+      "ContentType": "string",
+      "S3Uri": "string"
+   },
+   "JobName": "string",
+   "OutputDataConfig": {
+      "EncryptionKey": {
+         "Id": "string",
+         "Type": "string"
+      },
+      "S3Uri": "string"
+   },
+   "ParallelDataNames": [ "string" ],
+   "Settings": {
+      "Formality": "string",
+      "Profanity": "string"
+   },
+   "SourceLanguageCode": "string",
+   "TargetLanguageCodes": [ "string" ],
+   "TerminologyNames": [ "string" ]
+}
+    
+
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java new file mode 100644 index 0000000000..76f3940c0f --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyStatusTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.polly; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.AWS_TASK_OUTPUT_LOCATION; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_FAILURE; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_ORIGINAL; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.SynthesisTask; +import com.amazonaws.services.polly.model.TaskStatus; +import java.util.Collections; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class GetAwsPollyStatusTest { + private static final String TEST_TASK_ID = "testTaskId"; + private static final String PLACEHOLDER_CONTENT = "content"; + private TestRunner runner; + @Mock + private AmazonPollyClient mockPollyClient; + @Mock + private AWSCredentialsProviderService mockAwsCredentialsProvider; + @Captor + private ArgumentCaptor requestCaptor; + + @BeforeEach + public void setUp() throws InitializationException { + when(mockAwsCredentialsProvider.getIdentifier()).thenReturn("awsCredetialProvider"); + final GetAwsPollyJobStatus mockGetAwsPollyStatus = new GetAwsPollyJobStatus() { + protected AmazonPollyClient getClient() { + return mockPollyClient; + } + + @Override + protected AmazonPollyClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + return mockPollyClient; + } + }; + runner = TestRunners.newTestRunner(mockGetAwsPollyStatus); + runner.addControllerService("awsCredetialProvider", mockAwsCredentialsProvider); + runner.enableControllerService(mockAwsCredentialsProvider); + runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredetialProvider"); + } + + @Test + public void testPollyTaskInProgress() { + GetSpeechSynthesisTaskResult taskResult = new GetSpeechSynthesisTaskResult(); + SynthesisTask task = new SynthesisTask().withTaskId(TEST_TASK_ID) + .withTaskStatus(TaskStatus.InProgress); + taskResult.setSynthesisTask(task); + when(mockPollyClient.getSpeechSynthesisTask(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_RUNNING); + assertEquals(requestCaptor.getValue().getTaskId(), TEST_TASK_ID); + } + + @Test + public void testPollyTaskCompleted() { + GetSpeechSynthesisTaskResult taskResult = new GetSpeechSynthesisTaskResult(); + SynthesisTask task = new SynthesisTask().withTaskId(TEST_TASK_ID) + .withTaskStatus(TaskStatus.Completed) + .withOutputUri("outputLocationPath"); + taskResult.setSynthesisTask(task); + when(mockPollyClient.getSpeechSynthesisTask(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertTransferCount(REL_SUCCESS, 1); + runner.assertTransferCount(REL_ORIGINAL, 1); + runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, AWS_TASK_OUTPUT_LOCATION); + assertEquals(requestCaptor.getValue().getTaskId(), TEST_TASK_ID); + } + + + @Test + public void testPollyTaskFailed() { + GetSpeechSynthesisTaskResult taskResult = new GetSpeechSynthesisTaskResult(); + SynthesisTask task = new SynthesisTask().withTaskId(TEST_TASK_ID) + .withTaskStatus(TaskStatus.Failed) + .withTaskStatusReason("reasonOfFailure"); + taskResult.setSynthesisTask(task); + when(mockPollyClient.getSpeechSynthesisTask(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(PLACEHOLDER_CONTENT, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_FAILURE); + assertEquals(requestCaptor.getValue().getTaskId(), TEST_TASK_ID); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java new file mode 100644 index 0000000000..96cf149493 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTranslateJobStatusTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID; +import static org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob.TEXTRACT_TYPE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest; +import com.amazonaws.services.textract.model.GetDocumentAnalysisResult; +import com.amazonaws.services.textract.model.JobStatus; +import com.google.common.collect.ImmutableMap; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class GetAwsTranslateJobStatusTest { + private static final String TEST_TASK_ID = "testTaskId"; + private TestRunner runner; + @Mock + private AmazonTextractClient mockTextractClient; + @Mock + private AWSCredentialsProviderService mockAwsCredentialsProvider; + @Captor + private ArgumentCaptor requestCaptor; + + @BeforeEach + public void setUp() throws InitializationException { + when(mockAwsCredentialsProvider.getIdentifier()).thenReturn("awsCredetialProvider"); + final GetAwsTextractJobStatus awsTextractJobStatusGetter = new GetAwsTextractJobStatus() { + protected AmazonTextractClient getClient() { + return mockTextractClient; + } + + @Override + protected AmazonTextractClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + return mockTextractClient; + } + }; + runner = TestRunners.newTestRunner(awsTextractJobStatusGetter); + runner.addControllerService("awsCredetialProvider", mockAwsCredentialsProvider); + runner.enableControllerService(mockAwsCredentialsProvider); + runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredetialProvider"); + } + + @Test + public void testTextractDocAnalysisTaskInProgress() { + GetDocumentAnalysisResult taskResult = new GetDocumentAnalysisResult() + .withJobStatus(JobStatus.IN_PROGRESS); + when(mockTextractClient.getDocumentAnalysis(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue("content", ImmutableMap.of(TASK_ID.getName(), TEST_TASK_ID, + TEXTRACT_TYPE.getName(), TextractType.DOCUMENT_ANALYSIS.name())); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_RUNNING); + assertEquals(TEST_TASK_ID, requestCaptor.getValue().getJobId()); + } + + @Test + public void testTextractDocAnalysisTaskComplete() { + GetDocumentAnalysisResult taskResult = new GetDocumentAnalysisResult() + .withJobStatus(JobStatus.SUCCEEDED); + when(mockTextractClient.getDocumentAnalysis(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue("content", ImmutableMap.of(TASK_ID.getName(), TEST_TASK_ID, + TEXTRACT_TYPE.getName(), TextractType.DOCUMENT_ANALYSIS.name())); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS); + assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID); + } + + @Test + public void testTextractDocAnalysisTaskFailed() { + GetDocumentAnalysisResult taskResult = new GetDocumentAnalysisResult() + .withJobStatus(JobStatus.FAILED); + when(mockTextractClient.getDocumentAnalysis(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue("content", ImmutableMap.of(TASK_ID.getName(), TEST_TASK_ID, + TEXTRACT_TYPE.getName(), TextractType.DOCUMENT_ANALYSIS.type)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_FAILURE); + assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java new file mode 100644 index 0000000000..a52e4b0951 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatusTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.transcribe; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.FAILURE_REASON_ATTRIBUTE; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.transcribe.AmazonTranscribeClient; +import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest; +import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult; +import com.amazonaws.services.transcribe.model.Transcript; +import com.amazonaws.services.transcribe.model.TranscriptionJob; +import com.amazonaws.services.transcribe.model.TranscriptionJobStatus; +import java.util.Collections; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class GetAwsTranscribeJobStatusTest { + private static final String TEST_TASK_ID = "testTaskId"; + private static final String AWS_CREDENTIAL_PROVIDER_NAME = "awsCredetialProvider"; + private static final String OUTPUT_LOCATION_PATH = "outputLocationPath"; + private static final String REASON_OF_FAILURE = "reasonOfFailure"; + private static final String CONTENT_STRING = "content"; + private TestRunner runner; + @Mock + private AmazonTranscribeClient mockTranscribeClient; + @Mock + private AWSCredentialsProviderService mockAwsCredentialsProvider; + @Captor + private ArgumentCaptor requestCaptor; + + @BeforeEach + public void setUp() throws InitializationException { + when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(AWS_CREDENTIAL_PROVIDER_NAME); + final GetAwsTranscribeJobStatus mockPollyFetcher = new GetAwsTranscribeJobStatus() { + protected AmazonTranscribeClient getClient() { + return mockTranscribeClient; + } + + @Override + protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + return mockTranscribeClient; + } + }; + runner = TestRunners.newTestRunner(mockPollyFetcher); + runner.addControllerService(AWS_CREDENTIAL_PROVIDER_NAME, mockAwsCredentialsProvider); + runner.enableControllerService(mockAwsCredentialsProvider); + runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, AWS_CREDENTIAL_PROVIDER_NAME); + } + + @Test + public void testTranscribeTaskInProgress() { + TranscriptionJob task = new TranscriptionJob() + .withTranscriptionJobName(TEST_TASK_ID) + .withTranscriptionJobStatus(TranscriptionJobStatus.IN_PROGRESS); + GetTranscriptionJobResult taskResult = new GetTranscriptionJobResult().withTranscriptionJob(task); + when(mockTranscribeClient.getTranscriptionJob(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_RUNNING); + assertEquals(requestCaptor.getValue().getTranscriptionJobName(), TEST_TASK_ID); + } + + @Test + public void testTranscribeTaskCompleted() { + TranscriptionJob task = new TranscriptionJob() + .withTranscriptionJobName(TEST_TASK_ID) + .withTranscript(new Transcript().withTranscriptFileUri(OUTPUT_LOCATION_PATH)) + .withTranscriptionJobStatus(TranscriptionJobStatus.COMPLETED); + GetTranscriptionJobResult taskResult = new GetTranscriptionJobResult().withTranscriptionJob(task); + when(mockTranscribeClient.getTranscriptionJob(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS); + assertEquals(requestCaptor.getValue().getTranscriptionJobName(), TEST_TASK_ID); + } + + + @Test + public void testPollyTaskFailed() { + TranscriptionJob task = new TranscriptionJob() + .withTranscriptionJobName(TEST_TASK_ID) + .withFailureReason(REASON_OF_FAILURE) + .withTranscriptionJobStatus(TranscriptionJobStatus.FAILED); + GetTranscriptionJobResult taskResult = new GetTranscriptionJobResult().withTranscriptionJob(task); + when(mockTranscribeClient.getTranscriptionJob(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_FAILURE); + runner.assertAllFlowFilesContainAttribute(FAILURE_REASON_ATTRIBUTE); + assertEquals(requestCaptor.getValue().getTranscriptionJobName(), TEST_TASK_ID); + + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java new file mode 100644 index 0000000000..6fafcf4b10 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ml/translate/GetAwsTranslateJobStatusTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml.translate; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_FAILURE; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.AWS_TASK_OUTPUT_LOCATION; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.REL_RUNNING; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.translate.AmazonTranslateClient; +import com.amazonaws.services.translate.model.DescribeTextTranslationJobRequest; +import com.amazonaws.services.translate.model.DescribeTextTranslationJobResult; +import com.amazonaws.services.translate.model.JobStatus; +import com.amazonaws.services.translate.model.OutputDataConfig; +import com.amazonaws.services.translate.model.TextTranslationJobProperties; +import java.util.Collections; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class GetAwsTranslateJobStatusTest { + private static final String TEST_TASK_ID = "testTaskId"; + private static final String CONTENT_STRING = "content"; + private static final String AWS_CREDENTIALS_PROVIDER_NAME = "awsCredetialProvider"; + private static final String OUTPUT_LOCATION_PATH = "outputLocation"; + private TestRunner runner; + @Mock + private AmazonTranslateClient mockTranslateClient; + @Mock + private AWSCredentialsProviderService mockAwsCredentialsProvider; + @Captor + private ArgumentCaptor requestCaptor; + + + @BeforeEach + public void setUp() throws InitializationException { + when(mockAwsCredentialsProvider.getIdentifier()).thenReturn(AWS_CREDENTIALS_PROVIDER_NAME); + final GetAwsTranslateJobStatus mockPollyFetcher = new GetAwsTranslateJobStatus() { + protected AmazonTranslateClient getClient() { + return mockTranslateClient; + } + + @Override + protected AmazonTranslateClient createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + return mockTranslateClient; + } + }; + runner = TestRunners.newTestRunner(mockPollyFetcher); + runner.addControllerService(AWS_CREDENTIALS_PROVIDER_NAME, mockAwsCredentialsProvider); + runner.enableControllerService(mockAwsCredentialsProvider); + runner.setProperty(AWS_CREDENTIALS_PROVIDER_SERVICE, AWS_CREDENTIALS_PROVIDER_NAME); + } + + @Test + public void testTranscribeTaskInProgress() { + TextTranslationJobProperties task = new TextTranslationJobProperties() + .withJobId(TEST_TASK_ID) + .withJobStatus(JobStatus.IN_PROGRESS); + DescribeTextTranslationJobResult taskResult = new DescribeTextTranslationJobResult().withTextTranslationJobProperties(task); + when(mockTranslateClient.describeTextTranslationJob(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_RUNNING); + assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID); + } + + @Test + public void testTranscribeTaskCompleted() { + TextTranslationJobProperties task = new TextTranslationJobProperties() + .withJobId(TEST_TASK_ID) + .withOutputDataConfig(new OutputDataConfig().withS3Uri(OUTPUT_LOCATION_PATH)) + .withJobStatus(JobStatus.COMPLETED); + DescribeTextTranslationJobResult taskResult = new DescribeTextTranslationJobResult().withTextTranslationJobProperties(task); + when(mockTranslateClient.describeTextTranslationJob(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS); + runner.assertAllFlowFilesContainAttribute(AWS_TASK_OUTPUT_LOCATION); + assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID); + } + + @Test + public void testTranscribeTaskFailed() { + TextTranslationJobProperties task = new TextTranslationJobProperties() + .withJobId(TEST_TASK_ID) + .withJobStatus(JobStatus.FAILED); + DescribeTextTranslationJobResult taskResult = new DescribeTextTranslationJobResult().withTextTranslationJobProperties(task); + when(mockTranslateClient.describeTextTranslationJob(requestCaptor.capture())).thenReturn(taskResult); + runner.enqueue(CONTENT_STRING, Collections.singletonMap(TASK_ID.getName(), TEST_TASK_ID)); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_FAILURE); + assertEquals(requestCaptor.getValue().getJobId(), TEST_TASK_ID); + } + +} \ No newline at end of file