diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index e991bae734..a7a351cbcf 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -856,6 +856,12 @@ language governing permissions and limitations under the License. --> 1.18.0-SNAPSHOT nar + + org.apache.nifi + nifi-hubspot-nar + 1.18.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/pom.xml b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/pom.xml new file mode 100644 index 0000000000..c9258b3d16 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/pom.xml @@ -0,0 +1,46 @@ + + + + + nifi-hubspot-bundle + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-hubspot-nar + + nar + + true + true + + + + org.apache.nifi + nifi-hubspot-processors + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-standard-services-api-nar + 1.18.0-SNAPSHOT + nar + + + diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/pom.xml b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/pom.xml new file mode 100644 index 0000000000..06a7213274 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/pom.xml @@ -0,0 +1,93 @@ + + + + + nifi-hubspot-bundle + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-hubspot-processors + + + + commons-io + commons-io + + + org.apache.nifi + nifi-web-client-provider-api + 1.18.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-utils + 1.18.0-SNAPSHOT + compile + + + com.fasterxml.jackson.core + jackson-databind + + + + org.apache.nifi + nifi-mock + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-web-client-provider-service + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-ssl-context-service-api + test + + + org.apache.nifi + nifi-proxy-configuration-api + test + + + com.squareup.okhttp3 + mockwebserver + test + + + + + + + org.apache.rat + apache-rat-plugin + + + + src/test/resources/**/* + + + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java new file mode 100644 index 0000000000..80c1b4c96f --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpResponseStatus; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"hubspot"}) +@CapabilityDescription("Retrieves JSON data from a private HubSpot application." + + " Configuring the Result Limit property enables incremental retrieval of results. When this property is set the processor will" + + " retrieve new records. This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." + + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute.") +@DefaultSettings(yieldDuration = "10 sec") +public class GetHubSpot extends AbstractProcessor { + + static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder() + .name("object-type") + .displayName("Object Type") + .description("The HubSpot Object Type requested") + .required(true) + .allowableValues(HubSpotObjectType.class) + .build(); + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Access Token to authenticate requests") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor RESULT_LIMIT = new PropertyDescriptor.Builder() + .name("result-limit") + .displayName("Result Limit") + .description("The maximum number of results to request for each invocation of the Processor") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + private static final String API_BASE_URI = "api.hubapi.com"; + private static final String HTTPS = "https"; + private static final String CURSOR_PARAMETER = "after"; + private static final String LIMIT_PARAMETER = "limit"; + private static final int TOO_MANY_REQUESTS = 429; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + private static final List PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + OBJECT_TYPE, + ACCESS_TOKEN, + RESULT_LIMIT, + WEB_CLIENT_SERVICE_PROVIDER + )); + + private static final Set RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + + @OnScheduled + public void onScheduled(final ProcessContext context) { + webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String endpoint = context.getProperty(OBJECT_TYPE).getValue(); + + final StateMap state = getStateMap(context); + final URI uri = createUri(context, state); + + final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri); + final AtomicInteger objectCountHolder = new AtomicInteger(); + + if (response.statusCode() == HttpResponseStatus.OK.getCode()) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder)); + if (objectCountHolder.get() > 0) { + session.transfer(flowFile, REL_SUCCESS); + } else { + getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); + session.remove(flowFile); + } + } else if (response.statusCode() == TOO_MANY_REQUESTS) { + context.yield(); + throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. HTTP %d error for requested URI [%s]", response.statusCode(), uri)); + } else { + final String responseBody = getResponseBodyAsString(context, response, uri); + getLogger().warn("HTTP {} error for requested URI [{}] with response [{}]", response.statusCode(), uri, responseBody); + } + } + + private String getResponseBodyAsString(final ProcessContext context, final HttpResponseEntity response, final URI uri) { + try { + return IOUtils.toString(response.body(), StandardCharsets.UTF_8); + } catch (final IOException e) { + context.yield(); + throw new UncheckedIOException(String.format("Reading HTTP response body for requested URI [%s] failed", uri), e); + } + } + + private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + return out -> { + try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); + final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("results")) { + jsonParser.nextToken(); + jsonGenerator.copyCurrentStructure(jsonParser); + objectCountHolder.incrementAndGet(); + } + final String fieldName = jsonParser.getCurrentName(); + if (CURSOR_PARAMETER.equals(fieldName)) { + jsonParser.nextToken(); + Map newStateMap = new HashMap<>(state.toMap()); + newStateMap.put(endpoint, jsonParser.getText()); + updateState(context, newStateMap); + break; + } + } + } + }; + } + + HttpUriBuilder getBaseUri(final ProcessContext context) { + final String path = context.getProperty(OBJECT_TYPE).getValue(); + return webClientServiceProvider.getHttpUriBuilder() + .scheme(HTTPS) + .host(API_BASE_URI) + .encodedPath(path); + } + + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { + return webClientServiceProvider.getWebClientService() + .get() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .retrieve(); + } + + private URI createUri(final ProcessContext context, final StateMap state) { + final String path = context.getProperty(OBJECT_TYPE).getValue(); + final HttpUriBuilder uriBuilder = getBaseUri(context); + + final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet(); + if (isLimitSet) { + final String limit = context.getProperty(RESULT_LIMIT).getValue(); + uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit); + } + + final String cursor = state.get(path); + if (cursor != null) { + uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor); + } + return uriBuilder.build(); + } + + private StateMap getStateMap(final ProcessContext context) { + final StateMap stateMap; + try { + stateMap = context.getStateManager().getState(Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("State retrieval failed", e); + } + return stateMap; + } + + private void updateState(ProcessContext context, Map newState) { + try { + context.getStateManager().setState(newState, Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Page cursor update failed", e); + } + } +} diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java new file mode 100644 index 0000000000..afcefc81d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +import org.apache.nifi.components.DescribedValue; + +public enum HubSpotObjectType implements DescribedValue { + + COMPANIES( + "/crm/v3/objects/companies", + "Companies", + "In HubSpot, the companies object is a standard CRM object. Individual company records can be used to store information about businesses" + + " and organizations within company properties." + ), + CONTACTS( + "/crm/v3/objects/contacts", + "Contacts", + "In HubSpot, contacts store information about individuals. From marketing automation to smart content, the lead-specific data found in" + + " contact records helps users leverage much of HubSpot's functionality." + ), + DEALS( + "/crm/v3/objects/deals", + "Deals", + "In HubSpot, a deal represents an ongoing transaction that a sales team is pursuing with a contact or company. It’s tracked through" + + " pipeline stages until won or lost." + ), + FEEDBACK_SUBMISSIONS( + "/crm/v3/objects/feedback_submissions", + "Feedback Submissions", + "In HubSpot, feedback submissions are an object which stores information submitted to a feedback survey. This includes Net Promoter Score (NPS)," + + " Customer Satisfaction (CSAT), Customer Effort Score (CES) and Custom Surveys." + ), + LINE_ITEMS( + "/crm/v3/objects/line_items", + "Line Items", + "In HubSpot, line items can be thought of as a subset of products. When a product is attached to a deal, it becomes a line item. Line items can" + + " be created that are unique to an individual quote, but they will not be added to the product library." + ), + PRODUCTS( + "/crm/v3/objects/products", + "Products", + "In HubSpot, products represent the goods or services to be sold. Building a product library allows the user to quickly add products to deals," + + " generate quotes, and report on product performance." + ), + TICKETS( + "/crm/v3/objects/tickets", + "Tickets", + "In HubSpot, a ticket represents a customer request for help or support." + ), + QUOTES( + "/crm/v3/objects/quotes", + "Quotes", + "In HubSpot, quotes are used to share pricing information with potential buyers." + ), + + CALLS( + "/crm/v3/objects/calls", + "Calls", + "Get calls on CRM records and on the calls index page." + ), + EMAILS( + "/crm/v3/objects/emails", + "Emails", + "Get emails on CRM records." + ), + MEETINGS( + "/crm/v3/objects/meetings", + "Meetings", + "Get meetings on CRM records." + ), + NOTES( + "/crm/v3/objects/notes", + "Notes", + "Get notes on CRM records." + ), + TASKS( + "/crm/v3/objects/tasks", + "Tasks", + "Get tasks on CRM records." + ), + + OWNERS( + "/crm/v3/owners/", + "Owners", + "HubSpot uses owners to assign specific users to contacts, companies, deals, tickets, or engagements. Any HubSpot user with access to contacts" + + " can be assigned as an owner, and multiple owners can be assigned to an object by creating a custom property for this purpose." + ); + + + private final String value; + private final String displayName; + private final String description; + + HubSpotObjectType(final String value, final String displayName, final String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..e31c426d26 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.hubspot.GetHubSpot diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html new file mode 100644 index 0000000000..17df046700 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html @@ -0,0 +1,148 @@ + + + + + + GetHubSpot + + + + +

Authentication Methods

+

+ The processor is working with HubSpot private applications. A HubSpot private app must be created (see HubSpot Private App Creation) + in order to connect to HubSpot and make requests. Private App Access Tokens are the only authentication method that is currently supported. +

+

Incremental Loading

+

+ Some resources can be processed incrementally by NiFi. This means that only resources created or modified after the + last run + time of the processor are displayed. The processor state can be reset in the context menu. The following list shows + which + date-time fields are incremented for which resources. +

    +
  • Access +
      +
    • Access Scope: none
    • +
    • StoreFront Access Token: none
    • +
    +
  • +
  • Analytics +
      +
    • Reports: updated_at_min
    • +
    +
  • +
  • Billing +
      +
    • Application Charge: none
    • +
    • Application Credit: none
    • +
    • Recurring Application Charge: none
    • +
    +
  • +
  • Customers +
      +
    • Customers: updated_at_min
    • +
    • Customer Saved Searches: none
    • +
    +
  • +
  • Discounts +
      +
    • Price Rules: updated_at_min
    • +
    +
  • +
  • Events +
      +
    • Events: created_at_min
    • +
    +
  • +
  • Inventory +
      +
    • Inventory Levels: updated_at_min
    • +
    • Locations: none
    • +
    +
  • +
  • Marketing Event +
      +
    • Marketing Events: none
    • +
    +
  • +
  • Metafields +
      +
    • Metafields: updated_at_min
    • +
    +
  • +
  • Online Store +
      +
    • Blogs: none
    • +
    • Comment: none
    • +
    • Pages: none
    • +
    • Redirects: none
    • +
    • Script Tags: updated_at_min
    • +
    • Themes: none
    • +
    +
  • +
  • Orders +
      +
    • Abandoned Checkouts: updated_at_min
    • +
    • Draft Orders: updated_at_min
    • +
    • Orders: updated_at_min
    • +
    +
  • +
  • Plus +
      +
    • Gift Cards: none
    • +
    • Users: none
    • +
    +
  • +
  • Product +
      +
    • Collects: none
    • +
    • Custom Collections: updated_at_min
    • +
    • Products: updated_at_min
    • +
    • Smart Collections: updated_at_min
    • +
    +
  • +
  • Sales Channels +
      +
    • Collection Listings: none
    • +
    • Mobile Platform Applications: none
    • +
    • Product Listings: updated_at_min
    • +
    • Resource Feedbacks: none
    • +
    +
  • +
  • Shipping and Fulfillments +
      +
    • Carrier Services: none
    • +
    +
  • +
  • Store Properties +
      +
    • Countries: none
    • +
    • Currencies: none
    • +
    • Policies: none
    • +
    • Shipping Zones: updated_at_min
    • +
    • Shop: none
    • +
    +
  • +
  • Tender Transactions +
      +
    • Tender Transactions: processed_at_min
    • +
    +
  • +
+

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java new file mode 100644 index 0000000000..c5c0fa7820 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.StandardHttpUriBuilder; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class GetHubSpotTest { + + public static final String BASE_URL = "/test/hubspot"; + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String RESPONSE_WITHOUT_PAGING_CURSOR_JSON = "response-without-paging-cursor.json"; + public static final String RESPONSE_WITH_PAGING_CURSOR_JSON = "response-with-paging-cursor.json"; + private static MockWebServer server; + private static HttpUrl baseUrl; + + private TestRunner runner; + + @BeforeEach + void setup() throws IOException, InitializationException { + server = new MockWebServer(); + server.start(); + baseUrl = server.url(BASE_URL); + + final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); + final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(); + + runner = TestRunners.newTestRunner(mockGetHubSpot); + runner.addControllerService("standardWebClientServiceProvider", standardWebClientServiceProvider); + runner.enableControllerService(standardWebClientServiceProvider); + + runner.setProperty(GetHubSpot.WEB_CLIENT_SERVICE_PROVIDER, standardWebClientServiceProvider.getIdentifier()); + runner.setProperty(GetHubSpot.ACCESS_TOKEN, "testToken"); + runner.setProperty(GetHubSpot.OBJECT_TYPE, HubSpotObjectType.COMPANIES.getValue()); + runner.setProperty(GetHubSpot.RESULT_LIMIT, "1"); + } + + @AfterEach + void tearDown() throws IOException { + if (server != null) { + server.shutdown(); + server = null; + } + } + + @Test + void testLimitIsAddedToUrl() throws InterruptedException, IOException { + + final String response = getResourceAsString(RESPONSE_WITHOUT_PAGING_CURSOR_JSON); + server.enqueue(new MockResponse().setResponseCode(200).setBody(response)); + + runner.run(1); + + RecordedRequest request = server.takeRequest(); + assertEquals(BASE_URL + "?limit=1", request.getPath()); + } + + @Test + void testPageCursorIsAddedToUrlFromState() throws InterruptedException, IOException { + + final String response = getResourceAsString(RESPONSE_WITHOUT_PAGING_CURSOR_JSON); + server.enqueue(new MockResponse().setBody(response)); + + runner.getStateManager().setState(Collections.singletonMap(HubSpotObjectType.COMPANIES.getValue(), "12345"), Scope.CLUSTER); + + runner.run(1); + + RecordedRequest request = server.takeRequest(); + assertEquals(BASE_URL + "?limit=1&after=12345", request.getPath()); + } + + @Test + void testFlowFileContainsResultsArray() throws IOException { + + final String response = getResourceAsString(RESPONSE_WITH_PAGING_CURSOR_JSON); + server.enqueue(new MockResponse().setBody(response)); + + runner.run(1); + + final List flowFile = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS); + final String expectedFlowFileContent = getResourceAsString("expected_flowfile_content.json"); + + final JsonNode expectedJsonNode = OBJECT_MAPPER.readTree(expectedFlowFileContent); + final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFile.get(0).getContent()); + + assertEquals(expectedJsonNode, actualJsonNode); + } + + @Test + void testStateIsStoredWhenPagingCursorFound() throws IOException { + + final String response = getResourceAsString(RESPONSE_WITH_PAGING_CURSOR_JSON); + final String expectedPagingCursor = OBJECT_MAPPER.readTree(response) + .path("paging") + .path("next") + .path("after") + .asText(); + + server.enqueue(new MockResponse().setBody(response)); + + runner.run(1); + + final StateMap state = runner.getStateManager().getState(Scope.CLUSTER); + final String actualPagingCursor = state.get(HubSpotObjectType.COMPANIES.getValue()); + + assertEquals(expectedPagingCursor, actualPagingCursor); + } + + + static class MockGetHubSpot extends GetHubSpot { + @Override + HttpUriBuilder getBaseUri(ProcessContext context) { + return new StandardHttpUriBuilder() + .scheme(baseUrl.scheme()) + .host(baseUrl.host()) + .port(baseUrl.port()) + .encodedPath(baseUrl.encodedPath()); + } + } + + private String getResourceAsString(final String resourceName) throws IOException { + return IOUtils.toString( + Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(resourceName), resourceName), + StandardCharsets.UTF_8 + ); + } + +} diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json new file mode 100644 index 0000000000..edd489bce0 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json @@ -0,0 +1,15 @@ +[ + { + "id": "5918809044", + "properties": { + "createdate": "2022-07-19T13:18:22.646Z", + "domain": "hubspot.com", + "hs_lastmodifieddate": "2022-07-19T13:18:33.337Z", + "hs_object_id": "5918809044", + "name": "Hubspot, Inc." + }, + "createdAt": "2022-07-19T13:18:22.646Z", + "updatedAt": "2022-07-19T13:18:33.337Z", + "archived": false + } +] diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-with-paging-cursor.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-with-paging-cursor.json new file mode 100644 index 0000000000..fefdf05f77 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-with-paging-cursor.json @@ -0,0 +1,23 @@ +{ + "results": [ + { + "id": "5918809044", + "properties": { + "createdate": "2022-07-19T13:18:22.646Z", + "domain": "hubspot.com", + "hs_lastmodifieddate": "2022-07-19T13:18:33.337Z", + "hs_object_id": "5918809044", + "name": "Hubspot, Inc." + }, + "createdAt": "2022-07-19T13:18:22.646Z", + "updatedAt": "2022-07-19T13:18:33.337Z", + "archived": false + } + ], + "paging": { + "next": { + "after": "5918809045", + "link": "https://api.hubapi.com/crm/v3/objects/companies?limit=1&after=5918809045" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-without-paging-cursor.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-without-paging-cursor.json new file mode 100644 index 0000000000..fefdf05f77 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-without-paging-cursor.json @@ -0,0 +1,23 @@ +{ + "results": [ + { + "id": "5918809044", + "properties": { + "createdate": "2022-07-19T13:18:22.646Z", + "domain": "hubspot.com", + "hs_lastmodifieddate": "2022-07-19T13:18:33.337Z", + "hs_object_id": "5918809044", + "name": "Hubspot, Inc." + }, + "createdAt": "2022-07-19T13:18:22.646Z", + "updatedAt": "2022-07-19T13:18:33.337Z", + "archived": false + } + ], + "paging": { + "next": { + "after": "5918809045", + "link": "https://api.hubapi.com/crm/v3/objects/companies?limit=1&after=5918809045" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/pom.xml b/nifi-nar-bundles/nifi-hubspot-bundle/pom.xml new file mode 100644 index 0000000000..e80893e16c --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/pom.xml @@ -0,0 +1,32 @@ + + + + + nifi-nar-bundles + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-hubspot-bundle + pom + + nifi-hubspot-nar + nifi-hubspot-processors + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 3c379468ae..3c6cba86c0 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -110,6 +110,7 @@ nifi-snowflake-bundle nifi-salesforce-bundle nifi-rocksdb-bundle + nifi-hubspot-bundle