NIFI-10389 Added mime.type to GetShopify and GetHubSpot

This closes #6682

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lehel Boér 2022-11-18 15:38:47 +01:00 committed by exceptionfactory
parent c63a668ac5
commit 81b36e69ae
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 41 additions and 23 deletions

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -40,6 +41,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -82,6 +84,7 @@ import java.util.stream.Collectors;
" objects can be set in the 'Result Limit' property.") " objects can be set in the 'Result Limit' property.")
@DefaultSettings(yieldDuration = "10 sec") @DefaultSettings(yieldDuration = "10 sec")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type to application/json")
public class GetHubSpot extends AbstractProcessor { public class GetHubSpot extends AbstractProcessor {
static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder() static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder()
@ -234,6 +237,7 @@ public class GetHubSpot extends AbstractProcessor {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
flowFile = session.write(flowFile, parseHttpResponse(response, total, stateMap)); flowFile = session.write(flowFile, parseHttpResponse(response, total, stateMap));
if (total.get() > 0) { if (total.get() > 0) {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} else { } else {
getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint);

View File

@ -16,16 +16,34 @@
*/ */
package org.apache.nifi.processors.hubspot; package org.apache.nifi.processors.hubspot;
import static org.apache.nifi.processors.hubspot.GetHubSpot.CURSOR_KEY;
import static org.apache.nifi.processors.hubspot.GetHubSpot.END_INCREMENTAL_KEY;
import static org.apache.nifi.processors.hubspot.GetHubSpot.START_INCREMENTAL_KEY;
import static org.apache.nifi.processors.hubspot.HubSpotObjectType.COMPANIES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import okhttp3.HttpUrl; import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.MockWebServer;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -37,24 +55,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.nifi.processors.hubspot.GetHubSpot.CURSOR_KEY;
import static org.apache.nifi.processors.hubspot.GetHubSpot.END_INCREMENTAL_KEY;
import static org.apache.nifi.processors.hubspot.GetHubSpot.START_INCREMENTAL_KEY;
import static org.apache.nifi.processors.hubspot.HubSpotObjectType.COMPANIES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class GetHubSpotTest { class GetHubSpotTest {
private static final long TEST_EPOCH_TIME = 1662665787; private static final long TEST_EPOCH_TIME = 1662665787;
@ -70,7 +70,8 @@ class GetHubSpotTest {
server.start(); server.start();
baseUrl = server.url(BASE_URL); baseUrl = server.url(BASE_URL);
final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); final StandardWebClientServiceProvider standardWebClientServiceProvider =
new StandardWebClientServiceProvider();
final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(TEST_EPOCH_TIME); final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(TEST_EPOCH_TIME);
runner = TestRunners.newTestRunner(mockGetHubSpot); runner = TestRunners.newTestRunner(mockGetHubSpot);
@ -99,11 +100,14 @@ class GetHubSpotTest {
runner.run(1); runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS); final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.get(0);
final String expectedFlowFileContent = getResourceAsString("expected_flowfile_content.json"); final String expectedFlowFileContent = getResourceAsString("expected_flowfile_content.json");
final JsonNode expectedJsonNode = OBJECT_MAPPER.readTree(expectedFlowFileContent); final JsonNode expectedJsonNode = OBJECT_MAPPER.readTree(expectedFlowFileContent);
final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFiles.get(0).getContent()); final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFile.getContent());
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
assertEquals(expectedJsonNode, actualJsonNode); assertEquals(expectedJsonNode, actualJsonNode);
} }
@ -242,13 +246,15 @@ class GetHubSpotTest {
} }
@Override @Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue,
final String newValue) {
} }
} }
private String getResourceAsString(final String resourceName) throws IOException { private String getResourceAsString(final String resourceName) throws IOException {
return IOUtils.toString( return IOUtils.toString(
Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(resourceName), resourceName), Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(resourceName),
resourceName),
StandardCharsets.UTF_8 StandardCharsets.UTF_8
); );
} }

View File

@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -36,6 +37,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -78,6 +80,7 @@ import java.util.stream.Collectors;
" The list of the resources with the supported parameters can be found in the additional details.") " The list of the resources with the supported parameters can be found in the additional details.")
@CapabilityDescription("Retrieves objects from a custom Shopify store. The processor yield time must be set to the account's rate limit accordingly.") @CapabilityDescription("Retrieves objects from a custom Shopify store. The processor yield time must be set to the account's rate limit accordingly.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type to application/json")
public class GetShopify extends AbstractProcessor { public class GetShopify extends AbstractProcessor {
static final PropertyDescriptor STORE_DOMAIN = new PropertyDescriptor.Builder() static final PropertyDescriptor STORE_DOMAIN = new PropertyDescriptor.Builder()
@ -322,6 +325,7 @@ public class GetShopify extends AbstractProcessor {
updateState(session, updatedStateMap); updateState(session, updatedStateMap);
} }
if (objectCountHolder.get() > 0) { if (objectCountHolder.get() > 0) {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} else { } else {
getLogger().debug("Empty response when requested Shopify resource: [{}]", resourceName); getLogger().debug("Empty response when requested Shopify resource: [{}]", resourceName);

View File

@ -20,6 +20,7 @@ import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.MockWebServer;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.shopify.model.IncrementalLoadingParameter; import org.apache.nifi.processors.shopify.model.IncrementalLoadingParameter;
import org.apache.nifi.processors.shopify.model.ResourceType; import org.apache.nifi.processors.shopify.model.ResourceType;
import org.apache.nifi.processors.shopify.rest.ShopifyRestService; import org.apache.nifi.processors.shopify.rest.ShopifyRestService;
@ -71,7 +72,6 @@ class GetShopifyIT {
@Test @Test
void testStateIsUpdatedIfIncrementalAndNotPaging() throws InitializationException, IOException { void testStateIsUpdatedIfIncrementalAndNotPaging() throws InitializationException, IOException {
final String lastExecutionTimeKey = "last_execution_time";
final MockResponse mockResponse = new MockResponse() final MockResponse mockResponse = new MockResponse()
.setResponseCode(200) .setResponseCode(200)
@ -97,6 +97,10 @@ class GetShopifyIT {
runner.run(1); runner.run(1);
verify(customGetShopify).updateState(any(), any()); verify(customGetShopify).updateState(any(), any());
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetShopify.REL_SUCCESS);
flowFiles.get(0).assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
} }
@Test @Test