NIFI-10356 Created GetHubSpot processor

This closes #6301

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lehel 2022-08-15 20:26:02 +02:00 committed by exceptionfactory
parent 3a6d724b44
commit e2d6df5afc
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
13 changed files with 962 additions and 0 deletions

View File

@ -856,6 +856,12 @@ language governing permissions and limitations under the License. -->
<version>1.18.0-SNAPSHOT</version> <version>1.18.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hubspot-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 --> <!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
<!-- TODO: remove these once minimum Java version is 11 --> <!-- TODO: remove these once minimum Java version is 11 -->
<dependency> <dependency>

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hubspot-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-hubspot-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hubspot-processors</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hubspot-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-hubspot-processors</artifactId>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-service</artifactId>
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<!-- test data -->
<exclude>src/test/resources/**/*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hubspot;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.web.client.api.HttpResponseEntity;
import org.apache.nifi.web.client.api.HttpResponseStatus;
import org.apache.nifi.web.client.api.HttpUriBuilder;
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"hubspot"})
@CapabilityDescription("Retrieves JSON data from a private HubSpot application."
+ " Configuring the Result Limit property enables incremental retrieval of results. When this property is set the processor will"
+ " retrieve new records. This processor is intended to be run on the Primary Node only.")
@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request."
+ " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute.")
@DefaultSettings(yieldDuration = "10 sec")
public class GetHubSpot extends AbstractProcessor {
static final PropertyDescriptor OBJECT_TYPE = new PropertyDescriptor.Builder()
.name("object-type")
.displayName("Object Type")
.description("The HubSpot Object Type requested")
.required(true)
.allowableValues(HubSpotObjectType.class)
.build();
static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
.name("access-token")
.displayName("Access Token")
.description("Access Token to authenticate requests")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor RESULT_LIMIT = new PropertyDescriptor.Builder()
.name("result-limit")
.displayName("Result Limit")
.description("The maximum number of results to request for each invocation of the Processor")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder()
.name("web-client-service-provider")
.displayName("Web Client Service Provider")
.description("Controller service for HTTP client operations")
.identifiesControllerService(WebClientServiceProvider.class)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("For FlowFiles created as a result of a successful HTTP request.")
.build();
private static final String API_BASE_URI = "api.hubapi.com";
private static final String HTTPS = "https";
private static final String CURSOR_PARAMETER = "after";
private static final String LIMIT_PARAMETER = "limit";
private static final int TOO_MANY_REQUESTS = 429;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
private volatile WebClientServiceProvider webClientServiceProvider;
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
OBJECT_TYPE,
ACCESS_TOKEN,
RESULT_LIMIT,
WEB_CLIENT_SERVICE_PROVIDER
));
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
@OnScheduled
public void onScheduled(final ProcessContext context) {
webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
final StateMap state = getStateMap(context);
final URI uri = createUri(context, state);
final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri);
final AtomicInteger objectCountHolder = new AtomicInteger();
if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder));
if (objectCountHolder.get() > 0) {
session.transfer(flowFile, REL_SUCCESS);
} else {
getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint);
session.remove(flowFile);
}
} else if (response.statusCode() == TOO_MANY_REQUESTS) {
context.yield();
throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. HTTP %d error for requested URI [%s]", response.statusCode(), uri));
} else {
final String responseBody = getResponseBodyAsString(context, response, uri);
getLogger().warn("HTTP {} error for requested URI [{}] with response [{}]", response.statusCode(), uri, responseBody);
}
}
private String getResponseBodyAsString(final ProcessContext context, final HttpResponseEntity response, final URI uri) {
try {
return IOUtils.toString(response.body(), StandardCharsets.UTF_8);
} catch (final IOException e) {
context.yield();
throw new UncheckedIOException(String.format("Reading HTTP response body for requested URI [%s] failed", uri), e);
}
}
private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
return out -> {
try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
while (jsonParser.nextToken() != null) {
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
if (CURSOR_PARAMETER.equals(fieldName)) {
jsonParser.nextToken();
Map<String, String> newStateMap = new HashMap<>(state.toMap());
newStateMap.put(endpoint, jsonParser.getText());
updateState(context, newStateMap);
break;
}
}
}
};
}
HttpUriBuilder getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
.encodedPath(path);
}
private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
return webClientServiceProvider.getWebClientService()
.get()
.uri(uri)
.header("Authorization", "Bearer " + accessToken)
.retrieve();
}
private URI createUri(final ProcessContext context, final StateMap state) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
final HttpUriBuilder uriBuilder = getBaseUri(context);
final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
if (isLimitSet) {
final String limit = context.getProperty(RESULT_LIMIT).getValue();
uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
}
final String cursor = state.get(path);
if (cursor != null) {
uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
}
return uriBuilder.build();
}
private StateMap getStateMap(final ProcessContext context) {
final StateMap stateMap;
try {
stateMap = context.getStateManager().getState(Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("State retrieval failed", e);
}
return stateMap;
}
private void updateState(ProcessContext context, Map<String, String> newState) {
try {
context.getStateManager().setState(newState, Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("Page cursor update failed", e);
}
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hubspot;
import org.apache.nifi.components.DescribedValue;
public enum HubSpotObjectType implements DescribedValue {
COMPANIES(
"/crm/v3/objects/companies",
"Companies",
"In HubSpot, the companies object is a standard CRM object. Individual company records can be used to store information about businesses" +
" and organizations within company properties."
),
CONTACTS(
"/crm/v3/objects/contacts",
"Contacts",
"In HubSpot, contacts store information about individuals. From marketing automation to smart content, the lead-specific data found in" +
" contact records helps users leverage much of HubSpot's functionality."
),
DEALS(
"/crm/v3/objects/deals",
"Deals",
"In HubSpot, a deal represents an ongoing transaction that a sales team is pursuing with a contact or company. Its tracked through" +
" pipeline stages until won or lost."
),
FEEDBACK_SUBMISSIONS(
"/crm/v3/objects/feedback_submissions",
"Feedback Submissions",
"In HubSpot, feedback submissions are an object which stores information submitted to a feedback survey. This includes Net Promoter Score (NPS)," +
" Customer Satisfaction (CSAT), Customer Effort Score (CES) and Custom Surveys."
),
LINE_ITEMS(
"/crm/v3/objects/line_items",
"Line Items",
"In HubSpot, line items can be thought of as a subset of products. When a product is attached to a deal, it becomes a line item. Line items can" +
" be created that are unique to an individual quote, but they will not be added to the product library."
),
PRODUCTS(
"/crm/v3/objects/products",
"Products",
"In HubSpot, products represent the goods or services to be sold. Building a product library allows the user to quickly add products to deals," +
" generate quotes, and report on product performance."
),
TICKETS(
"/crm/v3/objects/tickets",
"Tickets",
"In HubSpot, a ticket represents a customer request for help or support."
),
QUOTES(
"/crm/v3/objects/quotes",
"Quotes",
"In HubSpot, quotes are used to share pricing information with potential buyers."
),
CALLS(
"/crm/v3/objects/calls",
"Calls",
"Get calls on CRM records and on the calls index page."
),
EMAILS(
"/crm/v3/objects/emails",
"Emails",
"Get emails on CRM records."
),
MEETINGS(
"/crm/v3/objects/meetings",
"Meetings",
"Get meetings on CRM records."
),
NOTES(
"/crm/v3/objects/notes",
"Notes",
"Get notes on CRM records."
),
TASKS(
"/crm/v3/objects/tasks",
"Tasks",
"Get tasks on CRM records."
),
OWNERS(
"/crm/v3/owners/",
"Owners",
"HubSpot uses owners to assign specific users to contacts, companies, deals, tickets, or engagements. Any HubSpot user with access to contacts" +
" can be assigned as an owner, and multiple owners can be assigned to an object by creating a custom property for this purpose."
);
private final String value;
private final String displayName;
private final String description;
HubSpotObjectType(final String value, final String displayName, final String description) {
this.value = value;
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return value;
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.hubspot.GetHubSpot

View File

@ -0,0 +1,148 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>GetHubSpot</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h2>Authentication Methods</h2>
<p>
The processor is working with HubSpot private applications. A HubSpot private app must be created (see <a href="https://developers.hubspot.com/docs/api/private-apps">HubSpot Private App Creation</a>)
in order to connect to HubSpot and make requests. Private App Access Tokens are the only authentication method that is currently supported.
</p>
<h2>Incremental Loading</h2>
<p>
Some resources can be processed incrementally by NiFi. This means that only resources created or modified after the
last run
time of the processor are displayed. The processor state can be reset in the context menu. The following list shows
which
date-time fields are incremented for which resources.
<ul>
<li>Access
<ul>
<li>Access Scope: none</li>
<li>StoreFront Access Token: none</li>
</ul>
</li>
<li>Analytics
<ul>
<li>Reports: updated_at_min</li>
</ul>
</li>
<li>Billing
<ul>
<li>Application Charge: none</li>
<li>Application Credit: none</li>
<li>Recurring Application Charge: none</li>
</ul>
</li>
<li>Customers
<ul>
<li>Customers: updated_at_min</li>
<li>Customer Saved Searches: none</li>
</ul>
</li>
<li>Discounts
<ul>
<li>Price Rules: updated_at_min</li>
</ul>
</li>
<li>Events
<ul>
<li>Events: created_at_min</li>
</ul>
</li>
<li>Inventory
<ul>
<li>Inventory Levels: updated_at_min</li>
<li>Locations: none</li>
</ul>
</li>
<li>Marketing Event
<ul>
<li>Marketing Events: none</li>
</ul>
</li>
<li>Metafields
<ul>
<li>Metafields: updated_at_min</li>
</ul>
</li>
<li>Online Store
<ul>
<li>Blogs: none</li>
<li>Comment: none</li>
<li>Pages: none</li>
<li>Redirects: none</li>
<li>Script Tags: updated_at_min</li>
<li>Themes: none</li>
</ul>
</li>
<li>Orders
<ul>
<li>Abandoned Checkouts: updated_at_min</li>
<li>Draft Orders: updated_at_min</li>
<li>Orders: updated_at_min</li>
</ul>
</li>
<li>Plus
<ul>
<li>Gift Cards: none</li>
<li>Users: none</li>
</ul>
</li>
<li>Product
<ul>
<li>Collects: none</li>
<li>Custom Collections: updated_at_min</li>
<li>Products: updated_at_min</li>
<li>Smart Collections: updated_at_min</li>
</ul>
</li>
<li>Sales Channels
<ul>
<li>Collection Listings: none</li>
<li>Mobile Platform Applications: none</li>
<li>Product Listings: updated_at_min</li>
<li>Resource Feedbacks: none</li>
</ul>
</li>
<li>Shipping and Fulfillments
<ul>
<li>Carrier Services: none</li>
</ul>
</li>
<li>Store Properties
<ul>
<li>Countries: none</li>
<li>Currencies: none</li>
<li>Policies: none</li>
<li>Shipping Zones: updated_at_min</li>
<li>Shop: none</li>
</ul>
</li>
<li>Tender Transactions
<ul>
<li>Tender Transactions: processed_at_min</li>
</ul>
</li>
</ul>
</p>
</body>
</html>

View File

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hubspot;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.client.StandardHttpUriBuilder;
import org.apache.nifi.web.client.api.HttpUriBuilder;
import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertEquals;
class GetHubSpotTest {
public static final String BASE_URL = "/test/hubspot";
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String RESPONSE_WITHOUT_PAGING_CURSOR_JSON = "response-without-paging-cursor.json";
public static final String RESPONSE_WITH_PAGING_CURSOR_JSON = "response-with-paging-cursor.json";
private static MockWebServer server;
private static HttpUrl baseUrl;
private TestRunner runner;
@BeforeEach
void setup() throws IOException, InitializationException {
server = new MockWebServer();
server.start();
baseUrl = server.url(BASE_URL);
final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider();
final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot();
runner = TestRunners.newTestRunner(mockGetHubSpot);
runner.addControllerService("standardWebClientServiceProvider", standardWebClientServiceProvider);
runner.enableControllerService(standardWebClientServiceProvider);
runner.setProperty(GetHubSpot.WEB_CLIENT_SERVICE_PROVIDER, standardWebClientServiceProvider.getIdentifier());
runner.setProperty(GetHubSpot.ACCESS_TOKEN, "testToken");
runner.setProperty(GetHubSpot.OBJECT_TYPE, HubSpotObjectType.COMPANIES.getValue());
runner.setProperty(GetHubSpot.RESULT_LIMIT, "1");
}
@AfterEach
void tearDown() throws IOException {
if (server != null) {
server.shutdown();
server = null;
}
}
@Test
void testLimitIsAddedToUrl() throws InterruptedException, IOException {
final String response = getResourceAsString(RESPONSE_WITHOUT_PAGING_CURSOR_JSON);
server.enqueue(new MockResponse().setResponseCode(200).setBody(response));
runner.run(1);
RecordedRequest request = server.takeRequest();
assertEquals(BASE_URL + "?limit=1", request.getPath());
}
@Test
void testPageCursorIsAddedToUrlFromState() throws InterruptedException, IOException {
final String response = getResourceAsString(RESPONSE_WITHOUT_PAGING_CURSOR_JSON);
server.enqueue(new MockResponse().setBody(response));
runner.getStateManager().setState(Collections.singletonMap(HubSpotObjectType.COMPANIES.getValue(), "12345"), Scope.CLUSTER);
runner.run(1);
RecordedRequest request = server.takeRequest();
assertEquals(BASE_URL + "?limit=1&after=12345", request.getPath());
}
@Test
void testFlowFileContainsResultsArray() throws IOException {
final String response = getResourceAsString(RESPONSE_WITH_PAGING_CURSOR_JSON);
server.enqueue(new MockResponse().setBody(response));
runner.run(1);
final List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS);
final String expectedFlowFileContent = getResourceAsString("expected_flowfile_content.json");
final JsonNode expectedJsonNode = OBJECT_MAPPER.readTree(expectedFlowFileContent);
final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFile.get(0).getContent());
assertEquals(expectedJsonNode, actualJsonNode);
}
@Test
void testStateIsStoredWhenPagingCursorFound() throws IOException {
final String response = getResourceAsString(RESPONSE_WITH_PAGING_CURSOR_JSON);
final String expectedPagingCursor = OBJECT_MAPPER.readTree(response)
.path("paging")
.path("next")
.path("after")
.asText();
server.enqueue(new MockResponse().setBody(response));
runner.run(1);
final StateMap state = runner.getStateManager().getState(Scope.CLUSTER);
final String actualPagingCursor = state.get(HubSpotObjectType.COMPANIES.getValue());
assertEquals(expectedPagingCursor, actualPagingCursor);
}
static class MockGetHubSpot extends GetHubSpot {
@Override
HttpUriBuilder getBaseUri(ProcessContext context) {
return new StandardHttpUriBuilder()
.scheme(baseUrl.scheme())
.host(baseUrl.host())
.port(baseUrl.port())
.encodedPath(baseUrl.encodedPath());
}
}
private String getResourceAsString(final String resourceName) throws IOException {
return IOUtils.toString(
Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(resourceName), resourceName),
StandardCharsets.UTF_8
);
}
}

View File

@ -0,0 +1,15 @@
[
{
"id": "5918809044",
"properties": {
"createdate": "2022-07-19T13:18:22.646Z",
"domain": "hubspot.com",
"hs_lastmodifieddate": "2022-07-19T13:18:33.337Z",
"hs_object_id": "5918809044",
"name": "Hubspot, Inc."
},
"createdAt": "2022-07-19T13:18:22.646Z",
"updatedAt": "2022-07-19T13:18:33.337Z",
"archived": false
}
]

View File

@ -0,0 +1,23 @@
{
"results": [
{
"id": "5918809044",
"properties": {
"createdate": "2022-07-19T13:18:22.646Z",
"domain": "hubspot.com",
"hs_lastmodifieddate": "2022-07-19T13:18:33.337Z",
"hs_object_id": "5918809044",
"name": "Hubspot, Inc."
},
"createdAt": "2022-07-19T13:18:22.646Z",
"updatedAt": "2022-07-19T13:18:33.337Z",
"archived": false
}
],
"paging": {
"next": {
"after": "5918809045",
"link": "https://api.hubapi.com/crm/v3/objects/companies?limit=1&after=5918809045"
}
}
}

View File

@ -0,0 +1,23 @@
{
"results": [
{
"id": "5918809044",
"properties": {
"createdate": "2022-07-19T13:18:22.646Z",
"domain": "hubspot.com",
"hs_lastmodifieddate": "2022-07-19T13:18:33.337Z",
"hs_object_id": "5918809044",
"name": "Hubspot, Inc."
},
"createdAt": "2022-07-19T13:18:22.646Z",
"updatedAt": "2022-07-19T13:18:33.337Z",
"archived": false
}
],
"paging": {
"next": {
"after": "5918809045",
"link": "https://api.hubapi.com/crm/v3/objects/companies?limit=1&after=5918809045"
}
}
}

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-nar-bundles</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.18.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-hubspot-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-hubspot-nar</module>
<module>nifi-hubspot-processors</module>
</modules>
</project>

View File

@ -110,6 +110,7 @@
<module>nifi-snowflake-bundle</module> <module>nifi-snowflake-bundle</module>
<module>nifi-salesforce-bundle</module> <module>nifi-salesforce-bundle</module>
<module>nifi-rocksdb-bundle</module> <module>nifi-rocksdb-bundle</module>
<module>nifi-hubspot-bundle</module>
</modules> </modules>
<build> <build>