diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..7a4a3ea242 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..8f8b7eac83 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,37 @@ +nifi-airtable-nar +Copyright 2014-2022 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +************************** +Apache Software License v2 +************************** + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java index 80c1b4c96f..58c0d02c58 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java @@ -22,13 +22,14 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -47,31 +48,36 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.web.client.api.HttpResponseEntity; import org.apache.nifi.web.client.api.HttpResponseStatus; -import org.apache.nifi.web.client.api.HttpUriBuilder; import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; import java.io.IOException; +import java.io.InputStream; import java.io.UncheckedIOException; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; @PrimaryNodeOnly @TriggerSerially -@TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"hubspot"}) @CapabilityDescription("Retrieves JSON data from a private HubSpot application." - + " Configuring the Result Limit property enables incremental retrieval of results. When this property is set the processor will" - + " retrieve new records. This processor is intended to be run on the Primary Node only.") -@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." - + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute.") + + " This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "In case of incremental loading, the start and end timestamps of the last" + + " query time window are stored in the state. When the 'Result Limit' property is set, the paging cursor is saved after" + + " executing a request. Only the objects after the paging cursor will be retrieved. The maximum number of retrieved" + + " objects can be set in the 'Result Limit' property.") @DefaultSettings(yieldDuration = "10 sec") public class GetHubSpot extends AbstractProcessor { @@ -97,9 +103,44 @@ public class GetHubSpot extends AbstractProcessor { .name("result-limit") .displayName("Result Limit") .description("The maximum number of results to request for each invocation of the Processor") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .addValidator(StandardValidators.createLongValidator(1, 100, true)) + .build(); + + static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder() + .name("is-incremental") + .displayName("Incremental Loading") + .description("The processor can incrementally load the queried objects so that each object is queried exactly once." + + " For each query, the processor queries objects within a time window where the objects were modified between" + + " the previous run time and the current time (optionally adjusted by the Incremental Delay property).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder() + .name("incremental-delay") + .displayName("Incremental Delay") + .description(("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." + + " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35." + + " Set this property to avoid missing objects when the clock of your local machines and HubSpot servers' clock are not in sync.")) + .required(true) + .defaultValue("3 sec") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .dependsOn(IS_INCREMENTAL, "true") + .build(); + + static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new PropertyDescriptor.Builder() + .name("incremental-initial-start-time") + .displayName("Incremental Initial Start Time") + .description("This property specifies the start time that the processor applies when running the first request." + + " The expected format is a UTC date-time such as '2011-12-03T10:15:30Z'") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR) + .dependsOn(IS_INCREMENTAL, "true") .build(); static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() @@ -117,28 +158,36 @@ public class GetHubSpot extends AbstractProcessor { private static final String API_BASE_URI = "api.hubapi.com"; private static final String HTTPS = "https"; - private static final String CURSOR_PARAMETER = "after"; - private static final String LIMIT_PARAMETER = "limit"; private static final int TOO_MANY_REQUESTS = 429; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + private static final Map OBJECT_TYPE_LOOKUP_MAP = createObjectTypeLookupMap(); + private static final String NO_PAGING = "no paging"; + private static final String PAGING_CURSOR = "after"; + static final String CURSOR_KEY = "paging_next"; + static final String START_INCREMENTAL_KEY = "time_window_start"; + static final String END_INCREMENTAL_KEY = "time_window_end"; + + private static Map createObjectTypeLookupMap() { + return Arrays.stream(HubSpotObjectType.values()) + .collect(Collectors.toMap(HubSpotObjectType::getValue, Function.identity())); + } private volatile WebClientServiceProvider webClientServiceProvider; + private volatile boolean isObjectTypeModified; private static final List PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( OBJECT_TYPE, ACCESS_TOKEN, RESULT_LIMIT, + IS_INCREMENTAL, + INCREMENTAL_DELAY, + INCREMENTAL_INITIAL_START_TIME, WEB_CLIENT_SERVICE_PROVIDER )); private static final Set RELATIONSHIPS = Collections.singleton(REL_SUCCESS); - @OnScheduled - public void onScheduled(final ProcessContext context) { - webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); - } - @Override protected List getSupportedPropertyDescriptors() { return PROPERTY_DESCRIPTORS; @@ -149,26 +198,45 @@ public class GetHubSpot extends AbstractProcessor { return RELATIONSHIPS; } + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (OBJECT_TYPE.equals(descriptor) || IS_INCREMENTAL.equals(descriptor)) { + isObjectTypeModified = true; + } + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (isObjectTypeModified) { + clearState(context); + isObjectTypeModified = false; + } final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); final String endpoint = context.getProperty(OBJECT_TYPE).getValue(); - final StateMap state = getStateMap(context); - final URI uri = createUri(context, state); + final URI uri = getBaseUri(context); - final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri); - final AtomicInteger objectCountHolder = new AtomicInteger(); + final AtomicInteger total = new AtomicInteger(-1); + final Map stateMap = getStateMap(session); + final String filters = createIncrementalFilters(context, stateMap); + final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri, filters); if (response.statusCode() == HttpResponseStatus.OK.getCode()) { FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder)); - if (objectCountHolder.get() > 0) { + flowFile = session.write(flowFile, parseHttpResponse(response, total, stateMap)); + if (total.get() > 0) { session.transfer(flowFile, REL_SUCCESS); } else { getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint); + context.yield(); session.remove(flowFile); } + updateState(session, stateMap); } else if (response.statusCode() == TOO_MANY_REQUESTS) { context.yield(); throw new ProcessException(String.format("Rate limit exceeded, yielding before retrying request. HTTP %d error for requested URI [%s]", response.statusCode(), uri)); @@ -187,78 +255,154 @@ public class GetHubSpot extends AbstractProcessor { } } - private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + private OutputStreamCallback parseHttpResponse(final HttpResponseEntity response, final AtomicInteger total, + final Map stateMap) { return out -> { try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + boolean isCursorAvailable = false; while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("total")) { + jsonParser.nextToken(); + total.set(jsonParser.getIntValue()); + } if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() .equals("results")) { jsonParser.nextToken(); jsonGenerator.copyCurrentStructure(jsonParser); - objectCountHolder.incrementAndGet(); } final String fieldName = jsonParser.getCurrentName(); - if (CURSOR_PARAMETER.equals(fieldName)) { + if (PAGING_CURSOR.equals(fieldName)) { + isCursorAvailable = true; jsonParser.nextToken(); - Map newStateMap = new HashMap<>(state.toMap()); - newStateMap.put(endpoint, jsonParser.getText()); - updateState(context, newStateMap); + stateMap.put(CURSOR_KEY, jsonParser.getText()); break; } } + if (!isCursorAvailable) { + stateMap.put(CURSOR_KEY, NO_PAGING); + } } }; } - HttpUriBuilder getBaseUri(final ProcessContext context) { + URI getBaseUri(final ProcessContext context) { final String path = context.getProperty(OBJECT_TYPE).getValue(); return webClientServiceProvider.getHttpUriBuilder() .scheme(HTTPS) .host(API_BASE_URI) - .encodedPath(path); + .encodedPath(path + "/search") + .build(); } - private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { - return webClientServiceProvider.getWebClientService() - .get() - .uri(uri) - .header("Authorization", "Bearer " + accessToken) - .retrieve(); + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) { + final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8); + try { + return webClientServiceProvider.getWebClientService() + .post() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .header("Content-Type", "application/json") + .body(inputStream, OptionalLong.of(inputStream.available())) + .retrieve(); + } catch (IOException e) { + throw new ProcessException("Could not transform incremental filters to input stream", e); + } } - private URI createUri(final ProcessContext context, final StateMap state) { - final String path = context.getProperty(OBJECT_TYPE).getValue(); - final HttpUriBuilder uriBuilder = getBaseUri(context); + private String createIncrementalFilters(final ProcessContext context, final Map stateMap) { + final String limit = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().getValue(); + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType); + final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); - final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet(); - if (isLimitSet) { - final String limit = context.getProperty(RESULT_LIMIT).getValue(); - uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit); + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + if (limit != null) { + root.put("limit", limit); } - final String cursor = state.get(path); - if (cursor != null) { - uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor); + final String cursor = stateMap.get(CURSOR_KEY); + if (cursor != null && !NO_PAGING.equals(cursor)) { + root.put(PAGING_CURSOR, cursor); } - return uriBuilder.build(); + final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean(); + if (isIncremental) { + final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).evaluateAttributeExpressions().getValue(); + final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue(); + final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY); + final String lastEndTime = stateMap.get(END_INCREMENTAL_KEY); + + final String currentStartTime; + final String currentEndTime; + + if (cursor != null && !NO_PAGING.equals(cursor)) { + currentStartTime = lastStartTime; + currentEndTime = lastEndTime; + } else { + currentStartTime = lastEndTime != null ? lastEndTime : getInitialStartTimeEpoch(initialStartTimeValue); + final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime(); + currentEndTime = String.valueOf(delayedCurrentEndTime); + + stateMap.put(START_INCREMENTAL_KEY, currentStartTime); + stateMap.put(END_INCREMENTAL_KEY, currentEndTime); + } + + final ArrayNode filters = OBJECT_MAPPER.createArrayNode(); + + if (currentStartTime != null) { + final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode(); + greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName); + greaterThanFilterNode.put("operator", "GTE"); + greaterThanFilterNode.put("value", currentStartTime); + filters.add(greaterThanFilterNode); + } + + final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode(); + lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName); + lessThanFilterNode.put("operator", "LT"); + lessThanFilterNode.put("value", currentEndTime); + filters.add(lessThanFilterNode); + + root.set("filters", filters); + } + return root.toString(); } - private StateMap getStateMap(final ProcessContext context) { + private String getInitialStartTimeEpoch(String initialStartTimeValue) { + if (initialStartTimeValue != null) { + return String.valueOf(Instant.parse(initialStartTimeValue).toEpochMilli()); + } + return null; + } + + long getCurrentEpochTime() { + return Instant.now().toEpochMilli(); + } + + private Map getStateMap(final ProcessSession session) { final StateMap stateMap; try { - stateMap = context.getStateManager().getState(Scope.CLUSTER); + stateMap = session.getState(Scope.CLUSTER); } catch (IOException e) { throw new ProcessException("State retrieval failed", e); } - return stateMap; + return new HashMap<>(stateMap.toMap()); } - private void updateState(ProcessContext context, Map newState) { + private void updateState(ProcessSession session, Map newState) { try { - context.getStateManager().setState(newState, Scope.CLUSTER); + session.setState(newState, Scope.CLUSTER); } catch (IOException e) { throw new ProcessException("Page cursor update failed", e); } } + + private void clearState(ProcessContext context) { + try { + context.getStateManager().clear(Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Clearing state failed", e); + } + } } diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java index afcefc81d1..a4150f1908 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java @@ -18,97 +18,100 @@ package org.apache.nifi.processors.hubspot; import org.apache.nifi.components.DescribedValue; +import static org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE; +import static org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE; + public enum HubSpotObjectType implements DescribedValue { COMPANIES( "/crm/v3/objects/companies", "Companies", "In HubSpot, the companies object is a standard CRM object. Individual company records can be used to store information about businesses" + - " and organizations within company properties." + " and organizations within company properties.", + HS_LAST_MODIFIED_DATE ), CONTACTS( "/crm/v3/objects/contacts", "Contacts", "In HubSpot, contacts store information about individuals. From marketing automation to smart content, the lead-specific data found in" + - " contact records helps users leverage much of HubSpot's functionality." + " contact records helps users leverage much of HubSpot's functionality.", + LAST_MODIFIED_DATE ), DEALS( "/crm/v3/objects/deals", "Deals", "In HubSpot, a deal represents an ongoing transaction that a sales team is pursuing with a contact or company. It’s tracked through" + - " pipeline stages until won or lost." - ), - FEEDBACK_SUBMISSIONS( - "/crm/v3/objects/feedback_submissions", - "Feedback Submissions", - "In HubSpot, feedback submissions are an object which stores information submitted to a feedback survey. This includes Net Promoter Score (NPS)," + - " Customer Satisfaction (CSAT), Customer Effort Score (CES) and Custom Surveys." + " pipeline stages until won or lost.", + HS_LAST_MODIFIED_DATE ), LINE_ITEMS( "/crm/v3/objects/line_items", "Line Items", "In HubSpot, line items can be thought of as a subset of products. When a product is attached to a deal, it becomes a line item. Line items can" + - " be created that are unique to an individual quote, but they will not be added to the product library." + " be created that are unique to an individual quote, but they will not be added to the product library.", + HS_LAST_MODIFIED_DATE ), PRODUCTS( "/crm/v3/objects/products", "Products", "In HubSpot, products represent the goods or services to be sold. Building a product library allows the user to quickly add products to deals," + - " generate quotes, and report on product performance." + " generate quotes, and report on product performance.", + HS_LAST_MODIFIED_DATE ), TICKETS( "/crm/v3/objects/tickets", "Tickets", - "In HubSpot, a ticket represents a customer request for help or support." + "In HubSpot, a ticket represents a customer request for help or support.", + HS_LAST_MODIFIED_DATE ), QUOTES( "/crm/v3/objects/quotes", "Quotes", - "In HubSpot, quotes are used to share pricing information with potential buyers." + "In HubSpot, quotes are used to share pricing information with potential buyers.", + HS_LAST_MODIFIED_DATE ), CALLS( "/crm/v3/objects/calls", "Calls", - "Get calls on CRM records and on the calls index page." + "Get calls on CRM records and on the calls index page.", + HS_LAST_MODIFIED_DATE ), EMAILS( "/crm/v3/objects/emails", "Emails", - "Get emails on CRM records." + "Get emails on CRM records.", + HS_LAST_MODIFIED_DATE ), MEETINGS( "/crm/v3/objects/meetings", "Meetings", - "Get meetings on CRM records." + "Get meetings on CRM records.", + HS_LAST_MODIFIED_DATE ), NOTES( "/crm/v3/objects/notes", "Notes", - "Get notes on CRM records." + "Get notes on CRM records.", + HS_LAST_MODIFIED_DATE ), TASKS( "/crm/v3/objects/tasks", "Tasks", - "Get tasks on CRM records." - ), - - OWNERS( - "/crm/v3/owners/", - "Owners", - "HubSpot uses owners to assign specific users to contacts, companies, deals, tickets, or engagements. Any HubSpot user with access to contacts" + - " can be assigned as an owner, and multiple owners can be assigned to an object by creating a custom property for this purpose." + "Get tasks on CRM records.", + HS_LAST_MODIFIED_DATE ); - private final String value; private final String displayName; private final String description; + private final IncrementalFieldType lastModifiedDateType; - HubSpotObjectType(final String value, final String displayName, final String description) { + HubSpotObjectType(String value, String displayName, String description, IncrementalFieldType lastModifiedDateType) { this.value = value; this.displayName = displayName; this.description = description; + this.lastModifiedDateType = lastModifiedDateType; } @Override @@ -125,4 +128,8 @@ public enum HubSpotObjectType implements DescribedValue { public String getDescription() { return description; } + + public IncrementalFieldType getLastModifiedDateType() { + return lastModifiedDateType; + } } diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java new file mode 100644 index 0000000000..0c7e75e7dd --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hubspot; + +public enum IncrementalFieldType { + LAST_MODIFIED_DATE("lastmodifieddate"), + HS_LAST_MODIFIED_DATE("hs_lastmodifieddate"); + + private final String value; + + IncrementalFieldType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html index 17df046700..6f2dd67fa6 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/docs/org.apache.nifi.processors.hubspot.GetHubSpot/additionalDetails.html @@ -28,121 +28,9 @@

Incremental Loading

- Some resources can be processed incrementally by NiFi. This means that only resources created or modified after the - last run - time of the processor are displayed. The processor state can be reset in the context menu. The following list shows - which - date-time fields are incremented for which resources. -

    -
  • Access -
      -
    • Access Scope: none
    • -
    • StoreFront Access Token: none
    • -
    -
  • -
  • Analytics -
      -
    • Reports: updated_at_min
    • -
    -
  • -
  • Billing -
      -
    • Application Charge: none
    • -
    • Application Credit: none
    • -
    • Recurring Application Charge: none
    • -
    -
  • -
  • Customers -
      -
    • Customers: updated_at_min
    • -
    • Customer Saved Searches: none
    • -
    -
  • -
  • Discounts -
      -
    • Price Rules: updated_at_min
    • -
    -
  • -
  • Events -
      -
    • Events: created_at_min
    • -
    -
  • -
  • Inventory -
      -
    • Inventory Levels: updated_at_min
    • -
    • Locations: none
    • -
    -
  • -
  • Marketing Event -
      -
    • Marketing Events: none
    • -
    -
  • -
  • Metafields -
      -
    • Metafields: updated_at_min
    • -
    -
  • -
  • Online Store -
      -
    • Blogs: none
    • -
    • Comment: none
    • -
    • Pages: none
    • -
    • Redirects: none
    • -
    • Script Tags: updated_at_min
    • -
    • Themes: none
    • -
    -
  • -
  • Orders -
      -
    • Abandoned Checkouts: updated_at_min
    • -
    • Draft Orders: updated_at_min
    • -
    • Orders: updated_at_min
    • -
    -
  • -
  • Plus -
      -
    • Gift Cards: none
    • -
    • Users: none
    • -
    -
  • -
  • Product -
      -
    • Collects: none
    • -
    • Custom Collections: updated_at_min
    • -
    • Products: updated_at_min
    • -
    • Smart Collections: updated_at_min
    • -
    -
  • -
  • Sales Channels -
      -
    • Collection Listings: none
    • -
    • Mobile Platform Applications: none
    • -
    • Product Listings: updated_at_min
    • -
    • Resource Feedbacks: none
    • -
    -
  • -
  • Shipping and Fulfillments -
      -
    • Carrier Services: none
    • -
    -
  • -
  • Store Properties -
      -
    • Countries: none
    • -
    • Currencies: none
    • -
    • Policies: none
    • -
    • Shipping Zones: updated_at_min
    • -
    • Shop: none
    • -
    -
  • -
  • Tender Transactions -
      -
    • Tender Transactions: processed_at_min
    • -
    -
  • -
+ HubSpot objects can be processed incrementally by NiFi. This means that only objects created or modified after the + last run time of the processor are processed. The processor state can be reset in the context menu. The incremental loading + is based on the objects last modified time.

\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java index c5c0fa7820..410a279ce7 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/java/org/apache/nifi/processors/hubspot/GetHubSpotTest.java @@ -18,42 +18,50 @@ package org.apache.nifi.processors.hubspot; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import okhttp3.HttpUrl; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateMap; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.client.StandardHttpUriBuilder; -import org.apache.nifi.web.client.api.HttpUriBuilder; import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.net.URI; import java.nio.charset.StandardCharsets; -import java.util.Collections; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import static org.apache.nifi.processors.hubspot.GetHubSpot.CURSOR_KEY; +import static org.apache.nifi.processors.hubspot.GetHubSpot.END_INCREMENTAL_KEY; +import static org.apache.nifi.processors.hubspot.GetHubSpot.START_INCREMENTAL_KEY; +import static org.apache.nifi.processors.hubspot.HubSpotObjectType.COMPANIES; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; class GetHubSpotTest { - public static final String BASE_URL = "/test/hubspot"; - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String RESPONSE_WITHOUT_PAGING_CURSOR_JSON = "response-without-paging-cursor.json"; - public static final String RESPONSE_WITH_PAGING_CURSOR_JSON = "response-with-paging-cursor.json"; + private static final long TEST_EPOCH_TIME = 1662665787; + private static final String BASE_URL = "/test/hubspot"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static MockWebServer server; private static HttpUrl baseUrl; - private TestRunner runner; @BeforeEach @@ -63,7 +71,7 @@ class GetHubSpotTest { baseUrl = server.url(BASE_URL); final StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); - final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(); + final MockGetHubSpot mockGetHubSpot = new MockGetHubSpot(TEST_EPOCH_TIME); runner = TestRunners.newTestRunner(mockGetHubSpot); runner.addControllerService("standardWebClientServiceProvider", standardWebClientServiceProvider); @@ -71,8 +79,7 @@ class GetHubSpotTest { runner.setProperty(GetHubSpot.WEB_CLIENT_SERVICE_PROVIDER, standardWebClientServiceProvider.getIdentifier()); runner.setProperty(GetHubSpot.ACCESS_TOKEN, "testToken"); - runner.setProperty(GetHubSpot.OBJECT_TYPE, HubSpotObjectType.COMPANIES.getValue()); - runner.setProperty(GetHubSpot.RESULT_LIMIT, "1"); + runner.setProperty(GetHubSpot.OBJECT_TYPE, COMPANIES.getValue()); } @AfterEach @@ -83,78 +90,161 @@ class GetHubSpotTest { } } - @Test - void testLimitIsAddedToUrl() throws InterruptedException, IOException { - - final String response = getResourceAsString(RESPONSE_WITHOUT_PAGING_CURSOR_JSON); - server.enqueue(new MockResponse().setResponseCode(200).setBody(response)); - - runner.run(1); - - RecordedRequest request = server.takeRequest(); - assertEquals(BASE_URL + "?limit=1", request.getPath()); - } - - @Test - void testPageCursorIsAddedToUrlFromState() throws InterruptedException, IOException { - - final String response = getResourceAsString(RESPONSE_WITHOUT_PAGING_CURSOR_JSON); - server.enqueue(new MockResponse().setBody(response)); - - runner.getStateManager().setState(Collections.singletonMap(HubSpotObjectType.COMPANIES.getValue(), "12345"), Scope.CLUSTER); - - runner.run(1); - - RecordedRequest request = server.takeRequest(); - assertEquals(BASE_URL + "?limit=1&after=12345", request.getPath()); - } - @Test void testFlowFileContainsResultsArray() throws IOException { - final String response = getResourceAsString(RESPONSE_WITH_PAGING_CURSOR_JSON); + final String response = getResourceAsString("simple_response.json"); server.enqueue(new MockResponse().setBody(response)); runner.run(1); - final List flowFile = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS); + final List flowFiles = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS); final String expectedFlowFileContent = getResourceAsString("expected_flowfile_content.json"); final JsonNode expectedJsonNode = OBJECT_MAPPER.readTree(expectedFlowFileContent); - final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFile.get(0).getContent()); + final JsonNode actualJsonNode = OBJECT_MAPPER.readTree(flowFiles.get(0).getContent()); assertEquals(expectedJsonNode, actualJsonNode); } @Test - void testStateIsStoredWhenPagingCursorFound() throws IOException { - - final String response = getResourceAsString(RESPONSE_WITH_PAGING_CURSOR_JSON); - final String expectedPagingCursor = OBJECT_MAPPER.readTree(response) - .path("paging") - .path("next") - .path("after") - .asText(); + void testFlowFileNotCreatedWhenZeroResult() throws IOException { + final String response = getResourceAsString("zero_result.json"); server.enqueue(new MockResponse().setBody(response)); runner.run(1); - final StateMap state = runner.getStateManager().getState(Scope.CLUSTER); - final String actualPagingCursor = state.get(HubSpotObjectType.COMPANIES.getValue()); + final List flowFiles = runner.getFlowFilesForRelationship(GetHubSpot.REL_SUCCESS); - assertEquals(expectedPagingCursor, actualPagingCursor); + assertTrue(flowFiles.isEmpty()); } + @Test + void testExceptionIsThrownWhenTooManyRequest() throws IOException { + + final String response = getResourceAsString("zero_result.json"); + server.enqueue(new MockResponse().setBody(response).setResponseCode(429)); + + assertThrows(AssertionError.class, () -> runner.run(1)); + } + + @Test + void testSimpleIncrementalLoadingFilter() throws IOException, InterruptedException { + final String response = getResourceAsString("simple_response.json"); + server.enqueue(new MockResponse().setBody(response)); + + final String limit = "2"; + final int defaultDelay = 3000; + final String endTime = String.valueOf(Instant.now().toEpochMilli()); + final Map stateMap = new HashMap<>(); + stateMap.put(END_INCREMENTAL_KEY, endTime); + + runner.getStateManager().setState(stateMap, Scope.CLUSTER); + runner.setProperty(GetHubSpot.IS_INCREMENTAL, "true"); + runner.setProperty(GetHubSpot.RESULT_LIMIT, limit); + + runner.run(1); + + final String requestBodyString = new String(server.takeRequest().getBody().readByteArray()); + + final ObjectNode startTimeNode = OBJECT_MAPPER.createObjectNode(); + startTimeNode.put("propertyName", "hs_lastmodifieddate"); + startTimeNode.put("operator", "GTE"); + startTimeNode.put("value", endTime); + + final ObjectNode endTimeNode = OBJECT_MAPPER.createObjectNode(); + endTimeNode.put("propertyName", "hs_lastmodifieddate"); + endTimeNode.put("operator", "LT"); + endTimeNode.put("value", String.valueOf(TEST_EPOCH_TIME - defaultDelay)); + + final ArrayNode filtersNode = OBJECT_MAPPER.createArrayNode(); + filtersNode.add(startTimeNode); + filtersNode.add(endTimeNode); + + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("limit", limit); + root.set("filters", filtersNode); + + final String expectedJsonString = root.toString(); + + assertEquals(OBJECT_MAPPER.readTree(expectedJsonString), OBJECT_MAPPER.readTree(requestBodyString)); + } + + @Test + void testIncrementalLoadingFilterWithPagingCursor() throws IOException, InterruptedException { + final String response = getResourceAsString("simple_response.json"); + server.enqueue(new MockResponse().setBody(response)); + + final String limit = "2"; + final String after = "nextPage"; + final String objectType = COMPANIES.getValue(); + final String cursorKey = String.format(CURSOR_KEY, objectType); + final Instant now = Instant.now(); + final String startTime = String.valueOf(now.toEpochMilli()); + final String endTime = String.valueOf(now.plus(2, ChronoUnit.MINUTES).toEpochMilli()); + final Map stateMap = new HashMap<>(); + stateMap.put(cursorKey, after); + stateMap.put(START_INCREMENTAL_KEY, startTime); + stateMap.put(END_INCREMENTAL_KEY, endTime); + + runner.getStateManager().setState(stateMap, Scope.CLUSTER); + runner.setProperty(GetHubSpot.IS_INCREMENTAL, "true"); + runner.setProperty(GetHubSpot.RESULT_LIMIT, limit); + + runner.run(1); + + final String requestBodyString = new String(server.takeRequest().getBody().readByteArray()); + + final ObjectNode startTimeNode = OBJECT_MAPPER.createObjectNode(); + startTimeNode.put("propertyName", "hs_lastmodifieddate"); + startTimeNode.put("operator", "GTE"); + startTimeNode.put("value", startTime); + + final ObjectNode endTimeNode = OBJECT_MAPPER.createObjectNode(); + endTimeNode.put("propertyName", "hs_lastmodifieddate"); + endTimeNode.put("operator", "LT"); + endTimeNode.put("value", endTime); + + final ArrayNode filtersNode = OBJECT_MAPPER.createArrayNode(); + filtersNode.add(startTimeNode); + filtersNode.add(endTimeNode); + + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("limit", limit); + root.put("after", after); + root.set("filters", filtersNode); + + final String expectedJsonString = root.toString(); + + assertEquals(OBJECT_MAPPER.readTree(expectedJsonString), OBJECT_MAPPER.readTree(requestBodyString)); + } static class MockGetHubSpot extends GetHubSpot { + + private final long currentEpochTime; + + public MockGetHubSpot(long currentEpochTime) { + this.currentEpochTime = currentEpochTime; + } + @Override - HttpUriBuilder getBaseUri(ProcessContext context) { + URI getBaseUri(ProcessContext context) { return new StandardHttpUriBuilder() .scheme(baseUrl.scheme()) .host(baseUrl.host()) .port(baseUrl.port()) - .encodedPath(baseUrl.encodedPath()); + .encodedPath(baseUrl.encodedPath()) + .build(); + } + + @Override + long getCurrentEpochTime() { + return currentEpochTime; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { } } diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json index edd489bce0..bdc053f519 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_flowfile_content.json @@ -11,5 +11,18 @@ "createdAt": "2022-07-19T13:18:22.646Z", "updatedAt": "2022-07-19T13:18:33.337Z", "archived": false + }, + { + "id": "5935019230", + "properties": { + "createdate": "2022-07-24T19:35:59.337Z", + "domain": "algida.com", + "hs_lastmodifieddate": "2022-08-31T20:50:04.438Z", + "hs_object_id": "5935019230", + "name": "Algida" + }, + "createdAt": "2022-07-24T19:35:59.337Z", + "updatedAt": "2022-08-31T20:50:04.438Z", + "archived": false } ] diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_request_body.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_request_body.json new file mode 100644 index 0000000000..02fb4e7002 --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/expected_request_body.json @@ -0,0 +1,14 @@ +{ + "filters": [ + { + "propertyName": "hs_lastmodifieddate", + "operator": "LT", + "value": "1662665562616" + }, + { + "propertyName": "hs_lastmodifieddate", + "operator": "GT", + "value": "1662665562605" + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-without-paging-cursor.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-without-paging-cursor.json deleted file mode 100644 index fefdf05f77..0000000000 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-without-paging-cursor.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "results": [ - { - "id": "5918809044", - "properties": { - "createdate": "2022-07-19T13:18:22.646Z", - "domain": "hubspot.com", - "hs_lastmodifieddate": "2022-07-19T13:18:33.337Z", - "hs_object_id": "5918809044", - "name": "Hubspot, Inc." - }, - "createdAt": "2022-07-19T13:18:22.646Z", - "updatedAt": "2022-07-19T13:18:33.337Z", - "archived": false - } - ], - "paging": { - "next": { - "after": "5918809045", - "link": "https://api.hubapi.com/crm/v3/objects/companies?limit=1&after=5918809045" - } - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-with-paging-cursor.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/simple_response.json similarity index 50% rename from nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-with-paging-cursor.json rename to nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/simple_response.json index fefdf05f77..be1ef763d3 100644 --- a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/response-with-paging-cursor.json +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/simple_response.json @@ -1,4 +1,5 @@ { + "total": 8, "results": [ { "id": "5918809044", @@ -12,12 +13,24 @@ "createdAt": "2022-07-19T13:18:22.646Z", "updatedAt": "2022-07-19T13:18:33.337Z", "archived": false + }, + { + "id": "5935019230", + "properties": { + "createdate": "2022-07-24T19:35:59.337Z", + "domain": "algida.com", + "hs_lastmodifieddate": "2022-08-31T20:50:04.438Z", + "hs_object_id": "5935019230", + "name": "Algida" + }, + "createdAt": "2022-07-24T19:35:59.337Z", + "updatedAt": "2022-08-31T20:50:04.438Z", + "archived": false } ], "paging": { "next": { - "after": "5918809045", - "link": "https://api.hubapi.com/crm/v3/objects/companies?limit=1&after=5918809045" + "after": "2" } } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/zero_result.json b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/zero_result.json new file mode 100644 index 0000000000..3fe9070b4e --- /dev/null +++ b/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/test/resources/zero_result.json @@ -0,0 +1,4 @@ +{ + "total": 0, + "results": [] +} \ No newline at end of file