NIFI-13007 Removed nifi-solr-bundle

This closes #8705

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Authored by Joseph Witt on 2024-03-13 10:55:54 -05:00; committed by exceptionfactory
parent a1610cf129
commit 19aab9bbba
20 changed files with 0 additions and 3292 deletions

@@ -1013,23 +1013,6 @@ language governing permissions and limitations under the License. -->
</dependency>
</dependencies>
</profile>
<profile>
<id>include-solr</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>allProfiles</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</profile>
<profile>
<id>include-iotdb</id>
<activation>

@@ -1425,11 +1425,6 @@
<artifactId>nifi-twitter-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-splunk-processors</artifactId>

@@ -2703,7 +2703,6 @@ deprecationLogger.warn(
| Apache HBase Bundle | include-hbase | Adds support for Apache HBase
| Apache IoTDB Bundle | include-iotdb | Adds support for Apache IoTDB
| Apache Kudu Bundle | include-kudu | Adds support for Apache Kudu
| Apache Solr Bundle | include-solr | Adds support for Apache Solr
| ASN.1 Support | include-asn1 | Adds support for ASN.1
| Contribution Check | contrib-check | Runs various quality checks that are required to be accepted before a contribution can be accepted into the core NiFi code base.
| Graph Database Bundle | include-graph | Adds support for various common graph database scenarios. Support is currently for https://neo4j.com/developer/cypher[Cypher] and https://tinkerpop.apache.org/gremlin.html[Gremlin]-compatible databases such as Neo4J and JanusGraph. Includes controller services that provide driver functionality and a suite of processors for ingestion and querying.

@@ -1,41 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-shared-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

@@ -1,236 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Woodstox StAX 2 API' which is
"licensed under standard BSD license"
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
under an MIT style license.
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@@ -1,91 +0,0 @@
nifi-solr-nar
Copyright 2014-2024 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2015 The Apache Software Foundation
Apache HttpCore
Copyright 2005-2014 The Apache Software Foundation
Apache HttpMime
Copyright 1999-2013 The Apache Software Foundation
This project contains annotations derived from JCIP-ANNOTATIONS
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache ZooKeeper
The following NOTICE information applies:
Apache ZooKeeper
Copyright 2009-2012 The Apache Software Foundation
(ASLv2) Woodstox Core ASL
The following NOTICE information applies:
This product currently only contains code developed by authors
of specific components, as identified by the source code files.
Since product implements StAX API, it has dependencies to StAX API
classes.
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may be licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.

@@ -1,106 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-solr-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${solr.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/solr/solr.xml</exclude>
<exclude>src/test/resources/solr/testCollection/core.properties</exclude>
<exclude>src/test/resources/solr/testCollection/conf/_rest_managed.json</exclude>
<exclude>src/test/resources/solr/testCollection/conf/protowords.txt</exclude>
<exclude>src/test/resources/solr/testCollection/conf/schema.xml</exclude>
<exclude>src/test/resources/solr/testCollection/conf/solrconfig.xml</exclude>
<exclude>src/test/resources/solr/testCollection/conf/synonyms.txt</exclude>
<exclude>src/test/resources/solr/testCollection/conf/lang/stopwords_en.txt</exclude>
<exclude>src/test/resources/testdata/test-csv-multiple-docs.csv</exclude>
<exclude>src/test/resources/testdata/test-custom-json-single-doc.json</exclude>
<exclude>src/test/resources/testdata/test-solr-json-multiple-docs.json</exclude>
<exclude>src/test/resources/testdata/test-xml-multiple-docs.xml</exclude>
<exclude>src/test/resources/log4j.properties</exclude>
<exclude>src/test/resources/jaas-client.conf</exclude>
<exclude>src/test/resources/test-schema.avsc</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -1,428 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import java.io.IOException;
import java.io.OutputStream;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.CursorMarkParams;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class GetSolr extends SolrProcessor {
public static final String STATE_MANAGER_FILTER = "stateManager_filter";
public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark";
public static final AllowableValue MODE_XML = new AllowableValue("XML");
public static final AllowableValue MODE_REC = new AllowableValue("Records");
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
.Builder().name("Return Type")
.displayName("Return Type")
.description("Write Solr documents to FlowFiles as XML or using a Record Writer")
.required(true)
.allowableValues(MODE_XML, MODE_REC)
.defaultValue(MODE_XML.getValue())
.build();
public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
.Builder().name("Solr Query")
.displayName("Solr Query")
.description("A query to execute against Solr")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor
.Builder().name("Date Field")
.displayName("Date Field")
.description("The name of a date field in Solr used to filter results")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DATE_FILTER = new PropertyDescriptor
.Builder().name("Initial Date Filter")
.displayName("Initial Date Filter")
.description("Date value to filter results. Documents with an earlier date will not be fetched. The format has to correspond to the date pattern of Solr 'YYYY-MM-DDThh:mm:ssZ'")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor
.Builder().name("Return Fields")
.displayName("Return Fields")
.description("Comma-separated list of field names to return")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
.Builder().name("Batch Size")
.displayName("Batch Size")
.description("Number of rows per Solr query")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("100")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The results of querying Solr")
.build();
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
private final AtomicBoolean configurationRestored = new AtomicBoolean(false);
private final AtomicBoolean clearState = new AtomicBoolean(false);
private final AtomicBoolean dateFieldNotInSpecifiedFieldsList = new AtomicBoolean(false);
private volatile String id_field = null;
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
this.descriptors = List.of(
SOLR_TYPE,
SOLR_LOCATION,
COLLECTION,
RETURN_TYPE,
RECORD_WRITER,
SOLR_QUERY,
DATE_FIELD,
DATE_FILTER,
RETURN_FIELDS,
BATCH_SIZE,
KERBEROS_USER_SERVICE,
BASIC_USERNAME,
BASIC_PASSWORD,
SSL_CONTEXT_SERVICE,
SOLR_SOCKET_TIMEOUT,
SOLR_CONNECTION_TIMEOUT,
SOLR_MAX_CONNECTIONS_PER_HOST
);
this.relationships = Set.of(REL_SUCCESS);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
static {
propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
propertyNamesForActivatingClearState.add(COLLECTION.getName());
propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
propertyNamesForActivatingClearState.add(DATE_FILTER.getName());
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
if (configurationRestored.get() && propertyNamesForActivatingClearState.contains(descriptor.getName()))
clearState.set(true);
}
@OnConfigurationRestored
public void onConfigurationRestored() {
configurationRestored.set(true);
}
@OnScheduled
public void clearState(final ProcessContext context) throws IOException {
if (clearState.getAndSet(false))
context.getStateManager().clear(Scope.CLUSTER);
final Map<String, String> stateMap = new HashMap<>(context.getStateManager().getState(Scope.CLUSTER).toMap());
final AtomicBoolean stateMapHasChanged = new AtomicBoolean(false);
if (stateMap.get(STATE_MANAGER_CURSOR_MARK) == null) {
stateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
stateMapHasChanged.set(true);
}
if (stateMap.get(STATE_MANAGER_FILTER) == null) {
final String initialDate = context.getProperty(DATE_FILTER).getValue();
if (StringUtils.isBlank(initialDate))
stateMap.put(STATE_MANAGER_FILTER, "*");
else
stateMap.put(STATE_MANAGER_FILTER, initialDate);
stateMapHasChanged.set(true);
}
if (stateMapHasChanged.get()) {
context.getStateManager().setState(stateMap, Scope.CLUSTER);
}
id_field = null;
}
@Override
protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
final Collection<ValidationResult> problems = new ArrayList<>();
if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
&& !context.getProperty(RECORD_WRITER).isSet()) {
problems.add(new ValidationResult.Builder()
.explanation("for writing records a record writer has to be configured")
.valid(false)
.subject("Record writer check")
.build());
}
return problems;
}
private String getFieldNameOfUniqueKey() {
final SolrQuery solrQuery = new SolrQuery();
try {
solrQuery.setRequestHandler("/schema/uniquekey");
final QueryRequest req = new QueryRequest(solrQuery);
if (isBasicAuthEnabled()) {
req.setBasicAuthCredentials(getUsername(), getPassword());
}
return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
} catch (SolrServerException | IOException e) {
getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", solrQuery.toString(), e, e);
throw new ProcessException(e);
}
}
@Override
public void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
final AtomicBoolean continuePaging = new AtomicBoolean(true);
final SolrQuery solrQuery = new SolrQuery();
try {
if (id_field == null) {
id_field = getFieldNameOfUniqueKey();
}
final String dateField = context.getProperty(DATE_FIELD).getValue();
final Map<String, String> stateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
solrQuery.setQuery("*:*");
final String query = context.getProperty(SOLR_QUERY).getValue();
if (!StringUtils.isBlank(query) && !query.equals("*:*")) {
solrQuery.addFilterQuery(query);
}
final String automatedFilterQuery = "%s:[%s TO *]".formatted(dateField, stateMap.get(STATE_MANAGER_FILTER));
solrQuery.addFilterQuery(automatedFilterQuery);
final List<String> fieldList = new ArrayList<>();
final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
if (!StringUtils.isBlank(returnFields)) {
fieldList.addAll(Arrays.asList(returnFields.trim().split(",")));
if (!fieldList.contains(dateField)) {
fieldList.add(dateField);
dateFieldNotInSpecifiedFieldsList.set(true);
}
for (String returnField : fieldList) {
solrQuery.addField(returnField.trim());
}
}
solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, stateMap.get(STATE_MANAGER_CURSOR_MARK));
solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
final String sortClause = "%s asc, %s asc".formatted(dateField, id_field);
solrQuery.setParam("sort", sortClause);
while (continuePaging.get()) {
StopWatch timer = new StopWatch(true);
final QueryRequest req = new QueryRequest(solrQuery);
if (isBasicAuthEnabled()) {
req.setBasicAuthCredentials(getUsername(), getPassword());
}
logger.debug(solrQuery.toQueryString());
final QueryResponse response = req.process(getSolrClient());
final SolrDocumentList documentList = response.getResults();
if (response.getResults().size() > 0) {
final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1);
final Object dateObject = lastSolrDocument.get(dateField);
final String latestDateValue = getDateFormatted(dateObject);
final String newCursorMark = response.getNextCursorMark();
solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, newCursorMark);
stateMap.put(STATE_MANAGER_CURSOR_MARK, newCursorMark);
stateMap.put(STATE_MANAGER_FILTER, latestDateValue);
FlowFile flowFile = session.create();
flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString());
if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
if (dateFieldNotInSpecifiedFieldsList.get()) {
for (SolrDocument doc : response.getResults()) {
doc.removeFields(dateField);
}
}
flowFile = session.write(flowFile, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
} else {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions()
.asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(null, null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
final FlowFile flowFileRef = flowFile;
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileRef);
writer.write(recordSet);
writer.flush();
mimeType.append(writer.getMimeType());
} catch (SchemaNotFoundException e) {
throw new ProcessException("Could not parse Solr response", e);
}
}
});
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
}
timer.stop();
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(getSolrLocation());
if (getSolrLocation().equals(SolrUtils.SOLR_TYPE_CLOUD.getValue())) {
transitUri.append(":").append(context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
session.transfer(flowFile, REL_SUCCESS);
}
continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue()));
}
session.setState(stateMap, Scope.CLUSTER);
} catch (final SolrServerException | SchemaNotFoundException | IOException e) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", solrQuery.toString(), e, e);
throw new ProcessException(e);
} catch (final Throwable t) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", solrQuery.toString(), t, t);
throw t;
}
}
private String getDateFormatted(final Object dateObject) {
final String formatted;
if (dateObject instanceof Date date) {
final OffsetDateTime dateTime = date.toInstant().atOffset(ZoneOffset.UTC);
formatted = DATE_TIME_FORMATTER.format(dateTime);
} else if (dateObject == null) {
formatted = null;
} else {
throw new IllegalArgumentException("Date Object Type [%s] not supported".formatted(dateObject.getClass()));
}
return formatted;
}
}
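Note: the incremental fetch in the removed GetSolr processor above is built on SolrJ cursor-mark deep paging. The following is only a minimal sketch of that pattern outside NiFi, assuming a hypothetical, already-configured SolrClient named client and a collection whose unique key field is id:

import java.io.IOException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorMarkSketch {
    // Reads an entire collection in batches using Solr cursor-mark paging,
    // the same mechanism GetSolr relies on between onTrigger invocations.
    static void fetchAll(final SolrClient client, final String collection) throws SolrServerException, IOException {
        final SolrQuery query = new SolrQuery("*:*");
        query.setRows(100);                                      // matches GetSolr's default Batch Size
        query.setSort("id", SolrQuery.ORDER.asc);                // cursor marks require a stable sort on the unique key
        String cursorMark = CursorMarkParams.CURSOR_MARK_START;  // "*"
        boolean done = false;
        while (!done) {
            query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
            final QueryResponse response = client.query(collection, query);
            response.getResults().forEach(doc -> System.out.println(doc.getFieldValue("id")));
            final String nextCursorMark = response.getNextCursorMark();
            done = cursorMark.equals(nextCursorMark);            // an unchanged cursor means everything was read
            cursorMark = nextCursorMark;
        }
    }
}

GetSolr layers cluster-scoped processor state on top of this loop so that the cursor mark and the latest date filter value survive restarts.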

@@ -1,283 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
@Tags({"Apache", "Solr", "Put", "Send"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
description="These parameters will be passed to Solr on the request")
public class PutSolrContentStream extends SolrProcessor {
public static final PropertyDescriptor CONTENT_STREAM_PATH = new PropertyDescriptor
.Builder().name("Content Stream Path")
.description("The path in Solr to post the ContentStream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("/update/json/docs")
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor
.Builder().name("Content-Type")
.description("Content-Type being sent to Solr")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("application/json")
.build();
public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
.Builder().name("Commit Within")
.description("The number of milliseconds before the given update is committed")
.required(false)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("5000")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The original FlowFile")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed for any reason other than Solr being unreachable")
.build();
public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
.name("connection_failure")
.description("FlowFiles that failed because Solr is unreachable")
.build();
public static final String COLLECTION_PARAM_NAME = "collection";
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(COLLECTION);
descriptors.add(CONTENT_STREAM_PATH);
descriptors.add(CONTENT_TYPE);
descriptors.add(COMMIT_WITHIN);
descriptors.add(KERBEROS_USER_SERVICE);
descriptors.add(BASIC_USERNAME);
descriptors.add(BASIC_PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(SOLR_SOCKET_TIMEOUT);
descriptors.add(SOLR_CONNECTION_TIMEOUT);
descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_CONNECTION_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
@Override
protected void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final AtomicReference<Exception> error = new AtomicReference<>(null);
final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH).evaluateAttributeExpressions(flowFile).getValue();
final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(contentStreamPath);
request.setParams(new ModifiableSolrParams());
// add the extra params, don't use 'set' in case of repeating params
Iterator<String> paramNames = requestParams.getParameterNamesIterator();
while (paramNames.hasNext()) {
String paramName = paramNames.next();
for (String paramValue : requestParams.getParams(paramName)) {
request.getParams().add(paramName, paramValue);
}
}
// specify the collection for SolrCloud
if (isSolrCloud) {
request.setParam(COLLECTION_PARAM_NAME, collection);
}
if (commitWithin != null && commitWithin > 0) {
request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
}
// if a username and password were provided then pass them for basic auth
if (isBasicAuthEnabled()) {
request.setBasicAuthCredentials(getUsername(), getPassword());
}
try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) {
// add the FlowFile's content on the UpdateRequest
request.addContentStream(new ContentStreamBase() {
@Override
public InputStream getStream() throws IOException {
return bufferedIn;
}
@Override
public String getContentType() {
return context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
}
});
UpdateResponse response = request.process(getSolrClient());
getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
} catch (SolrException e) {
error.set(e);
} catch (SolrServerException e) {
if (causedByIOException(e)) {
connectionError.set(e);
} else {
error.set(e);
}
} catch (IOException e) {
connectionError.set(e);
}
}
});
timer.stop();
if (error.get() != null) {
getLogger().error("Failed to send {} to Solr due to {}; routing to failure",
new Object[]{flowFile, error.get()});
session.transfer(flowFile, REL_FAILURE);
} else if (connectionError.get() != null) {
getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
new Object[]{flowFile, connectionError.get()});
session.penalize(flowFile);
session.transfer(flowFile, REL_CONNECTION_FAILURE);
} else {
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(getSolrLocation());
if (isSolrCloud) {
transitUri.append(":").append(collection);
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
session.transfer(flowFile, REL_SUCCESS);
}
}
private boolean causedByIOException(SolrServerException e) {
boolean foundIOException = false;
Throwable cause = e.getCause();
while (cause != null) {
if (cause instanceof IOException) {
foundIOException = true;
break;
}
cause = cause.getCause();
}
return foundIOException;
}
}
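Note: the removed PutSolrContentStream processor above wraps SolrJ's ContentStreamUpdateRequest. A standalone sketch of that call path, again assuming a hypothetical SolrClient named client and a JSON payload passed as a String, might look like:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.util.ContentStreamBase;

public class ContentStreamSketch {
    // Posts a JSON payload to Solr's /update/json/docs handler, the call path
    // PutSolrContentStream wraps around the incoming FlowFile content.
    static void post(final SolrClient client, final String collection, final String json) throws Exception {
        final ContentStreamUpdateRequest request = new ContentStreamUpdateRequest("/update/json/docs");
        request.addContentStream(new ContentStreamBase.StringStream(json));  // the processor streams the FlowFile content instead
        request.setParam("commitWithin", "5000");                            // the processor's default Commit Within
        final UpdateResponse response = request.process(client, collection);
        System.out.println("Solr status: " + response.getStatus());
    }
}

The processor additionally copies any dynamic properties onto the request as Solr parameters and routes connection problems to a dedicated connection_failure relationship, as shown in the code above.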

@@ -1,328 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
@Tags({"Apache", "Solr", "Put", "Send","Record"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
description="These parameters will be passed to Solr on the request")
public class PutSolrRecord extends SolrProcessor {
public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor.Builder()
.name("Solr Update Path")
.description("The path in Solr to post the FlowFile Records")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("/update")
.build();
public static final PropertyDescriptor FIELDS_TO_INDEX = new PropertyDescriptor.Builder()
.name("Fields To Index")
.description("Comma-separated list of field names to write")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor.Builder()
.name("Commit Within")
.description("The number of milliseconds before the given update is committed")
.required(false)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("5000")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of solr documents to index per batch")
.required(false)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("500")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The FlowFile is routed to this relationship when it has been successfully sent to Solr")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed for any reason other than Solr being unreachable")
.build();
public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder()
.name("connection_failure")
.description("FlowFiles that failed because Solr is unreachable")
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-solr-record-record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
public static final String COLLECTION_PARAM_NAME = "collection";
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private static final String EMPTY_STRING = "";
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(COLLECTION);
descriptors.add(UPDATE_PATH);
descriptors.add(RECORD_READER);
descriptors.add(FIELDS_TO_INDEX);
descriptors.add(COMMIT_WITHIN);
descriptors.add(KERBEROS_USER_SERVICE);
descriptors.add(BASIC_USERNAME);
descriptors.add(BASIC_PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(SOLR_SOCKET_TIMEOUT);
descriptors.add(SOLR_CONNECTION_TIMEOUT);
descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
descriptors.add(BATCH_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_CONNECTION_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
@Override
public void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final AtomicReference<Exception> error = new AtomicReference<>(null);
final AtomicReference<Exception> connectionError = new AtomicReference<>(null);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
final Long batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
final List<String> fieldList = new ArrayList<>();
if (!StringUtils.isBlank(fieldsToIndex)) {
Arrays.asList(fieldsToIndex.split(",")).forEach(f -> fieldList.add(f.trim()));
}
StopWatch timer = new StopWatch(true);
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
List<SolrInputDocument> inputDocumentList = new LinkedList<>();
try {
while ((record = reader.nextRecord()) != null) {
SolrInputDocument inputDoc = new SolrInputDocument();
writeRecord(record, inputDoc, fieldList, EMPTY_STRING);
inputDocumentList.add(inputDoc);
if (inputDocumentList.size() == batchSize) {
index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
inputDocumentList = new ArrayList<>();
}
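// index() sends and then clears the current list, so this call flushes the accumulated
// documents to Solr after every record rather than waiting for the batch boundary above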
index(isSolrCloud, collection, commitWithin, contentStreamPath, requestParams, inputDocumentList);
}
} catch (SolrException e) {
error.set(e);
} catch (SolrServerException e) {
if (causedByIOException(e)) {
//Exit in case of a solr connection error
connectionError.set(e);
} else {
error.set(e);
}
} catch (IOException e) {
//Exit in case of a solr connection error
connectionError.set(e);
}
} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
getLogger().error("Could not parse incoming data", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
timer.stop();
if (error.get() != null) {
getLogger().error("Failed to send all the records of the {} to Solr due to {}; routing to failure",
new Object[]{flowFile, error.get()});
session.transfer(flowFile, REL_FAILURE);
} else if (connectionError.get() != null) {
getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure",
new Object[]{flowFile, connectionError.get()});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_CONNECTION_FAILURE);
} else {
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(getSolrLocation());
if (isSolrCloud) {
transitUri.append(":").append(collection);
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
session.transfer(flowFile, REL_SUCCESS);
}
}
private void index(boolean isSolrCloud, String collection, Long commitWithin, String contentStreamPath, MultiMapSolrParams requestParams, List<SolrInputDocument> inputDocumentList)
throws IOException, SolrServerException,SolrException {
UpdateRequest request = new UpdateRequest(contentStreamPath);
request.setParams(new ModifiableSolrParams());
// add the extra params, don't use 'set' in case of repeating params
Iterator<String> paramNames = requestParams.getParameterNamesIterator();
while (paramNames.hasNext()) {
String paramName = paramNames.next();
for (String paramValue : requestParams.getParams(paramName)) {
request.getParams().add(paramName, paramValue);
}
}
// specify the collection for SolrCloud
if (collection != null) {
request.setParam(COLLECTION_PARAM_NAME, collection);
}
if (commitWithin != null && commitWithin > 0) {
request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
}
// if a username and password were provided then pass them for basic auth
if (isBasicAuthEnabled()) {
request.setBasicAuthCredentials(getUsername(), getPassword());
}
request.add(inputDocumentList);
UpdateResponse response = request.process(getSolrClient());
getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
inputDocumentList.clear();
}
private boolean causedByIOException(SolrServerException e) {
boolean foundIOException = false;
Throwable cause = e.getCause();
while (cause != null) {
if (cause instanceof IOException) {
foundIOException = true;
break;
}
cause = cause.getCause();
}
return foundIOException;
}
}
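
// A minimal standalone SolrJ sketch, separate from the processor above, showing the same UpdateRequest
// calls that PutSolrRecord.index(...) relies on: a custom update path, the commitWithin parameter, and
// basic-auth credentials. The class name, field values, and credentials are illustrative assumptions.
class PutSolrRecordUpdateSketch {
    static void sendOneDocument(final org.apache.solr.client.solrj.SolrClient client) throws Exception {
        final org.apache.solr.common.SolrInputDocument doc = new org.apache.solr.common.SolrInputDocument();
        doc.addField("id", "example-1");                                  // assumed unique key field
        final org.apache.solr.client.solrj.request.UpdateRequest request =
                new org.apache.solr.client.solrj.request.UpdateRequest("/update");
        request.setParam("commitWithin", "5000");                         // same parameter the processor sets
        request.setBasicAuthCredentials("solr-user", "solr-password");    // only when basic auth is configured
        request.add(doc);
        request.process(client);                                          // the UpdateResponse carries the Solr status code
    }
}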


@@ -1,615 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.FacetField;
import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.IntervalFacet;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RangeFacet;
import org.apache.solr.client.solrj.response.RangeFacet.Count;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.params.StatsParams;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
@Tags({"Apache", "Solr", "Get", "Query", "Records"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
description="These parameters will be passed to Solr on the request")
@WritesAttributes({
@WritesAttribute(attribute = "solr.connect", description = "Solr connect string"),
@WritesAttribute(attribute = "solr.collection", description = "Solr collection"),
@WritesAttribute(attribute = "solr.query", description = "Query string sent to Solr"),
@WritesAttribute(attribute = "solr.cursor.mark", description = "Cursor mark can be used for scrolling Solr"),
@WritesAttribute(attribute = "solr.status.code", description = "Status code of Solr request. A status code of 0 indicates that the request was successfully processed"),
@WritesAttribute(attribute = "solr.query.time", description = "The elapsed time to process the query (in ms)"),
@WritesAttribute(attribute = "solr.start", description = "Solr start parameter (result offset) for the query"),
@WritesAttribute(attribute = "solr.rows", description = "Number of Solr documents to be returned for the query"),
@WritesAttribute(attribute = "solr.number.results", description = "Number of Solr documents that match the query"),
@WritesAttribute(attribute = "mime.type", description = "The mime type of the data format"),
@WritesAttribute(attribute = "querysolr.exeption.class", description = "The Java exception class raised when the processor fails"),
@WritesAttribute(attribute = "querysolr.exeption.message", description = "The Java exception message raised when the processor fails")
})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class QuerySolr extends SolrProcessor {
public static final AllowableValue MODE_XML = new AllowableValue("XML");
public static final AllowableValue MODE_REC = new AllowableValue("Records");
public static final AllowableValue RETURN_TOP_RESULTS = new AllowableValue("return_only_top_results", "Only top results");
public static final AllowableValue RETURN_ALL_RESULTS = new AllowableValue("return_all_results", "Entire results");
public static final String MIME_TYPE_JSON = "application/json";
public static final String MIME_TYPE_XML = "application/xml";
public static final String ATTRIBUTE_SOLR_CONNECT = "solr.connect";
public static final String ATTRIBUTE_SOLR_COLLECTION = "solr.collection";
public static final String ATTRIBUTE_SOLR_QUERY = "solr.query";
public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark";
public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code";
public static final String ATTRIBUTE_SOLR_START = "solr.start";
public static final String ATTRIBUTE_SOLR_ROWS = "solr.rows";
public static final String ATTRIBUTE_SOLR_NUMBER_RESULTS = "solr.number.results";
public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time";
public static final String EXCEPTION = "querysolr.exeption";
public static final String EXCEPTION_MESSAGE = "querysolr.exeption.message";
public static final Integer UPPER_LIMIT_START_PARAM = 10000;
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
.Builder().name("return_type")
.displayName("Return Type")
.description("Output format of Solr results. Write Solr documents to FlowFiles as XML or using a Record Writer")
.required(true)
.allowableValues(MODE_XML, MODE_REC)
.defaultValue(MODE_XML.getValue())
.build();
public static final PropertyDescriptor SOLR_PARAM_QUERY = new PropertyDescriptor
.Builder().name("solr_param_query")
.displayName("Solr Query")
.description("Solr Query, e. g. field:value")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("*:*")
.build();
public static final PropertyDescriptor SOLR_PARAM_REQUEST_HANDLER = new PropertyDescriptor
.Builder().name("solr_param_request_handler")
.displayName("Request Handler")
.description("Define a request handler here, e. g. /query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("/select")
.build();
public static final PropertyDescriptor SOLR_PARAM_FIELD_LIST = new PropertyDescriptor
.Builder().name("solr_param_field_list")
.displayName("Field List")
.description("Comma separated list of fields to be included into results, e. g. field1,field2")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_PARAM_SORT = new PropertyDescriptor
.Builder().name("solr_param_sort")
.displayName("Sorting of result list")
.description("Comma separated sort clauses to define the sorting of results, e. g. field1 asc, field2 desc")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_PARAM_START = new PropertyDescriptor
.Builder().name("solr_param_start")
.displayName("Start of results")
.description("Offset of result set")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_PARAM_ROWS = new PropertyDescriptor
.Builder().name("solr_param_rows")
.displayName("Rows")
.description("Number of results to be returned for a single request")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor AMOUNT_DOCUMENTS_TO_RETURN = new PropertyDescriptor
.Builder().name("amount_documents_to_return")
.displayName("Total amount of returned results")
.description("Total amount of Solr documents to be returned. If this property is set to \"Only top results\", " +
"only single requests will be sent to Solr and the results will be written into single FlowFiles. If it is set to " +
"\"Entire results\", all results matching to the query are retrieved via multiple Solr requests and " +
"returned in multiple FlowFiles. For both options, the number of Solr documents to be returned in a FlowFile depends on " +
"the configuration of the \"Rows\" property")
.required(true)
.allowableValues(RETURN_ALL_RESULTS, RETURN_TOP_RESULTS)
.defaultValue(RETURN_TOP_RESULTS.getValue())
.build();
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' Solr parameter")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
public static final Relationship RESULTS = new Relationship.Builder().name("results")
.description("Results of Solr queries").build();
public static final Relationship FACETS = new Relationship.Builder().name("facets")
.description("Results of faceted search").build();
public static final Relationship STATS = new Relationship.Builder().name("stats")
.description("Stats about Solr index").build();
public static final Relationship ORIGINAL = new Relationship.Builder().name("original")
.description("Original flowfile").build();
public static final Relationship FAILURE = new Relationship.Builder().name("failure")
.description("Failure relationship").build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(COLLECTION);
descriptors.add(RETURN_TYPE);
descriptors.add(RECORD_WRITER);
descriptors.add(SOLR_PARAM_QUERY);
descriptors.add(SOLR_PARAM_REQUEST_HANDLER);
descriptors.add(SOLR_PARAM_FIELD_LIST);
descriptors.add(SOLR_PARAM_SORT);
descriptors.add(SOLR_PARAM_START);
descriptors.add(SOLR_PARAM_ROWS);
descriptors.add(AMOUNT_DOCUMENTS_TO_RETURN);
descriptors.add(KERBEROS_USER_SERVICE);
descriptors.add(BASIC_USERNAME);
descriptors.add(BASIC_PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(SOLR_SOCKET_TIMEOUT);
descriptors.add(SOLR_CONNECTION_TIMEOUT);
descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(FAILURE);
relationships.add(RESULTS);
relationships.add(FACETS);
relationships.add(STATS);
relationships.add(ORIGINAL);
this.relationships = Collections.unmodifiableSet(relationships);
}
public static final Set<String> SUPPORTED_SEARCH_COMPONENTS = new HashSet<>();
static {
SUPPORTED_SEARCH_COMPONENTS.addAll(Arrays.asList(StatsParams.STATS, FacetParams.FACET));
}
public static final Set<String> SEARCH_COMPONENTS_ON = new HashSet<>();
static {
SEARCH_COMPONENTS_ON.addAll(Arrays.asList("true", "on", "yes"));
}
@Override
protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
final Collection<ValidationResult> problems = new ArrayList<>();
if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
&& !context.getProperty(RECORD_WRITER).isSet()) {
problems.add(new ValidationResult.Builder()
.explanation("for writing records a record writer has to be configured")
.valid(false)
.subject("Record writer check")
.build());
}
return problems;
}
@Override
public void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFileOriginal = session.get();
FlowFile flowFileResponse;
if (flowFileOriginal == null) {
if (context.hasNonLoopConnection()) {
return;
}
flowFileResponse = session.create();
} else {
flowFileResponse = session.create(flowFileOriginal);
}
final SolrQuery solrQuery = new SolrQuery();
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFileResponse).getValue();
final StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(getSolrLocation());
if (isSolrCloud) {
transitUri.append(":").append(collection);
}
final StopWatch timer = new StopWatch(false);
try {
solrQuery.setQuery(context.getProperty(SOLR_PARAM_QUERY).evaluateAttributeExpressions(flowFileResponse).getValue());
solrQuery.setRequestHandler(context.getProperty(SOLR_PARAM_REQUEST_HANDLER).evaluateAttributeExpressions(flowFileResponse).getValue());
if (context.getProperty(SOLR_PARAM_FIELD_LIST).isSet()) {
for (final String field : context.getProperty(SOLR_PARAM_FIELD_LIST).evaluateAttributeExpressions(flowFileResponse).getValue()
.split(",")) {
solrQuery.addField(field.trim());
}
}
// Avoid ArrayIndexOutOfBoundsException due to incorrectly configured sorting
try {
if (context.getProperty(SOLR_PARAM_SORT).isSet()) {
final List<SolrQuery.SortClause> sortings = new ArrayList<>();
for (final String sorting : context.getProperty(SOLR_PARAM_SORT).evaluateAttributeExpressions(flowFileResponse).getValue()
.split(",")) {
final String[] sortEntry = sorting.trim().split(" ");
sortings.add(new SolrQuery.SortClause(sortEntry[0], sortEntry[1]));
}
solrQuery.setSorts(sortings);
}
} catch (Exception e) {
throw new ProcessException("Error while parsing the sort clauses for the Solr query");
}
final Integer startParam = context.getProperty(SOLR_PARAM_START).isSet() ? Integer.parseInt(
context.getProperty(SOLR_PARAM_START).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.START_DEFAULT;
solrQuery.setStart(startParam);
final Integer rowParam = context.getProperty(SOLR_PARAM_ROWS).isSet() ? Integer.parseInt(
context.getProperty(SOLR_PARAM_ROWS).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.ROWS_DEFAULT;
solrQuery.setRows(rowParam);
final Map<String,String[]> additionalSolrParams = SolrUtils.getRequestParams(context, flowFileResponse);
final Set<String> searchComponents = extractSearchComponents(additionalSolrParams);
solrQuery.add(new MultiMapSolrParams(additionalSolrParams));
final Map<String,String> attributes = new HashMap<>();
attributes.put(ATTRIBUTE_SOLR_CONNECT, getSolrLocation());
if (isSolrCloud) {
attributes.put(ATTRIBUTE_SOLR_COLLECTION, collection);
}
attributes.put(ATTRIBUTE_SOLR_QUERY, solrQuery.toString());
if (flowFileOriginal != null) {
flowFileOriginal = session.putAllAttributes(flowFileOriginal, attributes);
}
flowFileResponse = session.putAllAttributes(flowFileResponse, attributes);
final boolean getEntireResults = RETURN_ALL_RESULTS.equals(context.getProperty(AMOUNT_DOCUMENTS_TO_RETURN).getValue());
boolean processFacetsAndStats = true;
boolean continuePaging = true;
while (continuePaging){
timer.start();
Map<String,String> responseAttributes = new HashMap<>();
responseAttributes.put(ATTRIBUTE_SOLR_START, solrQuery.getStart().toString());
responseAttributes.put(ATTRIBUTE_SOLR_ROWS, solrQuery.getRows().toString());
if (solrQuery.getStart() > UPPER_LIMIT_START_PARAM) {
logger.warn("The start parameter of Solr query {} exceeded the upper limit of {}. The query will not be processed " +
"to avoid performance or memory issues on the part of Solr.", new Object[]{solrQuery.toString(), UPPER_LIMIT_START_PARAM});
flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);
timer.stop();
break;
}
final QueryRequest req = new QueryRequest(solrQuery);
if (isBasicAuthEnabled()) {
req.setBasicAuthCredentials(getUsername(), getPassword());
}
final QueryResponse response = req.process(getSolrClient());
timer.stop();
final Long totalNumberOfResults = response.getResults().getNumFound();
responseAttributes.put(ATTRIBUTE_SOLR_NUMBER_RESULTS, totalNumberOfResults.toString());
responseAttributes.put(ATTRIBUTE_CURSOR_MARK, response.getNextCursorMark());
responseAttributes.put(ATTRIBUTE_SOLR_STATUS, String.valueOf(response.getStatus()));
responseAttributes.put(ATTRIBUTE_QUERY_TIME, String.valueOf(response.getQTime()));
flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);
if (response.getResults().size() > 0) {
if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
flowFileResponse = session.write(flowFileResponse, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_XML);
} else {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse)
.asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
final FlowFile flowFileResponseRef = flowFileResponse;
flowFileResponse = session.write(flowFileResponse, out -> {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, flowFileResponseRef)) {
writer.write(recordSet);
writer.flush();
mimeType.append(writer.getMimeType());
} catch (SchemaNotFoundException e) {
throw new ProcessException("Could not parse Solr response", e);
}
});
flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
}
if (processFacetsAndStats) {
if (searchComponents.contains(FacetParams.FACET)) {
FlowFile flowFileFacets = session.create(flowFileResponse);
flowFileFacets = session.write(flowFileFacets, out -> {
try (
final OutputStreamWriter osw = new OutputStreamWriter(out);
final JsonWriter writer = new JsonWriter(osw)
) {
addFacetsFromSolrResponseToJsonWriter(response, writer);
}
});
flowFileFacets = session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
session.getProvenanceReporter().receive(flowFileFacets, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileFacets, FACETS);
}
if (searchComponents.contains(StatsParams.STATS)) {
FlowFile flowFileStats = session.create(flowFileResponse);
flowFileStats = session.write(flowFileStats, out -> {
try (
final OutputStreamWriter osw = new OutputStreamWriter(out);
final JsonWriter writer = new JsonWriter(osw)
) {
addStatsFromSolrResponseToJsonWriter(response, writer);
}
});
flowFileStats = session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
session.getProvenanceReporter().receive(flowFileStats, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileStats, STATS);
}
processFacetsAndStats = false;
}
}
if (getEntireResults) {
final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows();
if (totalDocumentsReturned < totalNumberOfResults) {
solrQuery.setStart(totalDocumentsReturned);
session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileResponse, RESULTS);
flowFileResponse = session.create(flowFileResponse);
} else {
continuePaging = false;
}
} else {
continuePaging = false;
}
}
} catch (Exception e) {
flowFileResponse = session.penalize(flowFileResponse);
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName());
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage());
session.transfer(flowFileResponse, FAILURE);
logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", solrQuery.toString(), e, e);
if (flowFileOriginal != null) {
flowFileOriginal = session.penalize(flowFileOriginal);
}
}
if (!flowFileResponse.isPenalized()) {
session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileResponse, RESULTS);
}
if (flowFileOriginal != null) {
if (!flowFileOriginal.isPenalized()) {
session.transfer(flowFileOriginal, ORIGINAL);
} else {
session.remove(flowFileOriginal);
}
}
}
private Set<String> extractSearchComponents(Map<String,String[]> solrParams) {
final Set<String> searchComponentsTemp = new HashSet<>();
for (final String searchComponent : SUPPORTED_SEARCH_COMPONENTS)
if (solrParams.keySet().contains(searchComponent)) {
if (SEARCH_COMPONENTS_ON.contains(solrParams.get(searchComponent)[0])) {
searchComponentsTemp.add(searchComponent);
}
}
return Collections.unmodifiableSet(searchComponentsTemp);
}
private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
writer.beginObject();
writer.name("stats_fields");
writer.beginObject();
for (Map.Entry<String,FieldStatsInfo> entry: response.getFieldStatsInfo().entrySet()) {
FieldStatsInfo fsi = entry.getValue();
writer.name(entry.getKey());
writer.beginObject();
writer.name("min").value(fsi.getMin().toString());
writer.name("max").value(fsi.getMax().toString());
writer.name("count").value(fsi.getCount());
writer.name("missing").value(fsi.getMissing());
writer.name("sum").value(fsi.getSum().toString());
writer.name("mean").value(fsi.getMean().toString());
writer.name("sumOfSquares").value(fsi.getSumOfSquares());
writer.name("stddev").value(fsi.getStddev());
writer.endObject();
}
writer.endObject();
writer.endObject();
}
private static void addFacetsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
writer.beginObject();
writer.name("facet_queries");
writer.beginArray();
for (final Map.Entry<String,Integer> facetQuery : response.getFacetQuery().entrySet()){
writer.beginObject();
writer.name("facet").value(facetQuery.getKey());
writer.name("count").value(facetQuery.getValue());
writer.endObject();
}
writer.endArray();
writer.name("facet_fields");
writer.beginObject();
for (final FacetField facetField : response.getFacetFields()){
writer.name(facetField.getName());
writer.beginArray();
for (final FacetField.Count count : facetField.getValues()) {
writer.beginObject();
writer.name("facet").value(count.getName());
writer.name("count").value(count.getCount());
writer.endObject();
}
writer.endArray();
}
writer.endObject();
writer.name("facet_ranges");
writer.beginObject();
for (final RangeFacet rangeFacet : response.getFacetRanges()) {
writer.name(rangeFacet.getName());
writer.beginArray();
final List<Count> list = rangeFacet.getCounts();
for (final Count count : list) {
writer.beginObject();
writer.name("facet").value(count.getValue());
writer.name("count").value(count.getCount());
writer.endObject();
}
writer.endArray();
}
writer.endObject();
writer.name("facet_intervals");
writer.beginObject();
for (final IntervalFacet intervalFacet : response.getIntervalFacets()) {
writer.name(intervalFacet.getField());
writer.beginArray();
for (final IntervalFacet.Count count : intervalFacet.getIntervals()) {
writer.beginObject();
writer.name("facet").value(count.getKey());
writer.name("count").value(count.getCount());
writer.endObject();
}
writer.endArray();
}
writer.endObject();
writer.endObject();
}
}
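
// A minimal standalone sketch, separate from the processor above, of the paging arithmetic QuerySolr
// uses for the "Entire results" mode: rows stays fixed while start is advanced by rows until
// start + rows reaches numFound. The query string and page size are illustrative assumptions.
class QuerySolrPagingSketch {
    static void pageThroughResults(final org.apache.solr.client.solrj.SolrClient client) throws Exception {
        final org.apache.solr.client.solrj.SolrQuery query = new org.apache.solr.client.solrj.SolrQuery("*:*");
        query.setStart(0);
        query.setRows(100);
        while (true) {
            final org.apache.solr.client.solrj.response.QueryResponse response = client.query(query);
            final long numFound = response.getResults().getNumFound();
            // process response.getResults() for this page here
            final int nextStart = query.getStart() + query.getRows();
            if (nextStart >= numFound) {
                break;                      // every matching document has been fetched
            }
            query.setStart(nextStart);      // request the next page
        }
    }
}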


@@ -1,263 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.solr.client.solrj.SolrClient;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.KERBEROS_USER_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_STANDARD;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
/**
* A base class for processors that interact with Apache Solr.
*
*/
public abstract class SolrProcessor extends AbstractProcessor {
private volatile SolrClient solrClient;
private volatile String solrLocation;
private volatile String basicUsername;
private volatile String basicPassword;
private volatile boolean basicAuthEnabled = false;
private volatile KerberosUser kerberosUser;
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
this.solrLocation = context.getProperty(SOLR_LOCATION).evaluateAttributeExpressions().getValue();
this.basicUsername = context.getProperty(BASIC_USERNAME).evaluateAttributeExpressions().getValue();
this.basicPassword = context.getProperty(BASIC_PASSWORD).evaluateAttributeExpressions().getValue();
if (!StringUtils.isBlank(basicUsername) && !StringUtils.isBlank(basicPassword)) {
basicAuthEnabled = true;
}
this.solrClient = createSolrClient(context, solrLocation);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
this.kerberosUser = kerberosUserService.createKerberosUser();
}
}
protected KerberosUser createKerberosKeytabUser(final KerberosCredentialsService kerberosCredentialsService) {
return new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
}
protected KerberosUser createKerberosPasswordUser(final String principal, final String password) {
return new KerberosPasswordUser(principal, password);
}
@OnStopped
public final void closeClient() {
if (solrClient != null) {
try {
solrClient.close();
} catch (IOException e) {
getLogger().debug("Error closing SolrClient", e);
}
}
if (kerberosUser != null) {
try {
kerberosUser.logout();
kerberosUser = null;
} catch (final KerberosLoginException e) {
getLogger().debug("Error logging out keytab user", e);
}
}
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final KerberosUser kerberosUser = getKerberosUser();
if (kerberosUser == null) {
doOnTrigger(context, session);
} else {
// wrap doOnTrigger in a privileged action
final PrivilegedExceptionAction<Void> action = () -> {
doOnTrigger(context, session);
return null;
};
// execute the privileged action as the given keytab user
final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, action, getLogger());
try {
kerberosAction.execute();
} catch (ProcessException e) {
context.yield();
throw e;
}
}
}
/**
* This should be implemented just like the normal onTrigger method. When a KerberosCredentialsService is configured,
* this method will be wrapped in a PrivilegedAction and executed with the credentials of the service, otherwise this
 * will be executed like a normal call to onTrigger.
*/
protected abstract void doOnTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
/**
* Create a SolrClient based on the type of Solr specified.
*
* @param context
* The context
 * @return an Http2SolrClient or CloudHttp2SolrClient
*/
protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) {
return SolrUtils.createSolrClient(context, solrLocation);
}
/**
* Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the
* {@link #createSolrClient(org.apache.nifi.processor.ProcessContext, String)} method
*
 * @return an Http2SolrClient or CloudHttp2SolrClient
*/
protected final SolrClient getSolrClient() {
return solrClient;
}
protected final String getSolrLocation() {
return solrLocation;
}
protected final String getUsername() {
return basicUsername;
}
protected final String getPassword() {
return basicPassword;
}
protected final boolean isBasicAuthEnabled() {
return basicAuthEnabled;
}
protected final KerberosUser getKerberosUser() {
return kerberosUser;
}
@Override
final protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>();
// For Solr Cloud the location is the ZooKeeper host:port, so we can't validate the SSLContext. For standard Solr
// we can: if the URL starts with https an SSLContextService is required, and if it starts with http an SSLContextService must not be set
if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
final String solrLocation = context.getProperty(SOLR_LOCATION).evaluateAttributeExpressions().getValue();
if (solrLocation != null) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (solrLocation.startsWith("https:") && sslContextService == null) {
problems.add(new ValidationResult.Builder()
.subject(SSL_CONTEXT_SERVICE.getDisplayName())
.valid(false)
.explanation("an SSLContextService must be provided when using https")
.build());
} else if (solrLocation.startsWith("http:") && sslContextService != null) {
problems.add(new ValidationResult.Builder()
.subject(SSL_CONTEXT_SERVICE.getDisplayName())
.valid(false)
.explanation("an SSLContextService can not be provided when using http")
.build());
}
}
}
if (context.getProperty(BASIC_USERNAME).isSet() && context.getProperty(KERBEROS_USER_SERVICE).isSet()) {
problems.add(new ValidationResult.Builder()
.subject("Basic Auth and Kerberos")
.explanation("Cannot set both Basic Auth Username and Kerberos User Service")
.valid(false)
.build());
}
invalidIfEmpty(context, BASIC_USERNAME, problems);
invalidIfEmpty(context, BASIC_PASSWORD, problems);
final Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
if (otherProblems != null) {
problems.addAll(otherProblems);
}
if (SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue()) && !context.getProperty(COLLECTION).isSet()) {
problems.add(new ValidationResult.Builder()
.subject(COLLECTION.getName())
.input(context.getProperty(COLLECTION).getValue())
.valid(false)
.explanation("A collection must specified for Solr Type of Cloud")
.build());
}
return problems;
}
private void invalidIfEmpty(final ValidationContext context, final PropertyDescriptor descriptor, final List<ValidationResult> problems) {
if (context.getProperty(descriptor).isSet() && context.getProperty(descriptor).evaluateAttributeExpressions().getValue().isBlank()) {
problems.add(new ValidationResult.Builder()
.subject(descriptor.getDisplayName())
.explanation("Property cannot be an empty string and cannot reference environment variables that do not exist")
.valid(false)
.build());
}
}
/**
* Allows additional custom validation to be done. This will be called from
 * the parent's customValidate method.
*
* @param context
* The context
* @return Validation results indicating problems
*/
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
return new ArrayList<>();
}
}
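
// A minimal standalone sketch, assuming a password-based Kerberos principal, of the privileged-action
// wrapping that SolrProcessor.onTrigger(...) applies above. The principal and password are illustrative;
// KerberosPasswordUser and KerberosAction are the same NiFi classes imported by this file.
class KerberosWrappingSketch {
    static String runAsKerberosUser(final org.apache.nifi.logging.ComponentLog logger) {
        final org.apache.nifi.security.krb.KerberosUser user =
                new org.apache.nifi.security.krb.KerberosPasswordUser("nifi@EXAMPLE.COM", "password");
        final java.security.PrivilegedExceptionAction<String> action = () -> {
            // work that must run with the Kerberos credentials goes here
            return "done";
        };
        final org.apache.nifi.security.krb.KerberosAction<String> kerberosAction =
                new org.apache.nifi.security.krb.KerberosAction<>(user, action, logger);
        return kerberosAction.execute();    // logs in if needed and runs the action as the Kerberos user
    }
}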


@@ -1,453 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.field.FieldConverter;
import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.SSLConfig;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.MultiMapSolrParams;
public class SolrUtils {
public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue(
"Cloud", "Cloud", "A SolrCloud instance.");
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
"Standard", "Standard", "A stand-alone Solr instance.");
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor.Builder()
.name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
.required(true)
.allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
.defaultValue(SOLR_TYPE_STANDARD.getValue())
.build();
public static final PropertyDescriptor COLLECTION = new PropertyDescriptor.Builder()
.name("Collection")
.description("The Solr collection name, only used with a Solr Type of Cloud")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor.Builder()
.name("Solr Location")
.description("The Solr url for a Solr Type of Standard (ex: http://localhost:8984/solr/gettingstarted), " +
"or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.displayName("Basic Auth Username")
.description("The username to use when Solr is configured with basic authentication.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.displayName("Basic Auth Password")
.description("The password to use when Solr is configured with basic authentication.")
.required(true)
.dependsOn(BASIC_USERNAME)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.sensitive(true)
.build();
static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor SOLR_SOCKET_TIMEOUT = new PropertyDescriptor.Builder()
.name("Solr Socket Timeout")
.description("The amount of time to wait for data on a socket connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Solr Connection Timeout")
.description("The amount of time to wait when establishing a connection to Solr. A value of 0 indicates an infinite timeout.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.build();
public static final PropertyDescriptor SOLR_MAX_CONNECTIONS_PER_HOST = new PropertyDescriptor.Builder()
.name("Solr Maximum Connections Per Host")
.description("The maximum number of connections allowed from the Solr client to a single Solr host.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.build();
public static final String REPEATING_PARAM_PATTERN = "[\\w.]+\\.\\d+$";
private static final String ROOT_PATH = "/";
public static synchronized SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
final int socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLConfig sslConfig;
if (sslContextService == null) {
sslConfig = new SSLConfig(false, false, null, null, null, null);
} else {
sslConfig = new SSLConfig(true, true, sslContextService.getKeyStoreFile(), sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(),
sslContextService.getTrustStorePassword());
}
final Http2SolrClient httpClient = new Http2SolrClient.Builder(solrLocation)
.withConnectionTimeout(connectionTimeout, TimeUnit.MILLISECONDS)
.withRequestTimeout(socketTimeout, TimeUnit.MILLISECONDS)
.withMaxConnectionsPerHost(maxConnectionsPerHost)
.withSSLConfig(sslConfig)
.build();
if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
return httpClient;
} else {
// CloudSolrClient.Builder now requires a List of ZK addresses and znode for solr as separate parameters
final String[] zk = solrLocation.split(ROOT_PATH);
final List<String> zkList = Arrays.asList(zk[0].split(","));
String zkChrootPath = getZooKeeperChrootPathSuffix(solrLocation);
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
final CloudHttp2SolrClient cloudClient = new CloudHttp2SolrClient.Builder(zkList, Optional.of(zkChrootPath))
.withHttpClient(httpClient)
.withDefaultCollection(collection)
.build();
return cloudClient;
}
}
private static String getZooKeeperChrootPathSuffix(final String solrLocation) {
String[] zkConnectStringAndChrootSuffix = solrLocation.split("(?=/)", 2);
if (zkConnectStringAndChrootSuffix.length > 1) {
final String chrootSuffix = zkConnectStringAndChrootSuffix[1];
return chrootSuffix;
} else {
return ROOT_PATH;
}
}
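    // A small illustrative helper, not part of the original class, showing how createSolrClient and
    // getZooKeeperChrootPathSuffix split a Cloud "Solr Location" such as "zk1:2181,zk2:2181/solr"
    // (an assumed example value) into the ZooKeeper host list and the chroot; without a chroot suffix
    // the path falls back to ROOT_PATH ("/").
    private static String demonstrateZkLocationSplit() {
        final String solrLocation = "zk1:2181,zk2:2181/solr";
        final List<String> zkHosts = Arrays.asList(solrLocation.split(ROOT_PATH)[0].split(","));  // [zk1:2181, zk2:2181]
        return zkHosts.size() + " hosts, chroot " + getZooKeeperChrootPathSuffix(solrLocation);    // "2 hosts, chroot /solr"
    }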
/**
* Writes each SolrDocument to a record.
*/
public static RecordSet solrDocumentsToRecordSet(final List<SolrDocument> docs, final RecordSchema schema) {
final List<Record> lr = new ArrayList<Record>();
for (SolrDocument doc : docs) {
final Map<String, Object> recordValues = new LinkedHashMap<>();
for (RecordField field : schema.getFields()){
final Object fieldValue = doc.getFieldValue(field.getFieldName());
if (fieldValue != null) {
if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){
recordValues.put(field.getFieldName(), ((List<?>) fieldValue).toArray());
} else {
recordValues.put(field.getFieldName(), fieldValue);
}
}
}
lr.add(new MapRecord(schema, recordValues));
}
return new ListRecordSet(schema, lr);
}
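    // A small illustrative helper, not part of the original class, showing how solrDocumentsToRecordSet
    // maps Solr fields onto a record schema: multi-valued Solr fields become ARRAY record fields. The
    // field names, values, and the SimpleRecordSchema construction are illustrative assumptions.
    private static RecordSet demonstrateSolrDocumentsToRecordSet() {
        final SolrDocument doc = new SolrDocument();
        doc.setField("title", "example title");
        doc.setField("tags", Arrays.asList("tag-a", "tag-b"));
        final RecordSchema schema = new org.apache.nifi.serialization.SimpleRecordSchema(Arrays.asList(
                new RecordField("title", RecordFieldType.STRING.getDataType()),
                new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))));
        return solrDocumentsToRecordSet(Arrays.asList(doc), schema);
    }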
public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) {
return new QueryResponseOutputStreamCallback(response);
}
/**
* Writes each SolrDocument in XML format to the OutputStream.
*/
private static class QueryResponseOutputStreamCallback implements OutputStreamCallback {
private final QueryResponse response;
public QueryResponseOutputStreamCallback(QueryResponse response) {
this.response = response;
}
@Override
public void process(OutputStream out) throws IOException {
IOUtils.write("<docs>", out, StandardCharsets.UTF_8);
for (SolrDocument doc : response.getResults()) {
final String xml = ClientUtils.toXML(toSolrInputDocument(doc));
IOUtils.write(xml, out, StandardCharsets.UTF_8);
}
IOUtils.write("</docs>", out, StandardCharsets.UTF_8);
}
public SolrInputDocument toSolrInputDocument(SolrDocument d) {
final SolrInputDocument doc = new SolrInputDocument();
for (String name : d.getFieldNames()) {
doc.addField(name, d.getFieldValue(name));
}
return doc;
}
}
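    // A small illustrative helper, not part of the original class, showing the envelope produced by the
    // callback above: each result document is converted back to a SolrInputDocument, rendered as XML with
    // ClientUtils.toXML, and wrapped in a single <docs> element. The field name and value are assumptions.
    private static String demonstrateXmlEnvelope() {
        final SolrDocument doc = new SolrDocument();
        doc.setField("id", "example-1");
        final SolrInputDocument inputDoc = new SolrInputDocument();
        for (final String name : doc.getFieldNames()) {
            inputDoc.addField(name, doc.getFieldValue(name));
        }
        return "<docs>" + ClientUtils.toXML(inputDoc) + "</docs>";
    }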
public static Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
final Map<String,String[]> paramsMap = new HashMap<>();
final SortedMap<String,String> repeatingParams = new TreeMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
final String paramName = descriptor.getName();
final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
if (!paramValue.trim().isEmpty()) {
if (paramName.matches(REPEATING_PARAM_PATTERN)) {
repeatingParams.put(paramName, paramValue);
} else {
MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
}
}
}
}
for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) {
final String paramName = entry.getKey();
final String paramValue = entry.getValue();
final int idx = paramName.lastIndexOf(".");
MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap);
}
return paramsMap;
}
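    // A small illustrative helper, not part of the original class, showing the dynamic-property convention
    // handled by getRequestParams above: names matching REPEATING_PARAM_PATTERN, for example "facet.field.1"
    // and "facet.field.2", collapse into repeated values of the single Solr parameter "facet.field".
    // The parameter names and values are illustrative assumptions.
    private static Map<String, String[]> demonstrateRepeatingParams() {
        final Map<String, String[]> paramsMap = new HashMap<>();
        for (final String name : new String[]{"facet.field.1", "facet.field.2"}) {
            if (name.matches(REPEATING_PARAM_PATTERN)) {
                final String solrParamName = name.substring(0, name.lastIndexOf("."));   // "facet.field"
                MultiMapSolrParams.addParam(solrParamName, "value-for-" + name, paramsMap);
            }
        }
        return paramsMap;   // {facet.field=[value-for-facet.field.1, value-for-facet.field.2]}
    }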
/**
* Writes each Record as a SolrInputDocument.
*/
public static void writeRecord(final Record record, final SolrInputDocument inputDocument,final List<String> fieldsToIndex,String parentFieldName)
throws IOException {
RecordSchema schema = record.getSchema();
for (int i = 0; i < schema.getFieldCount(); i++) {
final RecordField field = schema.getField(i);
String fieldName;
if (StringUtils.isBlank(parentFieldName)) {
fieldName = field.getFieldName();
} else {
// Prefixing parent field name
fieldName = parentFieldName + "_" + field.getFieldName();
}
final Object value = record.getValue(field);
if (value != null) {
final DataType dataType = schema.getDataType(field.getFieldName()).orElse(null);
writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
}
}
}
private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException {
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
if (coercedValue == null) {
return;
}
switch (chosenDataType.getFieldType()) {
case DATE: {
final FieldConverter<Object, String> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(String.class);
final String stringValue = converter.convertField(coercedValue, Optional.ofNullable(RecordFieldType.DATE.getDefaultFormat()), fieldName);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
final LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
// A LocalDate carries no time component; use the start of day so the ISO_LOCAL_DATE_TIME formatter can be applied
addFieldToSolrDocument(inputDocument, fieldName, localDate.atStartOfDay().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z', fieldsToIndex);
} else {
addFieldToSolrDocument(inputDocument, fieldName, LocalDate.parse(stringValue).atStartOfDay().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z', fieldsToIndex);
}
break;
}
case TIMESTAMP: {
final FieldConverter<Object, String> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(String.class);
final String stringValue = converter.convertField(coercedValue, Optional.ofNullable(RecordFieldType.TIMESTAMP.getDefaultFormat()), fieldName);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
final LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
addFieldToSolrDocument(inputDocument, fieldName, localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z', fieldsToIndex);
} else {
addFieldToSolrDocument(inputDocument, fieldName, LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z', fieldsToIndex);
}
break;
}
case DOUBLE:
addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toDouble(coercedValue, fieldName), fieldsToIndex);
break;
case FLOAT:
addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toFloat(coercedValue, fieldName), fieldsToIndex);
break;
case LONG:
addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toLong(coercedValue, fieldName), fieldsToIndex);
break;
case INT:
case BYTE:
case SHORT:
addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toInteger(coercedValue, fieldName), fieldsToIndex);
break;
case CHAR:
case STRING:
addFieldToSolrDocument(inputDocument, fieldName, coercedValue.toString(), fieldsToIndex);
break;
case BIGINT:
// Long and BigInteger values are both added unchanged
addFieldToSolrDocument(inputDocument, fieldName, coercedValue, fieldsToIndex);
break;
case DECIMAL:
addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toBigDecimal(coercedValue, fieldName), fieldsToIndex);
break;
case BOOLEAN:
final String stringValue = coercedValue.toString();
if ("true".equalsIgnoreCase(stringValue)) {
addFieldToSolrDocument(inputDocument, fieldName, true, fieldsToIndex);
} else if ("false".equalsIgnoreCase(stringValue)) {
addFieldToSolrDocument(inputDocument, fieldName, false, fieldsToIndex);
} else {
addFieldToSolrDocument(inputDocument, fieldName, stringValue, fieldsToIndex);
}
break;
case RECORD: {
final Record record = (Record) coercedValue;
writeRecord(record, inputDocument, fieldsToIndex, fieldName);
break;
}
case ARRAY:
default:
if (coercedValue instanceof Object[]) {
final Object[] values = (Object[]) coercedValue;
for (final Object element : values) {
if (element instanceof Record) {
writeRecord((Record) element, inputDocument, fieldsToIndex, fieldName);
} else {
// Add each element individually so that the Solr field becomes multivalued
addFieldToSolrDocument(inputDocument, fieldName, element.toString(), fieldsToIndex);
}
}
} else {
addFieldToSolrDocument(inputDocument, fieldName, coercedValue.toString(), fieldsToIndex);
}
break;
}
}
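/**
 * Adds the value to the SolrInputDocument, but only when no explicit field list is configured
 * (an empty fieldsToIndex means "index all fields") or the field name is contained in fieldsToIndex.
 */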
private static void addFieldToSolrDocument(final SolrInputDocument inputDocument, final String fieldName, final Object fieldValue, final List<String> fieldsToIndex) {
if (fieldsToIndex.isEmpty() || fieldsToIndex.contains(fieldName)) {
inputDocument.addField(fieldName, fieldValue);
}
}
private static LocalDate getLocalDateFromEpochTime(String fieldName, Object coercedValue) {
Long date = DataTypeUtils.toLong(coercedValue, fieldName);
return Instant.ofEpochMilli(date).atZone(ZoneId.systemDefault()).toLocalDate();
}
private static LocalDateTime getLocalDateTimeFromEpochTime(String fieldName, Object coercedValue) {
Long date = DataTypeUtils.toLong(coercedValue, fieldName);
return Instant.ofEpochMilli(date).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
}
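For orientation, a minimal sketch of how a processor in this bundle might have combined the helpers above. This snippet is not part of the removed sources; the utility class is referred to here as SolrUtils, the names solrClient, collection, context, flowFile and record are assumed to exist, and imports from the SolrJ and NiFi APIs are omitted for brevity.

// Hypothetical usage sketch: flatten a NiFi Record into a SolrInputDocument and send it to Solr
// together with the dynamic request parameters configured on the processor.
private void indexRecord(final SolrClient solrClient, final String collection, final ProcessContext context,
final FlowFile flowFile, final Record record) throws IOException, SolrServerException {
final SolrInputDocument inputDocument = new SolrInputDocument();
// An empty field list means "index every field"; nested records are flattened using the parent field name as prefix
SolrUtils.writeRecord(record, inputDocument, Collections.emptyList(), "");
final UpdateRequest updateRequest = new UpdateRequest();
updateRequest.add(inputDocument);
updateRequest.setParams(new ModifiableSolrParams(SolrUtils.getRequestParams(context, flowFile)));
updateRequest.process(solrClient, collection);
}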

View File

@ -1,18 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.solr.PutSolrContentStream
org.apache.nifi.processors.solr.PutSolrRecord
org.apache.nifi.processors.solr.GetSolr
org.apache.nifi.processors.solr.QuerySolr

View File

@ -1,56 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>GetSolr</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Example</h2>
<p>
This processor pulls data from Solr collections. To use it, the Solr collections
have to fulfil two requirements:
</p>
<ul>
<li>The documents must include a date field containing the time when they were
indexed. Such a field can easily be added to documents at indexing time,
e.g. using Solr's UpdateRequestProcessor created by
'TimestampUpdateProcessorFactory'.</li>
<li>The configuration of the Solr index (e.g. schema.xml or managed-schema) must
define a uniqueKey field.</li>
</ul>
<p>
Backwards compatibility with configurations of the GetSolr processor used in releases
of NiFi prior to 1.5 can be achieved as follows:
</p>
<ul>
<li>Find the file conf/.getSolr* within the prior NiFi installation.</li>
<li>Open the file and copy the timestamp defined for 'LastEndDate'.</li>
<li>Insert the timestamp into the field 'Initial Date Filter'.</li>
</ul>
<p>
Note: The value of the property 'Solr Query' is not added to the parameter 'q'
but to the parameter 'fq', for two reasons:
</p>
<ul>
<li>Improving performance by leveraging Solr's filter cache.</li>
<li>Scoring is not required for the purpose of this processor, as the sorting
is fixed to 'DateField asc, IdField asc'.</li>
</ul>
</body>
</html>

View File

@ -1,48 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutSolrContentStream</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Example</h2>
<p>
This processor streams the contents of a FlowFile to an Apache Solr
update handler. Any properties added to this processor by the user are
passed to Solr on the update request. If a parameter must be sent multiple
times with different values, properties can follow a naming convention:
name.number, where name is the parameter name and number is a unique number.
Repeating parameters will be sorted by their property name.
</p>
<p>
Example: To specify multiple 'f' parameters for indexing custom json, the following
properties can be defined:
</p>
<ul>
<li><strong>split</strong>: /exams</li>
<li><strong>f.1</strong>: first:/first</li>
<li><strong>f.2</strong>: last:/last</li>
<li><strong>f.3</strong>: grade:/grade</li>
</ul>
<p>
This will result in sending the following URL to Solr: <br/>
split=/exams&amp;f=first:/first&amp;f=last:/last&amp;f=grade:/grade
</p>
</body>
</html>

View File

@ -1,114 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutSolrRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Example</h2>
<p>
This processor reads a NiFi record and indexes it into Solr as a SolrDocument.
Any properties added to this processor by the user are
passed to Solr on the update request. A record reader must be specified for this processor.
Additionally, if only selected fields of a record are to be indexed,
you can specify the field names as a comma-separated list in the 'Fields To Index' property.
</p>
<p>
Example: To specify specific fields of the record to be indexed:
</p>
<ul>
<li><strong>Fields To Index</strong>: field1,field2,field3</li>
</ul>
<p>
<strong>NOTE:</strong> In case of nested records, the field names should be prefixed with the parent field name.
</p>
<ul>
<li><strong>Fields To Index</strong>: parentField1,parentField2,<strong>parentField3_childField1</strong>,<strong>parentField3_childField2</strong></li>
</ul>
<p>
In case of nested records, this processor flattens all the nested records into a single Solr document; the field name of a field in a child record follows the format <strong>{Parent Field Name}_{Child Field Name}</strong>.
</p>
<p>
Example:
<strong>For a record created from the following json:</strong><br/>
</p>
<pre>
{
"first": "Abhi",
"last": "R",
"grade": 8,
"exams": {
"subject": "Maths",
"test" : "term1",
"marks" : 90
}
}
</pre>
<p>
<strong>The corresponding Solr document would be represented as below:</strong><br/>
</p>
<pre>
{
"first": "Abhi",
"last": "R",
"grade": 8,
"exams_subject": "Maths",
"exams_test" : "term1",
"exams_marks" : 90
}
</pre>
<p>
Similarly, in case of an array of nested records, this processor flattens all the nested records into a single Solr document; the field name of a field in a child record follows the format <strong>{Parent Field Name}_{Child Field Name}</strong> and becomes a multivalued field in the Solr document.
Example:
<strong>For a record created from the following json:</strong><br/>
</p>
<pre>
{
"first": "Abhi",
"last": "R",
"grade": 8,
"exams": [
{
"subject": "Maths",
"test" : "term1",
"marks" : 90
},
{
"subject": "Physics",
"test" : "term1",
"marks" : 95
}
]
}
</pre>
<p>
<strong>The corresponding Solr document would be represented as below:</strong><br/>
</p>
<pre>
{
"first": "Abhi",
"last": "R",
"grade": 8,
"exams_subject": ["Maths","Physics"]
"exams_test" : ["term1","term1"]
"exams_marks" : [90,95]
}
</pre>
</body>
</html>

View File

@ -1,142 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>QuerySolr</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Example</h2>
<p>
This processor queries Solr and writes the results to FlowFiles. The processor can be used both at the
beginning of a dataflow and at later stages. Solr results can be written to FlowFiles as Solr XML or using
NiFi's record writers (supporting CSV, JSON, etc.). Additionally, facets and stats can be retrieved.
They are written to FlowFiles in JSON and sent to designated relationships.
</p>
<p>
The processor can be configured to retrieve either only the top results or full result sets. However,
it should be emphasized that this processor is not designed to export large result sets from Solr.
If the processor is configured to return full result sets, the configured number of rows per
request is used as the batch size and the processor iteratively increases the start parameter,
returning the results of each request in a separate FlowFile. The processor stops iterating through results as
soon as the start parameter exceeds 10000. For exporting large result sets, consider
using the GetSolr processor instead. In principle, it is also possible to embed this processor into a
dataflow that iterates through results using the attribute solr.cursor.mark, which is added to FlowFiles
for each request. Note that the use of Solr's cursor mark requires queries to fulfil several preconditions
(see the Solr documentation on deep paging for additional details).
</p>
<p>
The most common Solr parameters can be defined via processor properties. Other parameters have to be set via
dynamic properties.
</p>
<p>
Parameters that can be set multiple times also have to be defined as dynamic properties
(e.g. fq, facet.field, stats.field). If these parameters must be set multiple times with different values,
properties can follow a naming convention:
name.number, where name is the parameter name and number is a unique number.
Repeating parameters will be sorted by their property name.
</p>
<p>
Example: Defining the fq parameter multiple times
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>fq.1</td>
<td><code>field1:value1</code></td>
</tr>
<tr>
<td>fq.2</td>
<td><code>field2:value2</code></td>
</tr>
<tr>
<td>fq.3</td>
<td><code>field3:value3</code></td>
</tr>
</table>
<p>
This definition will be appended to the Solr URL as follows:
fq=field1:value1&amp;fq=field2:value2&amp;fq=field3:value3
</p>
<p>
Facets and stats can be activated by setting the respective Solr parameters as dynamic properties. Example:
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>facet</td>
<td><code>true</code></td>
</tr>
<tr>
<td>facet.field</td>
<td><code>fieldname</code></td>
</tr>
<tr>
<td>stats</td>
<td><code>true</code></td>
</tr>
<tr>
<td>stats.field</td>
<td><code>fieldname</code></td>
</tr>
</table>
<p>
Multiple fields for facets or stats can be defined in the same way as described for multiple filter queries:
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>facet</td>
<td><code>true</code></td>
</tr>
<tr>
<td>facet.field.1</td>
<td><code>firstField</code></td>
</tr>
<tr>
<td>facet.field.2</td>
<td><code>secondField</code></td>
</tr>
</table>
<p>
This definition will be appended to the Solr URL as follows:
facet=true&amp;facet.field=firstField&amp;facet.field=secondField
</p>
</body>
</html>

View File

@ -1,46 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-shared-bom</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../nifi-standard-shared-bundle/nifi-standard-shared-bom</relativePath>
</parent>
<artifactId>nifi-solr-bundle</artifactId>
<packaging>pom</packaging>
<description>A bundle of processors that can store and retrieve data from Apache Solr</description>
<properties>
<solr.version>9.4.1</solr.version>
<jersey.bom.version>2.41</jersey.bom.version>
</properties>
<modules>
<module>nifi-solr-processors</module>
<module>nifi-solr-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey</groupId>
<artifactId>jersey-bom</artifactId>
<version>${jersey.bom.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -34,7 +34,6 @@
<module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module>
<module>nifi-kudu-bundle</module>
<module>nifi-solr-bundle</module>
<module>nifi-confluent-platform-bundle</module>
<module>nifi-aws-bundle</module>
<module>nifi-social-media-bundle</module>