diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index cc2b76a2cc..850c89f069 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1013,23 +1013,6 @@ language governing permissions and limitations under the License. -->
-        <profile>
-            <id>include-solr</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-                <property>
-                    <name>allProfiles</name>
-                </property>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.nifi</groupId>
-                    <artifactId>nifi-solr-nar</artifactId>
-                    <version>2.0.0-SNAPSHOT</version>
-                    <type>nar</type>
-                </dependency>
-            </dependencies>
-        </profile>
         <profile>
             <id>include-iotdb</id>
diff --git a/nifi-code-coverage/pom.xml b/nifi-code-coverage/pom.xml
index 4d54c0fd59..a23b1b330f 100644
--- a/nifi-code-coverage/pom.xml
+++ b/nifi-code-coverage/pom.xml
@@ -1425,11 +1425,6 @@
             <artifactId>nifi-twitter-processors</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-solr-processors</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-splunk-processors</artifactId>
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index cbf9b30a76..5f6cc76a27 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -2703,7 +2703,6 @@ deprecationLogger.warn(
 | Apache HBase Bundle | include-hbase | Adds support for Apache HBase
 | Apache IoTDB Bundle | include-iotdb | Adds support for Apache IoTDB
 | Apache Kudu Bundle | include-kudu | Adds support for Apache Kudu
-| Apache Solr Bundle | include-solr | Adds support for Apache Solr
 | ASN.1 Support | include-asn1 | Adds support for ASN.1
 | Contribution Check | contrib-check | Runs various quality checks that a contribution must pass before it can be accepted into the core NiFi code base.
 | Graph Database Bundle | include-graph | Adds support for various common graph database scenarios. Support is currently for https://neo4j.com/developer/cypher[Cypher] and https://tinkerpop.apache.org/gremlin.html[Gremlin]-compatible databases such as Neo4J and JanusGraph. Includes controller services that provide driver functionality and a suite of processors for ingestion and querying.
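For context on the table above: each row names an opt-in Maven build profile, activated by id at build time (for example `mvn clean install -P include-graph`) or all at once by defining the `allProfiles` property (`mvn clean install -DallProfiles`), the same activation property the removed `include-solr` profile keyed on. After this change, `-P include-solr` no longer matches any profile and Maven warns that the requested profile does not exist.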
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml
deleted file mode 100644
index e2fd39eb9f..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-solr-bundle</artifactId>
-        <version>2.0.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>nifi-solr-nar</artifactId>
-    <packaging>nar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-solr-processors</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-shared-nar</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <type>nar</type>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE
deleted file mode 100644
index 09608142a2..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/LICENSE
+++ /dev/null
@@ -1,236 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
-APACHE NIFI SUBCOMPONENTS:
-
-The Apache NiFi project contains subcomponents with separate copyright
-notices and license terms. Your use of the source code for the these
-subcomponents is subject to the terms and conditions of the following
-licenses.
-
-The binary distribution of this product bundles 'Woodstox StAX 2 API' which is
- "licensed under standard BSD license"
-
- The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
- under an MIT style license.
-
- Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
-
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index e0e26ffe61..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-nar/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,91 +0,0 @@
-nifi-solr-nar
-Copyright 2014-2024 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-******************
-Apache Software License v2
-******************
-
-The following binary components are provided under the Apache Software License v2
-
- (ASLv2) Apache Commons IO
- The following NOTICE information applies:
- Apache Commons IO
- Copyright 2002-2016 The Apache Software Foundation
-
- (ASLv2) Apache HttpComponents
- The following NOTICE information applies:
- Apache HttpClient
- Copyright 1999-2015 The Apache Software Foundation
-
- Apache HttpCore
- Copyright 2005-2014 The Apache Software Foundation
-
- Apache HttpMime
- Copyright 1999-2013 The Apache Software Foundation
-
- This project contains annotations derived from JCIP-ANNOTATIONS
- Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
-
- (ASLv2) Apache Commons Lang
- The following NOTICE information applies:
- Apache Commons Lang
- Copyright 2001-2015 The Apache Software Foundation
-
- This product includes software from the Spring Framework,
- under the Apache License 2.0 (see: StringUtils.containsWhitespace())
-
- (ASLv2) Apache Commons Codec
- The following NOTICE information applies:
- Apache Commons Codec
- Copyright 2002-2014 The Apache Software Foundation
-
- src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
- contains test data from http://aspell.net/test/orig/batch0.tab.
- Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
-
- ===============================================================================
-
- The content of package org.apache.commons.codec.language.bm has been translated
- from the original php source code available at http://stevemorse.org/phoneticinfo.htm
- with permission from the original authors.
- Original source copyright:
- Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
-
- (ASLv2) Apache ZooKeeper
- The following NOTICE information applies:
- Apache ZooKeeper
- Copyright 2009-2012 The Apache Software Foundation
-
- (ASLv2) Woodstox Core ASL
- The following NOTICE information applies:
- This product currently only contains code developed by authors
- of specific components, as identified by the source code files.
-
- Since product implements StAX API, it has dependencies to StAX API
- classes.
-
- (ASLv2) Jackson JSON processor
- The following NOTICE information applies:
- # Jackson JSON processor
-
- Jackson is a high-performance, Free/Open Source JSON processing library.
- It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
- been in development since 2007.
- It is currently developed by a community of developers, as well as supported
- commercially by FasterXML.com.
-
- ## Licensing
-
- Jackson core and extension components may licensed under different licenses.
- To find the details that apply to this artifact see the accompanying LICENSE file.
- For more information, including possible other licensing options, contact
- FasterXML.com (http://fasterxml.com).
-
- ## Credits
-
- A list of contributors may be found from CREDITS file, which is included
- in some artifacts (usually source distributions); but is always available
- from the source code management (SCM) system project uses.
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
deleted file mode 100755
index ca37fe4275..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml
+++ /dev/null
@@ -1,106 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-solr-bundle</artifactId>
-        <version>2.0.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>nifi-solr-processors</artifactId>
-    <packaging>jar</packaging>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-record-serialization-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.solr</groupId>
-            <artifactId>solr-solrj</artifactId>
-            <version>${solr.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-record</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-kerberos</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-kerberos-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-kerberos-user-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.rat</groupId>
-                <artifactId>apache-rat-plugin</artifactId>
-                <configuration>
-                    <excludes combine.children="append">
-                        <exclude>src/test/resources/solr/solr.xml</exclude>
-                        <exclude>src/test/resources/solr/testCollection/core.properties</exclude>
-                        <exclude>src/test/resources/solr/testCollection/conf/_rest_managed.json</exclude>
-                        <exclude>src/test/resources/solr/testCollection/conf/protowords.txt</exclude>
-                        <exclude>src/test/resources/solr/testCollection/conf/schema.xml</exclude>
-                        <exclude>src/test/resources/solr/testCollection/conf/solrconfig.xml</exclude>
-                        <exclude>src/test/resources/solr/testCollection/conf/synonyms.txt</exclude>
-                        <exclude>src/test/resources/solr/testCollection/conf/lang/stopwords_en.txt</exclude>
-                        <exclude>src/test/resources/testdata/test-csv-multiple-docs.csv</exclude>
-                        <exclude>src/test/resources/testdata/test-custom-json-single-doc.json</exclude>
-                        <exclude>src/test/resources/testdata/test-solr-json-multiple-docs.json</exclude>
-                        <exclude>src/test/resources/testdata/test-xml-multiple-docs.xml</exclude>
-                        <exclude>src/test/resources/log4j.properties</exclude>
-                        <exclude>src/test/resources/jaas-client.conf</exclude>
-                        <exclude>src/test/resources/test-schema.avsc</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
deleted file mode 100644
index 22252ad380..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.configuration.DefaultSchedule;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.StringUtils;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.params.CursorMarkParams;
-
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
-import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
-import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
-import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
-import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
-
-@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
-@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.")
-@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-public class GetSolr extends SolrProcessor {
-
- public static final String STATE_MANAGER_FILTER = "stateManager_filter";
- public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark";
- public static final AllowableValue MODE_XML = new AllowableValue("XML");
- public static final AllowableValue MODE_REC = new AllowableValue("Records");
-
- public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
- .Builder().name("Return Type")
- .displayName("Return Type")
- .description("Write Solr documents to FlowFiles as XML or using a Record Writer")
- .required(true)
- .allowableValues(MODE_XML, MODE_REC)
- .defaultValue(MODE_XML.getValue())
- .build();
-
- public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
- .Builder().name("Solr Query")
- .displayName("Solr Query")
- .description("A query to execute against Solr")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor
- .Builder().name("Date Field")
- .displayName("Date Field")
- .description("The name of a date field in Solr used to filter results")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor DATE_FILTER = new PropertyDescriptor
- .Builder().name("Initial Date Filter")
- .displayName("Initial Date Filter")
- .description("Date value to filter results. Documents with an earlier date will not be fetched. The format has to correspond to the date pattern of Solr 'YYYY-MM-DDThh:mm:ssZ'")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor
- .Builder().name("Return Fields")
- .displayName("Return Fields")
- .description("Comma-separated list of field names to return")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
- .Builder().name("Batch Size")
- .displayName("Batch Size")
- .description("Number of rows per Solr query")
- .required(true)
- .addValidator(StandardValidators.INTEGER_VALIDATOR)
- .defaultValue("100")
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("The results of querying Solr")
- .build();
-
- private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
-
- private final AtomicBoolean configurationRestored = new AtomicBoolean(false);
- private final AtomicBoolean clearState = new AtomicBoolean(false);
- private final AtomicBoolean dateFieldNotInSpecifiedFieldsList = new AtomicBoolean(false);
- private volatile String id_field = null;
-
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> descriptors;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- super.init(context);
-
- this.descriptors = List.of(
- SOLR_TYPE,
- SOLR_LOCATION,
- COLLECTION,
- RETURN_TYPE,
- RECORD_WRITER,
- SOLR_QUERY,
- DATE_FIELD,
- DATE_FILTER,
- RETURN_FIELDS,
- BATCH_SIZE,
- KERBEROS_USER_SERVICE,
- BASIC_USERNAME,
- BASIC_PASSWORD,
- SSL_CONTEXT_SERVICE,
- SOLR_SOCKET_TIMEOUT,
- SOLR_CONNECTION_TIMEOUT,
- SOLR_MAX_CONNECTIONS_PER_HOST
- );
-
- this.relationships = Set.of(REL_SUCCESS);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return this.relationships;
- }
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return this.descriptors;
- }
-
- final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
- static {
- propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
- propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
- propertyNamesForActivatingClearState.add(COLLECTION.getName());
- propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
- propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
- propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
- propertyNamesForActivatingClearState.add(DATE_FILTER.getName());
- }
-
- @Override
- public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
- if (configurationRestored.get() && propertyNamesForActivatingClearState.contains(descriptor.getName()))
- clearState.set(true);
- }
-
- @OnConfigurationRestored
- public void onConfigurationRestored() {
- configurationRestored.set(true);
- }
-
- @OnScheduled
- public void clearState(final ProcessContext context) throws IOException {
- if (clearState.getAndSet(false))
- context.getStateManager().clear(Scope.CLUSTER);
-
- final Map<String, String> stateMap = new HashMap<>(context.getStateManager().getState(Scope.CLUSTER).toMap());
- final AtomicBoolean stateMapHasChanged = new AtomicBoolean(false);
-
- if (stateMap.get(STATE_MANAGER_CURSOR_MARK) == null) {
- stateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
- stateMapHasChanged.set(true);
- }
-
- if (stateMap.get(STATE_MANAGER_FILTER) == null) {
- final String initialDate = context.getProperty(DATE_FILTER).getValue();
- if (StringUtils.isBlank(initialDate))
- stateMap.put(STATE_MANAGER_FILTER, "*");
- else
- stateMap.put(STATE_MANAGER_FILTER, initialDate);
- stateMapHasChanged.set(true);
- }
-
- if (stateMapHasChanged.get()) {
- context.getStateManager().setState(stateMap, Scope.CLUSTER);
- }
-
- id_field = null;
- }
-
- @Override
- protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
- final Collection<ValidationResult> problems = new ArrayList<>();
-
- if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
- && !context.getProperty(RECORD_WRITER).isSet()) {
- problems.add(new ValidationResult.Builder()
- .explanation("for writing records a record writer has to be configured")
- .valid(false)
- .subject("Record writer check")
- .build());
- }
- return problems;
- }
-
- private String getFieldNameOfUniqueKey() {
- final SolrQuery solrQuery = new SolrQuery();
- try {
- solrQuery.setRequestHandler("/schema/uniquekey");
- final QueryRequest req = new QueryRequest(solrQuery);
- if (isBasicAuthEnabled()) {
- req.setBasicAuthCredentials(getUsername(), getPassword());
- }
-
- return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
- } catch (SolrServerException | IOException e) {
- getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", solrQuery.toString(), e, e);
- throw new ProcessException(e);
- }
- }
-
- @Override
- public void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-
- final ComponentLog logger = getLogger();
- final AtomicBoolean continuePaging = new AtomicBoolean(true);
- final SolrQuery solrQuery = new SolrQuery();
-
- try {
- if (id_field == null) {
- id_field = getFieldNameOfUniqueKey();
- }
-
- final String dateField = context.getProperty(DATE_FIELD).getValue();
-
- final Map<String, String> stateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
-
- solrQuery.setQuery("*:*");
- final String query = context.getProperty(SOLR_QUERY).getValue();
- if (!StringUtils.isBlank(query) && !query.equals("*:*")) {
- solrQuery.addFilterQuery(query);
- }
- final String automatedFilterQuery = "%s:[%s TO *]".formatted(dateField, stateMap.get(STATE_MANAGER_FILTER));
- solrQuery.addFilterQuery(automatedFilterQuery);
-
- final List<String> fieldList = new ArrayList<>();
- final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
- if (!StringUtils.isBlank(returnFields)) {
- fieldList.addAll(Arrays.asList(returnFields.trim().split(",")));
- if (!fieldList.contains(dateField)) {
- fieldList.add(dateField);
- dateFieldNotInSpecifiedFieldsList.set(true);
- }
- for (String returnField : fieldList) {
- solrQuery.addField(returnField.trim());
- }
- }
-
- solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, stateMap.get(STATE_MANAGER_CURSOR_MARK));
- solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
- final String sortClause = "%s asc, %s asc".formatted(dateField, id_field);
- solrQuery.setParam("sort", sortClause);
-
- while (continuePaging.get()) {
- StopWatch timer = new StopWatch(true);
-
- final QueryRequest req = new QueryRequest(solrQuery);
- if (isBasicAuthEnabled()) {
- req.setBasicAuthCredentials(getUsername(), getPassword());
- }
-
- logger.debug(solrQuery.toQueryString());
- final QueryResponse response = req.process(getSolrClient());
- final SolrDocumentList documentList = response.getResults();
-
- if (response.getResults().size() > 0) {
- final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1);
- final Object dateObject = lastSolrDocument.get(dateField);
- final String latestDateValue = getDateFormatted(dateObject);
- final String newCursorMark = response.getNextCursorMark();
-
- solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, newCursorMark);
- stateMap.put(STATE_MANAGER_CURSOR_MARK, newCursorMark);
- stateMap.put(STATE_MANAGER_FILTER, latestDateValue);
-
- FlowFile flowFile = session.create();
- flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString());
-
- if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
- if (dateFieldNotInSpecifiedFieldsList.get()) {
- for (SolrDocument doc : response.getResults()) {
- doc.removeFields(dateField);
- }
- }
- flowFile = session.write(flowFile, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
- flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
-
- } else {
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions()
- .asControllerService(RecordSetWriterFactory.class);
- final RecordSchema schema = writerFactory.getSchema(null, null);
- final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
- final StringBuffer mimeType = new StringBuffer();
- final FlowFile flowFileRef = flowFile;
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- try {
- final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileRef);
- writer.write(recordSet);
- writer.flush();
- mimeType.append(writer.getMimeType());
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Could not parse Solr response", e);
- }
- }
- });
-
- flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
- }
-
- timer.stop();
- StringBuilder transitUri = new StringBuilder("solr://");
- transitUri.append(getSolrLocation());
- if (getSolrLocation().equals(SolrUtils.SOLR_TYPE_CLOUD.getValue())) {
- transitUri.append(":").append(context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
- }
- final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
- session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
-
- session.transfer(flowFile, REL_SUCCESS);
- }
- continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue()));
- }
-
- session.setState(stateMap, Scope.CLUSTER);
- } catch (final SolrServerException | SchemaNotFoundException | IOException e) {
- context.yield();
- session.rollback();
- logger.error("Failed to execute query {} due to {}", solrQuery.toString(), e, e);
- throw new ProcessException(e);
- } catch (final Throwable t) {
- context.yield();
- session.rollback();
- logger.error("Failed to execute query {} due to {}", solrQuery.toString(), t, t);
- throw t;
- }
- }
-
- private String getDateFormatted(final Object dateObject) {
- final String formatted;
-
- if (dateObject instanceof Date date) {
- final OffsetDateTime dateTime = date.toInstant().atOffset(ZoneOffset.UTC);
- formatted = DATE_TIME_FORMATTER.format(dateTime);
- } else if (dateObject == null) {
- formatted = null;
- } else {
- throw new IllegalArgumentException("Date Object Type [%s] not supported".formatted(dateObject.getClass()));
- }
-
- return formatted;
- }
-}
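The GetSolr processor deleted above is built around Solr's cursorMark deep paging: it keeps the last cursor and date filter in NiFi cluster state and re-issues the query until the result count drops below the batch size. A minimal standalone sketch of the same SolrJ cursor loop, outside NiFi (the base URL, collection name `testCollection`, and uniqueKey field `id` are illustrative assumptions):

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.Http2SolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocument;
    import org.apache.solr.common.params.CursorMarkParams;

    public class CursorPagingSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical Solr base URL; adjust for a real deployment.
            try (SolrClient client = new Http2SolrClient.Builder("http://localhost:8983/solr").build()) {
                final SolrQuery query = new SolrQuery("*:*");
                query.setRows(100);
                // Cursor paging requires a deterministic sort ending on the uniqueKey field.
                query.setSort(SolrQuery.SortClause.asc("id"));
                String cursorMark = CursorMarkParams.CURSOR_MARK_START; // "*"
                boolean done = false;
                while (!done) {
                    query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
                    final QueryResponse response = client.query("testCollection", query);
                    for (final SolrDocument doc : response.getResults()) {
                        System.out.println(doc.getFieldValue("id"));
                    }
                    final String next = response.getNextCursorMark();
                    // An unchanged cursor means every matching document has been consumed.
                    done = cursorMark.equals(next);
                    cursorMark = next;
                }
            }
        }
    }

GetSolr layers its incremental behavior on top of this loop by adding a filter query on the tracked date field and persisting both the cursor and the latest date value between runs.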
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
deleted file mode 100644
index 8bf26afe98..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StopWatch;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.MultiMapSolrParams;
-import org.apache.solr.common.util.ContentStreamBase;
-
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
-import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
-import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
-import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
-
-@Tags({"Apache", "Solr", "Put", "Send"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
-@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
- description="These parameters will be passed to Solr on the request")
-public class PutSolrContentStream extends SolrProcessor {
-
- public static final PropertyDescriptor CONTENT_STREAM_PATH = new PropertyDescriptor
- .Builder().name("Content Stream Path")
- .description("The path in Solr to post the ContentStream")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("/update/json/docs")
- .build();
-
- public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor
- .Builder().name("Content-Type")
- .description("Content-Type being sent to Solr")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("application/json")
- .build();
-
- public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
- .Builder().name("Commit Within")
- .description("The number of milliseconds before the given update is committed")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("5000")
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("The original FlowFile")
- .build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("FlowFiles that failed for any reason other than Solr being unreachable")
- .build();
-
- public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
- .name("connection_failure")
- .description("FlowFiles that failed because Solr is unreachable")
- .build();
-
- public static final String COLLECTION_PARAM_NAME = "collection";
- public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
-
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> descriptors;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- super.init(context);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(SOLR_TYPE);
- descriptors.add(SOLR_LOCATION);
- descriptors.add(COLLECTION);
- descriptors.add(CONTENT_STREAM_PATH);
- descriptors.add(CONTENT_TYPE);
- descriptors.add(COMMIT_WITHIN);
- descriptors.add(KERBEROS_USER_SERVICE);
- descriptors.add(BASIC_USERNAME);
- descriptors.add(BASIC_PASSWORD);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(SOLR_SOCKET_TIMEOUT);
- descriptors.add(SOLR_CONNECTION_TIMEOUT);
- descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- relationships.add(REL_CONNECTION_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return this.relationships;
- }
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return this.descriptors;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dynamic(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- }
-
- @Override
- protected void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final FlowFile flowFile = session.get();
- if ( flowFile == null ) {
- return;
- }
-
- final AtomicReference<Exception> error = new AtomicReference<>(null);
- final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
-
- final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
- final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
- final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
- final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH).evaluateAttributeExpressions(flowFile).getValue();
- final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
-
- StopWatch timer = new StopWatch(true);
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(contentStreamPath);
- request.setParams(new ModifiableSolrParams());
-
- // add the extra params, don't use 'set' in case of repeating params
- Iterator<String> paramNames = requestParams.getParameterNamesIterator();
- while (paramNames.hasNext()) {
- String paramName = paramNames.next();
- for (String paramValue : requestParams.getParams(paramName)) {
- request.getParams().add(paramName, paramValue);
- }
- }
-
- // specify the collection for SolrCloud
- if (isSolrCloud) {
- request.setParam(COLLECTION_PARAM_NAME, collection);
- }
-
- if (commitWithin != null && commitWithin > 0) {
- request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
- }
-
- // if a username and password were provided then pass them for basic auth
- if (isBasicAuthEnabled()) {
- request.setBasicAuthCredentials(getUsername(), getPassword());
- }
-
- try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) {
- // add the FlowFile's content on the UpdateRequest
- request.addContentStream(new ContentStreamBase() {
- @Override
- public InputStream getStream() throws IOException {
- return bufferedIn;
- }
-
- @Override
- public String getContentType() {
- return context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
- }
- });
-
- UpdateResponse response = request.process(getSolrClient());
- getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
- } catch (SolrException e) {
- error.set(e);
- } catch (SolrServerException e) {
- if (causedByIOException(e)) {
- connectionError.set(e);
- } else {
- error.set(e);
- }
- } catch (IOException e) {
- connectionError.set(e);
- }
- }
- });
- timer.stop();
-
- if (error.get() != null) {
- getLogger().error("Failed to send {} to Solr due to {}; routing to failure",
- new Object[]{flowFile, error.get()});
- session.transfer(flowFile, REL_FAILURE);
- } else if (connectionError.get() != null) {
- getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
- new Object[]{flowFile, connectionError.get()});
- session.penalize(flowFile);
- session.transfer(flowFile, REL_CONNECTION_FAILURE);
- } else {
- StringBuilder transitUri = new StringBuilder("solr://");
- transitUri.append(getSolrLocation());
- if (isSolrCloud) {
- transitUri.append(":").append(collection);
- }
-
- final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
- session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
- getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
- session.transfer(flowFile, REL_SUCCESS);
- }
- }
-
- private boolean causedByIOException(SolrServerException e) {
- boolean foundIOException = false;
- Throwable cause = e.getCause();
- while (cause != null) {
- if (cause instanceof IOException) {
- foundIOException = true;
- break;
- }
- cause = cause.getCause();
- }
- return foundIOException;
- }
-}
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
deleted file mode 100644
index 2e6cca0e5a..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.StringUtils;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.MultiMapSolrParams;
-
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
-import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
-import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
-import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
-import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
-
-
-@Tags({"Apache", "Solr", "Put", "Send","Record"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
-@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
- description="These parameters will be passed to Solr on the request")
-public class PutSolrRecord extends SolrProcessor {
-
- public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor.Builder()
- .name("Solr Update Path")
- .description("The path in Solr to post the FlowFile Records")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("/update")
- .build();
-
- public static final PropertyDescriptor FIELDS_TO_INDEX = new PropertyDescriptor.Builder()
- .name("Fields To Index")
- .description("Comma-separated list of field names to write")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor.Builder()
- .name("Commit Within")
- .description("The number of milliseconds before the given update is committed")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("5000")
- .build();
-
- public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The number of solr documents to index per batch")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("500")
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("The FlowFile is routed to this relationship when it has been successfully sent to Solr")
- .build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("FlowFiles that failed for any reason other than Solr being unreachable")
- .build();
-
- public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
- .name("connection_failure")
- .description("FlowFiles that failed because Solr is unreachable")
- .build();
-
- public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
- .name("put-solr-record-record-reader")
- .displayName("Record Reader")
- .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
- .identifiesControllerService(RecordReaderFactory.class)
- .required(true)
- .build();
-
- public static final String COLLECTION_PARAM_NAME = "collection";
- public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
-
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
- private static final String EMPTY_STRING = "";
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- super.init(context);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(SOLR_TYPE);
- descriptors.add(SOLR_LOCATION);
- descriptors.add(COLLECTION);
- descriptors.add(UPDATE_PATH);
- descriptors.add(RECORD_READER);
- descriptors.add(FIELDS_TO_INDEX);
- descriptors.add(COMMIT_WITHIN);
- descriptors.add(KERBEROS_USER_SERVICE);
- descriptors.add(BASIC_USERNAME);
- descriptors.add(BASIC_PASSWORD);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(SOLR_SOCKET_TIMEOUT);
- descriptors.add(SOLR_CONNECTION_TIMEOUT);
- descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
- descriptors.add(BATCH_SIZE);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- relationships.add(REL_CONNECTION_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
-    public Set<Relationship> getRelationships() {
- return this.relationships;
- }
-
- @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return this.descriptors;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dynamic(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- }
-
- @Override
- public void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
-        if (flowFile == null) {
- return;
- }
-
-        final AtomicReference<Exception> error = new AtomicReference<>(null);
-        final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
-
- final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
- final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
- final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
- final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
- final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
- final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
- final Long batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
-
-        final List<String> fieldList = new ArrayList<>();
- if (!StringUtils.isBlank(fieldsToIndex)) {
- Arrays.asList(fieldsToIndex.split(",")).forEach(f -> fieldList.add(f.trim()));
- }
- StopWatch timer = new StopWatch(true);
- try (final InputStream in = session.read(flowFile);
- final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
-
- Record record;
-            List<SolrInputDocument> inputDocumentList = new LinkedList<>();
- try {
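-                // Convert each record into a SolrInputDocument; send a full batch whenever Batch Size documents have accumulated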
- while ((record = reader.nextRecord()) != null) {
- SolrInputDocument inputDoc = new SolrInputDocument();
- writeRecord(record, inputDoc, fieldList, EMPTY_STRING);
- inputDocumentList.add(inputDoc);
- if (inputDocumentList.size() == batchSize) {
- index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
- inputDocumentList = new ArrayList<>();
- }
-                }
-                if (!inputDocumentList.isEmpty()) {
-                    index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
-                }
- } catch (SolrException e) {
- error.set(e);
- } catch (SolrServerException e) {
- if (causedByIOException(e)) {
-                    // Exit in case of a Solr connection error
- connectionError.set(e);
- } else {
- error.set(e);
- }
- } catch (IOException e) {
-                // Exit in case of a Solr connection error
- connectionError.set(e);
- }
- } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
- getLogger().error("Could not parse incoming data", e);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- timer.stop();
-
- if (error.get() != null) {
-            getLogger().error("Failed to send all records of {} to Solr due to {}; routing to failure",
-                    new Object[]{flowFile, error.get()});
- session.transfer(flowFile, REL_FAILURE);
- } else if (connectionError.get() != null) {
- getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
- new Object[]{flowFile, connectionError.get()});
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_CONNECTION_FAILURE);
- } else {
- StringBuilder transitUri = new StringBuilder("solr://");
- transitUri.append(getSolrLocation());
- if (isSolrCloud) {
- transitUri.append(":").append(collection);
- }
-
- final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
- session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
- getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
- session.transfer(flowFile, REL_SUCCESS);
- }
- }
-
-    private void index(boolean isSolrCloud, String collection, Long commitWithin, String contentStreamPath, MultiMapSolrParams requestParams, List<SolrInputDocument> inputDocumentList)
-            throws IOException, SolrServerException, SolrException {
- UpdateRequest request = new UpdateRequest(contentStreamPath);
- request.setParams(new ModifiableSolrParams());
-
- // add the extra params, don't use 'set' in case of repeating params
-        Iterator<String> paramNames = requestParams.getParameterNamesIterator();
- while (paramNames.hasNext()) {
- String paramName = paramNames.next();
- for (String paramValue : requestParams.getParams(paramName)) {
- request.getParams().add(paramName, paramValue);
- }
- }
-
- // specify the collection for SolrCloud
- if (collection != null) {
- request.setParam(COLLECTION_PARAM_NAME, collection);
- }
-
- if (commitWithin != null && commitWithin > 0) {
- request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
- }
-
- // if a username and password were provided then pass them for basic auth
- if (isBasicAuthEnabled()) {
- request.setBasicAuthCredentials(getUsername(), getPassword());
- }
- request.add(inputDocumentList);
- UpdateResponse response = request.process(getSolrClient());
- getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
- inputDocumentList.clear();
- }
-
- private boolean causedByIOException(SolrServerException e) {
- boolean foundIOException = false;
- Throwable cause = e.getCause();
- while (cause != null) {
- if (cause instanceof IOException) {
- foundIOException = true;
- break;
- }
- cause = cause.getCause();
- }
- return foundIOException;
- }
-
-}
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java
deleted file mode 100644
index 36367b33fe..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.nifi.processors.solr;
-
-import com.google.gson.stream.JsonWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.configuration.DefaultSchedule;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.AttributeExpression;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.util.StopWatch;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.response.FacetField;
-import org.apache.solr.client.solrj.response.FieldStatsInfo;
-import org.apache.solr.client.solrj.response.IntervalFacet;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.RangeFacet;
-import org.apache.solr.client.solrj.response.RangeFacet.Count;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.FacetParams;
-import org.apache.solr.common.params.MultiMapSolrParams;
-import org.apache.solr.common.params.StatsParams;
-
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
-import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
-import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
-import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
-import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
-
-@Tags({"Apache", "Solr", "Get", "Query", "Records"})
-@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
-@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
- description="These parameters will be passed to Solr on the request")
-@WritesAttributes({
- @WritesAttribute(attribute = "solr.connect", description = "Solr connect string"),
- @WritesAttribute(attribute = "solr.collection", description = "Solr collection"),
- @WritesAttribute(attribute = "solr.query", description = "Query string sent to Solr"),
- @WritesAttribute(attribute = "solr.cursor.mark", description = "Cursor mark can be used for scrolling Solr"),
- @WritesAttribute(attribute = "solr.status.code", description = "Status code of Solr request. A status code of 0 indicates that the request was successfully processed"),
- @WritesAttribute(attribute = "solr.query.time", description = "The elapsed time to process the query (in ms)"),
- @WritesAttribute(attribute = "solr.start", description = "Solr start parameter (result offset) for the query"),
- @WritesAttribute(attribute = "solr.rows", description = "Number of Solr documents to be returned for the query"),
- @WritesAttribute(attribute = "solr.number.results", description = "Number of Solr documents that match the query"),
- @WritesAttribute(attribute = "mime.type", description = "The mime type of the data format"),
-    @WritesAttribute(attribute = "querysolr.exeption", description = "The Java exception class raised when the processor fails"),
-    @WritesAttribute(attribute = "querysolr.exeption.message", description = "The Java exception message raised when the processor fails")
-})
-@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-public class QuerySolr extends SolrProcessor {
-
- public static final AllowableValue MODE_XML = new AllowableValue("XML");
- public static final AllowableValue MODE_REC = new AllowableValue("Records");
-
- public static final AllowableValue RETURN_TOP_RESULTS = new AllowableValue("return_only_top_results", "Only top results");
- public static final AllowableValue RETURN_ALL_RESULTS = new AllowableValue("return_all_results", "Entire results");
-
- public static final String MIME_TYPE_JSON = "application/json";
- public static final String MIME_TYPE_XML = "application/xml";
- public static final String ATTRIBUTE_SOLR_CONNECT = "solr.connect";
- public static final String ATTRIBUTE_SOLR_COLLECTION = "solr.collection";
- public static final String ATTRIBUTE_SOLR_QUERY = "solr.query";
- public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark";
- public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code";
- public static final String ATTRIBUTE_SOLR_START = "solr.start";
- public static final String ATTRIBUTE_SOLR_ROWS = "solr.rows";
- public static final String ATTRIBUTE_SOLR_NUMBER_RESULTS = "solr.number.results";
- public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time";
- public static final String EXCEPTION = "querysolr.exeption";
- public static final String EXCEPTION_MESSAGE = "querysolr.exeption.message";
-
- public static final Integer UPPER_LIMIT_START_PARAM = 10000;
-
- public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
- .Builder().name("return_type")
- .displayName("Return Type")
- .description("Output format of Solr results. Write Solr documents to FlowFiles as XML or using a Record Writer")
- .required(true)
- .allowableValues(MODE_XML, MODE_REC)
- .defaultValue(MODE_XML.getValue())
- .build();
-
- public static final PropertyDescriptor SOLR_PARAM_QUERY = new PropertyDescriptor
- .Builder().name("solr_param_query")
- .displayName("Solr Query")
-            .description("Solr Query, e.g. field:value")
- .required(true)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("*:*")
- .build();
-
- public static final PropertyDescriptor SOLR_PARAM_REQUEST_HANDLER = new PropertyDescriptor
- .Builder().name("solr_param_request_handler")
- .displayName("Request Handler")
-            .description("Define a request handler here, e.g. /query")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .defaultValue("/select")
- .build();
-
- public static final PropertyDescriptor SOLR_PARAM_FIELD_LIST = new PropertyDescriptor
- .Builder().name("solr_param_field_list")
- .displayName("Field List")
-            .description("Comma-separated list of fields to be included in the results, e.g. field1,field2")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor SOLR_PARAM_SORT = new PropertyDescriptor
- .Builder().name("solr_param_sort")
- .displayName("Sorting of result list")
-            .description("Comma-separated sort clauses to define the sorting of results, e.g. field1 asc, field2 desc")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor SOLR_PARAM_START = new PropertyDescriptor
- .Builder().name("solr_param_start")
- .displayName("Start of results")
- .description("Offset of result set")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor SOLR_PARAM_ROWS = new PropertyDescriptor
- .Builder().name("solr_param_rows")
- .displayName("Rows")
- .description("Number of results to be returned for a single request")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor AMOUNT_DOCUMENTS_TO_RETURN = new PropertyDescriptor
- .Builder().name("amount_documents_to_return")
- .displayName("Total amount of returned results")
-            .description("Total number of Solr documents to be returned. If this property is set to \"Only top results\", " +
-                    "only a single request will be sent to Solr and the results will be written into a single FlowFile. If it is set to " +
-                    "\"Entire results\", all results matching the query are retrieved via multiple Solr requests and " +
-                    "returned in multiple FlowFiles. For both options, the number of Solr documents to be returned in a FlowFile depends on " +
-                    "the configuration of the \"Rows\" property")
- .required(true)
- .allowableValues(RETURN_ALL_RESULTS, RETURN_TOP_RESULTS)
- .defaultValue(RETURN_TOP_RESULTS.getValue())
- .build();
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .description("Specifies the value to send for the '" + propertyDescriptorName + "' Solr parameter")
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dynamic(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- }
-
- public static final Relationship RESULTS = new Relationship.Builder().name("results")
- .description("Results of Solr queries").build();
- public static final Relationship FACETS = new Relationship.Builder().name("facets")
- .description("Results of faceted search").build();
- public static final Relationship STATS = new Relationship.Builder().name("stats")
- .description("Stats about Solr index").build();
- public static final Relationship ORIGINAL = new Relationship.Builder().name("original")
- .description("Original flowfile").build();
- public static final Relationship FAILURE = new Relationship.Builder().name("failure")
- .description("Failure relationship").build();
-
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
-
- @Override
-    public Set<Relationship> getRelationships() {
- return this.relationships;
- }
-
- @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return this.descriptors;
- }
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- super.init(context);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(SOLR_TYPE);
- descriptors.add(SOLR_LOCATION);
- descriptors.add(COLLECTION);
- descriptors.add(RETURN_TYPE);
- descriptors.add(RECORD_WRITER);
- descriptors.add(SOLR_PARAM_QUERY);
- descriptors.add(SOLR_PARAM_REQUEST_HANDLER);
- descriptors.add(SOLR_PARAM_FIELD_LIST);
- descriptors.add(SOLR_PARAM_SORT);
- descriptors.add(SOLR_PARAM_START);
- descriptors.add(SOLR_PARAM_ROWS);
- descriptors.add(AMOUNT_DOCUMENTS_TO_RETURN);
- descriptors.add(KERBEROS_USER_SERVICE);
- descriptors.add(BASIC_USERNAME);
- descriptors.add(BASIC_PASSWORD);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(SOLR_SOCKET_TIMEOUT);
- descriptors.add(SOLR_CONNECTION_TIMEOUT);
- descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
- relationships.add(FAILURE);
- relationships.add(RESULTS);
- relationships.add(FACETS);
- relationships.add(STATS);
- relationships.add(ORIGINAL);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
-    public static final Set<String> SUPPORTED_SEARCH_COMPONENTS = new HashSet<>();
- static {
- SUPPORTED_SEARCH_COMPONENTS.addAll(Arrays.asList(StatsParams.STATS, FacetParams.FACET));
- }
-
-    public static final Set<String> SEARCH_COMPONENTS_ON = new HashSet<>();
- static {
- SEARCH_COMPONENTS_ON.addAll(Arrays.asList("true", "on", "yes"));
- }
-
- @Override
-    protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
-        final Collection<ValidationResult> problems = new ArrayList<>();
-
- if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
- && !context.getProperty(RECORD_WRITER).isSet()) {
- problems.add(new ValidationResult.Builder()
- .explanation("for writing records a record writer has to be configured")
- .valid(false)
- .subject("Record writer check")
- .build());
- }
- return problems;
- }
-
- @Override
- public void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final ComponentLog logger = getLogger();
-
- FlowFile flowFileOriginal = session.get();
- FlowFile flowFileResponse;
-
- if (flowFileOriginal == null) {
- if (context.hasNonLoopConnection()) {
- return;
- }
- flowFileResponse = session.create();
- } else {
- flowFileResponse = session.create(flowFileOriginal);
- }
-
- final SolrQuery solrQuery = new SolrQuery();
- final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
- final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFileResponse).getValue();
-
- final StringBuilder transitUri = new StringBuilder("solr://");
- transitUri.append(getSolrLocation());
- if (isSolrCloud) {
- transitUri.append(":").append(collection);
- }
- final StopWatch timer = new StopWatch(false);
-
- try {
- solrQuery.setQuery(context.getProperty(SOLR_PARAM_QUERY).evaluateAttributeExpressions(flowFileResponse).getValue());
- solrQuery.setRequestHandler(context.getProperty(SOLR_PARAM_REQUEST_HANDLER).evaluateAttributeExpressions(flowFileResponse).getValue());
-
- if (context.getProperty(SOLR_PARAM_FIELD_LIST).isSet()) {
- for (final String field : context.getProperty(SOLR_PARAM_FIELD_LIST).evaluateAttributeExpressions(flowFileResponse).getValue()
- .split(",")) {
- solrQuery.addField(field.trim());
- }
- }
-
- // Avoid ArrayIndexOutOfBoundsException due to incorrectly configured sorting
- try {
- if (context.getProperty(SOLR_PARAM_SORT).isSet()) {
-                    final List<SolrQuery.SortClause> sortings = new ArrayList<>();
- for (final String sorting : context.getProperty(SOLR_PARAM_SORT).evaluateAttributeExpressions(flowFileResponse).getValue()
- .split(",")) {
- final String[] sortEntry = sorting.trim().split(" ");
- sortings.add(new SolrQuery.SortClause(sortEntry[0], sortEntry[1]));
- }
- solrQuery.setSorts(sortings);
- }
- } catch (Exception e) {
-                throw new ProcessException("Error while parsing the sort clauses for the Solr query", e);
- }
-
- final Integer startParam = context.getProperty(SOLR_PARAM_START).isSet() ? Integer.parseInt(
- context.getProperty(SOLR_PARAM_START).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.START_DEFAULT;
-
- solrQuery.setStart(startParam);
-
- final Integer rowParam = context.getProperty(SOLR_PARAM_ROWS).isSet() ? Integer.parseInt(
- context.getProperty(SOLR_PARAM_ROWS).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.ROWS_DEFAULT;
-
- solrQuery.setRows(rowParam);
-
-            final Map<String, String[]> additionalSolrParams = SolrUtils.getRequestParams(context, flowFileResponse);
-
-            final Set<String> searchComponents = extractSearchComponents(additionalSolrParams);
- solrQuery.add(new MultiMapSolrParams(additionalSolrParams));
-
-            final Map<String, String> attributes = new HashMap<>();
- attributes.put(ATTRIBUTE_SOLR_CONNECT, getSolrLocation());
- if (isSolrCloud) {
- attributes.put(ATTRIBUTE_SOLR_COLLECTION, collection);
- }
- attributes.put(ATTRIBUTE_SOLR_QUERY, solrQuery.toString());
- if (flowFileOriginal != null) {
- flowFileOriginal = session.putAllAttributes(flowFileOriginal, attributes);
- }
-
- flowFileResponse = session.putAllAttributes(flowFileResponse, attributes);
-
- final boolean getEntireResults = RETURN_ALL_RESULTS.equals(context.getProperty(AMOUNT_DOCUMENTS_TO_RETURN).getValue());
- boolean processFacetsAndStats = true;
- boolean continuePaging = true;
-
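-            // Page through the Solr results; facets and stats are written out only once, for the first page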
-            while (continuePaging) {
-
- timer.start();
-
-                Map<String, String> responseAttributes = new HashMap<>();
- responseAttributes.put(ATTRIBUTE_SOLR_START, solrQuery.getStart().toString());
- responseAttributes.put(ATTRIBUTE_SOLR_ROWS, solrQuery.getRows().toString());
-
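-                // Deep-paging guard: refuse start offsets beyond the upper limit to protect Solr from expensive requests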
- if (solrQuery.getStart() > UPPER_LIMIT_START_PARAM) {
- logger.warn("The start parameter of Solr query {} exceeded the upper limit of {}. The query will not be processed " +
- "to avoid performance or memory issues on the part of Solr.", new Object[]{solrQuery.toString(), UPPER_LIMIT_START_PARAM});
- flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);
- timer.stop();
- break;
- }
-
- final QueryRequest req = new QueryRequest(solrQuery);
- if (isBasicAuthEnabled()) {
- req.setBasicAuthCredentials(getUsername(), getPassword());
- }
-
- final QueryResponse response = req.process(getSolrClient());
- timer.stop();
-
- final Long totalNumberOfResults = response.getResults().getNumFound();
-
- responseAttributes.put(ATTRIBUTE_SOLR_NUMBER_RESULTS, totalNumberOfResults.toString());
- responseAttributes.put(ATTRIBUTE_CURSOR_MARK, response.getNextCursorMark());
- responseAttributes.put(ATTRIBUTE_SOLR_STATUS, String.valueOf(response.getStatus()));
- responseAttributes.put(ATTRIBUTE_QUERY_TIME, String.valueOf(response.getQTime()));
- flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);
-
- if (response.getResults().size() > 0) {
-
-                    if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())) {
- flowFileResponse = session.write(flowFileResponse, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
- flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_XML);
- } else {
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse)
- .asControllerService(RecordSetWriterFactory.class);
- final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null);
- final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
- final StringBuffer mimeType = new StringBuffer();
- final FlowFile flowFileResponseRef = flowFileResponse;
- flowFileResponse = session.write(flowFileResponse, out -> {
- try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileResponseRef)) {
- writer.write(recordSet);
- writer.flush();
- mimeType.append(writer.getMimeType());
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Could not parse Solr response", e);
- }
- });
- flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
- }
-
- if (processFacetsAndStats) {
- if (searchComponents.contains(FacetParams.FACET)) {
- FlowFile flowFileFacets = session.create(flowFileResponse);
- flowFileFacets = session.write(flowFileFacets, out -> {
- try (
- final OutputStreamWriter osw = new OutputStreamWriter(out);
- final JsonWriter writer = new JsonWriter(osw)
- ) {
- addFacetsFromSolrResponseToJsonWriter(response, writer);
- }
- });
- flowFileFacets = session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
- session.getProvenanceReporter().receive(flowFileFacets, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
- session.transfer(flowFileFacets, FACETS);
- }
-
- if (searchComponents.contains(StatsParams.STATS)) {
- FlowFile flowFileStats = session.create(flowFileResponse);
- flowFileStats = session.write(flowFileStats, out -> {
- try (
- final OutputStreamWriter osw = new OutputStreamWriter(out);
- final JsonWriter writer = new JsonWriter(osw)
- ) {
- addStatsFromSolrResponseToJsonWriter(response, writer);
- }
- });
- flowFileStats = session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
- session.getProvenanceReporter().receive(flowFileStats, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
- session.transfer(flowFileStats, STATS);
- }
- processFacetsAndStats = false;
- }
- }
-
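-                // When the entire result set was requested, advance the start offset and emit one FlowFile per page of results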
- if (getEntireResults) {
- final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows();
- if (totalDocumentsReturned < totalNumberOfResults) {
- solrQuery.setStart(totalDocumentsReturned);
- session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
- session.transfer(flowFileResponse, RESULTS);
- flowFileResponse = session.create(flowFileResponse);
- } else {
- continuePaging = false;
- }
- } else {
- continuePaging = false;
- }
- }
-
- } catch (Exception e) {
- flowFileResponse = session.penalize(flowFileResponse);
- flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName());
- flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage());
- session.transfer(flowFileResponse, FAILURE);
- logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", solrQuery.toString(), e, e);
- if (flowFileOriginal != null) {
- flowFileOriginal = session.penalize(flowFileOriginal);
- }
- }
-
- if (!flowFileResponse.isPenalized()) {
- session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
- session.transfer(flowFileResponse, RESULTS);
- }
-
- if (flowFileOriginal != null) {
- if (!flowFileOriginal.isPenalized()) {
- session.transfer(flowFileOriginal, ORIGINAL);
- } else {
- session.remove(flowFileOriginal);
- }
- }
- }
-
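-    // Determines which supported search components (facet, stats) are switched on in the additional request parameters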
-    private Set<String> extractSearchComponents(Map<String, String[]> solrParams) {
-        final Set<String> searchComponentsTemp = new HashSet<>();
-        for (final String searchComponent : SUPPORTED_SEARCH_COMPONENTS) {
-            if (solrParams.containsKey(searchComponent)) {
-                if (SEARCH_COMPONENTS_ON.contains(solrParams.get(searchComponent)[0])) {
-                    searchComponentsTemp.add(searchComponent);
-                }
-            }
-        }
-        return Collections.unmodifiableSet(searchComponentsTemp);
-    }
-
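-    // Writes the field statistics of the Solr response as a JSON object keyed by field name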
- private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
- writer.beginObject();
- writer.name("stats_fields");
- writer.beginObject();
-        for (final Map.Entry<String, FieldStatsInfo> entry : response.getFieldStatsInfo().entrySet()) {
- FieldStatsInfo fsi = entry.getValue();
- writer.name(entry.getKey());
- writer.beginObject();
- writer.name("min").value(fsi.getMin().toString());
- writer.name("max").value(fsi.getMax().toString());
- writer.name("count").value(fsi.getCount());
- writer.name("missing").value(fsi.getMissing());
- writer.name("sum").value(fsi.getSum().toString());
- writer.name("mean").value(fsi.getMean().toString());
- writer.name("sumOfSquares").value(fsi.getSumOfSquares());
- writer.name("stddev").value(fsi.getStddev());
- writer.endObject();
- }
- writer.endObject();
- writer.endObject();
- }
-
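-    // Writes facet queries, facet fields, facet ranges and facet intervals of the Solr response as a single JSON document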
- private static void addFacetsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
- writer.beginObject();
- writer.name("facet_queries");
- writer.beginArray();
-        for (final Map.Entry<String, Integer> facetQuery : response.getFacetQuery().entrySet()) {
- writer.beginObject();
- writer.name("facet").value(facetQuery.getKey());
- writer.name("count").value(facetQuery.getValue());
- writer.endObject();
- }
- writer.endArray();
-
- writer.name("facet_fields");
- writer.beginObject();
-        for (final FacetField facetField : response.getFacetFields()) {
- writer.name(facetField.getName());
- writer.beginArray();
- for (final FacetField.Count count : facetField.getValues()) {
- writer.beginObject();
- writer.name("facet").value(count.getName());
- writer.name("count").value(count.getCount());
- writer.endObject();
- }
- writer.endArray();
- }
- writer.endObject();
-
- writer.name("facet_ranges");
- writer.beginObject();
- for (final RangeFacet rangeFacet : response.getFacetRanges()) {
- writer.name(rangeFacet.getName());
- writer.beginArray();
-            final List<Count> list = rangeFacet.getCounts();
- for (final Count count : list) {
- writer.beginObject();
- writer.name("facet").value(count.getValue());
- writer.name("count").value(count.getCount());
- writer.endObject();
- }
- writer.endArray();
- }
- writer.endObject();
-
- writer.name("facet_intervals");
- writer.beginObject();
- for (final IntervalFacet intervalFacet : response.getIntervalFacets()) {
- writer.name(intervalFacet.getField());
- writer.beginArray();
- for (final IntervalFacet.Count count : intervalFacet.getIntervals()) {
- writer.beginObject();
- writer.name("facet").value(count.getKey());
- writer.name("count").value(count.getCount());
- writer.endObject();
- }
- writer.endArray();
- }
- writer.endObject();
- writer.endObject();
- }
-}
-
-
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
deleted file mode 100644
index 09cab9c724..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.kerberos.KerberosUserService;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
-import org.apache.nifi.security.krb.KerberosLoginException;
-import org.apache.nifi.security.krb.KerberosPasswordUser;
-import org.apache.nifi.security.krb.KerberosUser;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.solr.client.solrj.SolrClient;
-
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
-import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
-import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
-import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
-import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_STANDARD;
-import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
-
-/**
- * A base class for processors that interact with Apache Solr.
- *
- */
-public abstract class SolrProcessor extends AbstractProcessor {
-
- private volatile SolrClient solrClient;
- private volatile String solrLocation;
- private volatile String basicUsername;
- private volatile String basicPassword;
- private volatile boolean basicAuthEnabled = false;
-
- private volatile KerberosUser kerberosUser;
-
- @OnScheduled
- public final void onScheduled(final ProcessContext context) throws IOException {
- this.solrLocation = context.getProperty(SOLR_LOCATION).evaluateAttributeExpressions().getValue();
- this.basicUsername = context.getProperty(BASIC_USERNAME).evaluateAttributeExpressions().getValue();
- this.basicPassword = context.getProperty(BASIC_PASSWORD).evaluateAttributeExpressions().getValue();
- if (!StringUtils.isBlank(basicUsername) && !StringUtils.isBlank(basicPassword)) {
- basicAuthEnabled = true;
- }
-
- this.solrClient = createSolrClient(context, solrLocation);
-
- final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
- if (kerberosUserService != null) {
- this.kerberosUser = kerberosUserService.createKerberosUser();
- }
- }
-
- protected KerberosUser createKerberosKeytabUser(final KerberosCredentialsService kerberosCredentialsService) {
- return new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
- }
-
- protected KerberosUser createKerberosPasswordUser(final String principal, final String password) {
- return new KerberosPasswordUser(principal, password);
- }
-
- @OnStopped
- public final void closeClient() {
- if (solrClient != null) {
- try {
- solrClient.close();
- } catch (IOException e) {
- getLogger().debug("Error closing SolrClient", e);
- }
- }
-
- if (kerberosUser != null) {
- try {
- kerberosUser.logout();
- kerberosUser = null;
- } catch (final KerberosLoginException e) {
- getLogger().debug("Error logging out keytab user", e);
- }
- }
- }
-
- @Override
- public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final KerberosUser kerberosUser = getKerberosUser();
- if (kerberosUser == null) {
- doOnTrigger(context, session);
- } else {
- // wrap doOnTrigger in a privileged action
-            final PrivilegedExceptionAction<Void> action = () -> {
- doOnTrigger(context, session);
- return null;
- };
-
- // execute the privileged action as the given keytab user
-            final KerberosAction<Void> kerberosAction = new KerberosAction<>(kerberosUser, action, getLogger());
- try {
- kerberosAction.execute();
- } catch (ProcessException e) {
- context.yield();
- throw e;
- }
- }
- }
-
- /**
-     * This should be implemented just like the normal onTrigger method. When a KerberosUserService is configured,
-     * this method will be wrapped in a PrivilegedAction and executed with the credentials of the service; otherwise it
-     * will be executed like a normal call to onTrigger.
- */
- protected abstract void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
-
- /**
-     * Create a SolrClient based on the type of Solr specified.
-     *
-     * @param context
-     *            The context
-     * @param solrLocation
-     *            The Solr URL or ZooKeeper connect string, depending on the Solr Type
-     * @return an Http2SolrClient or CloudHttp2SolrClient
- */
- protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) {
- return SolrUtils.createSolrClient(context, solrLocation);
- }
-
- /**
- * Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the
- * {@link #createSolrClient(org.apache.nifi.processor.ProcessContext, String)} method
- *
-     * @return an Http2SolrClient or CloudHttp2SolrClient
- */
- protected final SolrClient getSolrClient() {
- return solrClient;
- }
-
- protected final String getSolrLocation() {
- return solrLocation;
- }
-
- protected final String getUsername() {
- return basicUsername;
- }
-
- protected final String getPassword() {
- return basicPassword;
- }
-
- protected final boolean isBasicAuthEnabled() {
- return basicAuthEnabled;
- }
-
- protected final KerberosUser getKerberosUser() {
- return kerberosUser;
- }
-
- @Override
-    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
-        final List<ValidationResult> problems = new ArrayList<>();
-
-        // For SolrCloud the location is a ZooKeeper host:port, so we cannot validate the SSLContext. For standard Solr
-        // we can: if the URL starts with https an SSLContextService is required, and if it starts with http an SSLContextService must not be set.
- if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
- final String solrLocation = context.getProperty(SOLR_LOCATION).evaluateAttributeExpressions().getValue();
- if (solrLocation != null) {
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (solrLocation.startsWith("https:") && sslContextService == null) {
- problems.add(new ValidationResult.Builder()
- .subject(SSL_CONTEXT_SERVICE.getDisplayName())
- .valid(false)
- .explanation("an SSLContextService must be provided when using https")
- .build());
- } else if (solrLocation.startsWith("http:") && sslContextService != null) {
- problems.add(new ValidationResult.Builder()
- .subject(SSL_CONTEXT_SERVICE.getDisplayName())
- .valid(false)
- .explanation("an SSLContextService can not be provided when using http")
- .build());
- }
- }
- }
-
- if (context.getProperty(BASIC_USERNAME).isSet() && context.getProperty(KERBEROS_USER_SERVICE).isSet()) {
- problems.add(new ValidationResult.Builder()
- .subject("Basic Auth and Kerberos")
- .explanation("Cannot set both Basic Auth Username and Kerberos User Service")
- .valid(false)
- .build());
- }
-
- invalidIfEmpty(context, BASIC_USERNAME, problems);
- invalidIfEmpty(context, BASIC_PASSWORD, problems);
-
-        final Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
- if (otherProblems != null) {
- problems.addAll(otherProblems);
- }
-
- if (SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue()) && !context.getProperty(COLLECTION).isSet()) {
- problems.add(new ValidationResult.Builder()
- .subject(COLLECTION.getName())
- .input(context.getProperty(COLLECTION).getValue())
- .valid(false)
-                    .explanation("A collection must be specified for a Solr Type of Cloud")
- .build());
-
- }
-
- return problems;
- }
-
-    private void invalidIfEmpty(final ValidationContext context, final PropertyDescriptor descriptor, final List<ValidationResult> problems) {
- if (context.getProperty(descriptor).isSet() && context.getProperty(descriptor).evaluateAttributeExpressions().getValue().isBlank()) {
- problems.add(new ValidationResult.Builder()
- .subject(descriptor.getDisplayName())
- .explanation("Property cannot be an empty string and cannot reference environment variables that do not exist")
- .valid(false)
- .build());
- }
- }
-
- /**
- * Allows additional custom validation to be done. This will be called from
- * the parent's customValidation method.
- *
- * @param context
- * The context
- * @return Validation results indicating problems
- */
-    protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
- return new ArrayList<>();
- }
-
-}
diff --git a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
deleted file mode 100644
index 7249b21743..0000000000
--- a/nifi-extension-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.solr;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.expression.AttributeExpression;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.kerberos.KerberosUserService;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.ListRecordSet;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.serialization.record.field.FieldConverter;
-import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
-import org.apache.nifi.serialization.record.type.ChoiceDataType;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.embedded.SSLConfig;
-import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.MultiMapSolrParams;
-
-public class SolrUtils {
-
- public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
- "Cloud", "Cloud", "A SolrCloud instance.");
-
- public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
- "Standard", "Standard", "A stand-alone Solr instance.");
-
- public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
- .name("Record Writer")
- .displayName("Record Writer")
- .description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.")
- .identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(false)
- .build();
-
- public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor.Builder()
- .name("Solr Type")
- .description("The type of Solr instance, Cloud or Standard.")
- .required(true)
- .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
- .defaultValue(SOLR_TYPE_STANDARD.getValue())
- .build();
-
- public static final PropertyDescriptor COLLECTION = new PropertyDescriptor.Builder()
- .name("Collection")
- .description("The Solr collection name, only used with a Solr Type of Cloud")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor.Builder()
- .name("Solr Location")
- .description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
- "or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor.Builder()
- .name("Username")
- .displayName("Basic Auth Username")
- .description("The username to use when Solr is configured with basic authentication.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor.Builder()
- .name("Password")
- .displayName("Basic Auth Password")
- .description("The password to use when Solr is configured with basic authentication.")
- .required(true)
- .dependsOn(BASIC_USERNAME)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .sensitive(true)
- .build();
-
- static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
- .name("kerberos-user-service")
- .displayName("Kerberos User Service")
- .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
- .identifiesControllerService(KerberosUserService.class)
- .required(false)
- .build();
-
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with Solr over https.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
-
- public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Solr Socket Timeout")
- .description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("10 seconds")
- .build();
-
- public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Solr Connection Timeout")
- .description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("10 seconds")
- .build();
-
- public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor.Builder()
- .name("Solr Maximum Connections Per Host")
- .description("The maximum number of connections allowed from the Solr client to a single Solr host.")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("5")
- .build();
-
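-    // Matches dynamic property names ending in ".<number>", used to pass repeating Solr request parameters under one name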
- public static final String REPEATING_PARAM_PATTERN = "[\\w.]+\\.\\d+$";
- private static final String ROOT_PATH = "/";
-
- public static synchronized SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
- final int socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final int connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- final SSLConfig sslConfig;
- if (sslContextService == null) {
- sslConfig = new SSLConfig(false, false, null, null, null, null);
- } else {
- sslConfig = new SSLConfig(true, true, sslContextService.getKeyStoreFile(), sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(),
- sslContextService.getTrustStorePassword());
- }
-
- final Http2SolrClient httpClient = new Http2SolrClient.Builder(solrLocation)
- .withConnectionTimeout(connectionTimeout, TimeUnit.MILLISECONDS)
- .withRequestTimeout(socketTimeout, TimeUnit.MILLISECONDS)
- .withMaxConnectionsPerHost(maxConnectionsPerHost)
- .withSSLConfig(sslConfig)
- .build();
-
-
- if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
- return httpClient;
- } else {
- // CloudSolrClient.Builder now requires a List of ZK addresses and znode for solr as separate parameters
- final String[] zk = solrLocation.split(ROOT_PATH);
-            final List<String> zkList = Arrays.asList(zk[0].split(","));
- String zkChrootPath = getZooKeeperChrootPathSuffix(solrLocation);
-
- final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
-
- final CloudHttp2SolrClient cloudClient = new CloudHttp2SolrClient.Builder(zkList, Optional.of(zkChrootPath))
- .withHttpClient(httpClient)
- .withDefaultCollection(collection)
- .build();
- return cloudClient;
- }
- }
-
- private static String getZooKeeperChrootPathSuffix(final String solrLocation) {
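-        // Split at the first '/' (the lookahead keeps the slash), e.g. "zk1:2181,zk2:2181/solr" yields the chroot "/solr"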
- String[] zkConnectStringAndChrootSuffix = solrLocation.split("(?=/)", 2);
- if (zkConnectStringAndChrootSuffix.length > 1) {
- final String chrootSuffix = zkConnectStringAndChrootSuffix[1];
- return chrootSuffix;
- } else {
- return ROOT_PATH;
- }
- }
-
- /**
- * Writes each SolrDocument to a record.
- */
-    public static RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
-        final List<Record> lr = new ArrayList<>();
-
- for (SolrDocument doc : docs) {
- final Map<String, Object> recordValues = new LinkedHashMap<>();
- for (RecordField field : schema.getFields()) {
- final Object fieldValue = doc.getFieldValue(field.getFieldName());
- if (fieldValue != null) {
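- // multi-valued Solr fields come back as a List; flatten to an array for the Record API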
- if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)) {
- recordValues.put(field.getFieldName(), ((List<Object>) fieldValue).toArray());
- } else {
- recordValues.put(field.getFieldName(), fieldValue);
- }
- }
- }
- lr.add(new MapRecord(schema, recordValues));
- }
- return new ListRecordSet(schema, lr);
- }
-
- public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) {
- return new QueryResponseOutputStreamCallback(response);
- }
-
- /**
- * Writes each SolrDocument in XML format to the OutputStream.
- */
- private static class QueryResponseOutputStreamCallback implements OutputStreamCallback {
- private final QueryResponse response;
-
- public QueryResponseOutputStreamCallback(QueryResponse response) {
- this.response = response;
- }
-
- @Override
- public void process(OutputStream out) throws IOException {
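- // wrap all documents in a single <docs> root element so the output is well-formed XML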
- IOUtils.write("", out, StandardCharsets.UTF_8);
- for (SolrDocument doc : response.getResults()) {
- final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
- IOUtils.write(xml, out, StandardCharsets.UTF_8);
- }
- IOUtils.write("", out, StandardCharsets.UTF_8);
- }
-
- public SolrInputDocument toSolrInputDocument(SolrDocument d) {
- final SolrInputDocument doc = new SolrInputDocument();
-
- for (String name : d.getFieldNames()) {
- doc.addField(name, d.getFieldValue(name));
- }
-
- return doc;
- }
- }
-
- public static Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
- final Map<String, String[]> paramsMap = new HashMap<>();
- final SortedMap<String, String> repeatingParams = new TreeMap<>();
-
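- // Dynamic properties become Solr request parameters; names ending in ".<number>" (for
- // repeated parameters such as "facet.field.1") are collected separately, sorted by name.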
- for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
- final PropertyDescriptor descriptor = entry.getKey();
- if (descriptor.isDynamic()) {
- final String paramName = descriptor.getName();
- final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
-
- if (!paramValue.trim().isEmpty()) {
- if (paramName.matches(REPEATING_PARAM_PATTERN)) {
- repeatingParams.put(paramName, paramValue);
- } else {
- MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
- }
- }
- }
- }
-
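- // strip the numeric suffix so "facet.field.1" and "facet.field.2" both become repeated "facet.field" parameters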
- for (final Map.Entry<String, String> entry : repeatingParams.entrySet()) {
- final String paramName = entry.getKey();
- final String paramValue = entry.getValue();
- final int idx = paramName.lastIndexOf(".");
- MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap);
- }
-
- return paramsMap;
- }
-
- /**
- * Writes the fields of a Record into the given SolrInputDocument, prefixing nested field names with the parent field name.
- */
- public static void writeRecord(final Record record, final SolrInputDocument inputDocument, final List<String> fieldsToIndex, String parentFieldName)
- throws IOException {
- RecordSchema schema = record.getSchema();
-
- for (int i = 0; i < schema.getFieldCount(); i++) {
- final RecordField field = schema.getField(i);
-
- String fieldName;
- if (StringUtils.isBlank(parentFieldName)) {
- fieldName = field.getFieldName();
- } else {
- // Prefixing parent field name
- fieldName = parentFieldName + "_" + field.getFieldName();
- }
-
- final Object value = record.getValue(field);
- if (value != null) {
- final DataType dataType = schema.getDataType(field.getFieldName()).orElse(null);
- writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
- }
- }
- }
-
- private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType, final List<String> fieldsToIndex) throws IOException {
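- // for CHOICE types, pick the concrete data type that best fits the actual value before coercing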
- final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
- final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
- if (coercedValue == null) {
- return;
- }
-
- switch (chosenDataType.getFieldType()) {
- case DATE: {
- final FieldConverter