diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java index dc78e02c4b..53567ff212 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -40,6 +41,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -82,6 +84,7 @@ import java.util.stream.Collectors; " objects can be set in the 'Result Limit' property.") @DefaultSettings(yieldDuration = "10 sec") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") +@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type to application/json") public class GetHubSpot extends AbstractProcessor { static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder() @@ -234,6 +237,7 @@ public class GetHubSpot extends AbstractProcessor { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, parseHttpResponse(response, total, stateMap)); if (total.get() > 0) { + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); session.transfer(flowFile, REL_SUCCESS); } else { getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java index d18b49d316..fd8ed18fed 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java @@ -16,16 +16,34 @@ */ package org.apache.nifi.processors.hubspot; +import static org.apache.nifi.processors.hubspot.GetHubSpot.CURSOR_KEY; +import static org.apache.nifi.processors.hubspot.GetHubSpot.END_INCREMENTAL_KEY; +import static org.apache.nifi.processors.hubspot.GetHubSpot.START_INCREMENTAL_KEY; +import static org.apache.nifi.processors.hubspot.HubSpotObjectType.COMPANIES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import okhttp3.HttpUrl; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.apache.commons.io.IOUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; @@ -37,24 +55,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static org.apache.nifi.processors.hubspot.GetHubSpot.CURSOR_KEY; -import static org.apache.nifi.processors.hubspot.GetHubSpot.END_INCREMENTAL_KEY; -import static org.apache.nifi.processors.hubspot.GetHubSpot.START_INCREMENTAL_KEY; -import static org.apache.nifi.processors.hubspot.HubSpotObjectType.COMPANIES; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - class GetHubSpotTest { private static final long TEST_EPOCH_TIME = 1662665787; @@ -70,7 +70,8 @@ class GetHubSpotTest { server.start(); baseUrl = server.url(BASE_URL); - final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); + final StandardWebClientServiceProvider standardWebClientServiceProvider = + new StandardWebClientServiceProvider(); final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(TEST_EPOCH_TIME); runner = TestRunners.newTestRunner(mockGetHubSpot); @@ -99,11 +100,14 @@ class GetHubSpotTest { runner.run(1); final List flowFiles = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS); + final MockFlowFile flowFile = flowFiles.get(0); + final String expectedFlowFileContent = getResourceAsString("expected_flowfile_content.json"); final JsonNode expectedJsonNode = OBJECT_MAPPER.readTree(expectedFlowFileContent); - final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFiles.get(0).getContent()); + final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFile.getContent()); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); assertEquals(expectedJsonNode, actualJsonNode); } @@ -242,13 +246,15 @@ class GetHubSpotTest { } @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, + final String newValue) { } } private String getResourceAsString(final String resourceName) throws IOException { return IOUtils.toString( - Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(resourceName), resourceName), + Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(resourceName), + resourceName), StandardCharsets.UTF_8 ); } diff --git a/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/java/org/apache/nifi/processors/shopify/GetShopify.java b/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/java/org/apache/nifi/processors/shopify/GetShopify.java index 2c425ef3f3..9994ab999a 100644 --- a/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/java/org/apache/nifi/processors/shopify/GetShopify.java +++ b/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/main/java/org/apache/nifi/processors/shopify/GetShopify.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -36,6 +37,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -78,6 +80,7 @@ import java.util.stream.Collectors; " The list of the resources with the supported parameters can be found in the additional details.") @CapabilityDescription("Retrieves objects from a custom Shopify store. The processor yield time must be set to the account's rate limit accordingly.") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") +@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type to application/json") public class GetShopify extends AbstractProcessor { static final PropertyDescriptor STORE_DOMAIN = new PropertyDescriptor.Builder() @@ -322,6 +325,7 @@ public class GetShopify extends AbstractProcessor { updateState(session, updatedStateMap); } if (objectCountHolder.get() > 0) { + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); session.transfer(flowFile, REL_SUCCESS); } else { getLogger().debug("Empty response when requested Shopify resource: [{}]", resourceName); diff --git a/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/test/java/org/apache/nifi/processors/shopify/GetShopifyIT.java b/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/test/java/org/apache/nifi/processors/shopify/GetShopifyIT.java index a964c58580..73ed2a916b 100644 --- a/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/test/java/org/apache/nifi/processors/shopify/GetShopifyIT.java +++ b/nifi-nar-bundles/nifi-shopify-bundle/nifi-shopify-processors/src/test/java/org/apache/nifi/processors/shopify/GetShopifyIT.java @@ -20,6 +20,7 @@ import okhttp3.HttpUrl; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.apache.commons.io.IOUtils; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processors.shopify.model.IncrementalLoadingParameter; import org.apache.nifi.processors.shopify.model.ResourceType; import org.apache.nifi.processors.shopify.rest.ShopifyRestService; @@ -71,7 +72,6 @@ class GetShopifyIT { @Test void testStateIsUpdatedIfIncrementalAndNotPaging() throws InitializationException, IOException { - final String lastExecutionTimeKey = "last_execution_time"; final MockResponse mockResponse = new MockResponse() .setResponseCode(200) @@ -97,6 +97,10 @@ class GetShopifyIT { runner.run(1); verify(customGetShopify).updateState(any(), any()); + + List flowFiles = runner.getFlowFilesForRelationship(GetShopify.REL_SUCCESS); + + flowFiles.get(0).assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); } @Test